This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new fbdbd40eda [To rel/0.12][IOTDB-3392] File doesn't exist when move
tsfile when virtual_storage_group_num > 1 (#6283)
fbdbd40eda is described below
commit fbdbd40edadf572ad50c9aa5b869aec63fe917e2
Author: Chen YZ <[email protected]>
AuthorDate: Wed Jun 15 17:27:19 2022 +0800
[To rel/0.12][IOTDB-3392] File doesn't exist when move tsfile when
virtual_storage_group_num > 1 (#6283)
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 73 +++++++-
.../virtualSg/VirtualStorageGroupManager.java | 38 +++++
.../db/integration/IoTDBLoadExternalTsfileIT.java | 8 +-
.../IoTDBLoadExternalTsfileWithVirtualSGIT.java | 189 +++++++++++++++++++++
4 files changed, 302 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 4e2e81dd4c..fb19d1aced 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,6 +498,24 @@ public class StorageEngine implements IService {
}
}
+ /**
+ * This method is for sync, delete tsfile or sth like them, just get storage
group directly by
+ * virtualStorageGroupId
+ *
+ * @param path storage group path
+ * @param virtualStorageGroupId virtual storage group partition id
+ * @return storage group processor
+ */
+ public StorageGroupProcessor getProcessorDirectly(PartialPath path, int
virtualStorageGroupId)
+ throws StorageEngineException {
+ try {
+ StorageGroupMNode storageGroupMNode =
IoTDB.metaManager.getStorageGroupNodeByPath(path);
+ return getStorageGroupProcessorByPath(storageGroupMNode,
virtualStorageGroupId);
+ } catch (StorageGroupProcessorException | MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
/**
* This method is for insert and query or sth like them, this may get a
virtual storage group
*
@@ -567,6 +585,40 @@ public class StorageEngine implements IService {
return virtualStorageGroupManager.getProcessor(devicePath,
storageGroupMNode);
}
+ /**
+ * get storage group processor by virtualStorageGroupId
+ *
+ * @param storageGroupMNode mnode of the storage group, we need synchronize
this to avoid
+ * modification in mtree
+ * @param virtualStorageGroupId virtual storage group partition id
+ * @return found or new storage group processor
+ */
+ @SuppressWarnings("java:S2445")
+ // actually storageGroupMNode is a unique object on the mtree, synchronize
it is reasonable
+ private StorageGroupProcessor getStorageGroupProcessorByPath(
+ StorageGroupMNode storageGroupMNode, int virtualStorageGroupId)
+ throws StorageGroupProcessorException, StorageEngineException {
+ VirtualStorageGroupManager virtualStorageGroupManager =
+ processorMap.get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
+ // if finish recover
+ if (isAllSgReady.get()) {
+ synchronized (this) {
+ virtualStorageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
+ if (virtualStorageGroupManager == null) {
+ virtualStorageGroupManager = new VirtualStorageGroupManager();
+ processorMap.put(storageGroupMNode.getPartialPath(),
virtualStorageGroupManager);
+ }
+ }
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageGroupNotReadyException(
+ storageGroupMNode.getFullPath(),
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+ }
+ }
+ return virtualStorageGroupManager.getProcessor(virtualStorageGroupId,
storageGroupMNode);
+ }
+
/**
* build a new storage group processor
*
@@ -881,12 +933,14 @@ public class StorageEngine implements IService {
}
/** delete all data of storage groups' timeseries. */
+ @TestOnly
public synchronized boolean deleteAll() {
logger.info("Start deleting all storage groups' timeseries");
syncCloseAllProcessor();
for (PartialPath storageGroup :
IoTDB.metaManager.getAllStorageGroupPaths()) {
this.deleteAllDataFilesInOneStorageGroup(storageGroup);
}
+ processorMap.clear();
return true;
}
@@ -938,13 +992,17 @@ public class StorageEngine implements IService {
public boolean deleteTsfile(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new
PartialPath(getSgByEngineFile(deletedTsfile)))
+ return getProcessorDirectly(
+ new PartialPath(getSgByEngineFile(deletedTsfile)),
+ getVirtualSgIdByEngineFile(deletedTsfile))
.deleteTsfile(deletedTsfile);
}
public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new
PartialPath(getSgByEngineFile(tsfileToBeMoved)))
+ return getProcessorDirectly(
+ new PartialPath(getSgByEngineFile(tsfileToBeMoved)),
+ getVirtualSgIdByEngineFile(tsfileToBeMoved))
.moveTsfile(tsfileToBeMoved, targetDir);
}
@@ -959,6 +1017,17 @@ public class StorageEngine implements IService {
return file.getParentFile().getParentFile().getParentFile().getName();
}
+ /**
+ * The internal file means that the file is in the engine, which is
different from those external
+ * files which are not loaded.
+ *
+ * @param file internal file
+ * @return virtual storage group partition id
+ */
+ public int getVirtualSgIdByEngineFile(File file) {
+ return Integer.parseInt(file.getParentFile().getParentFile().getName());
+ }
+
/** @return TsFiles (seq or unseq) grouped by their storage group and
partition number. */
public Map<PartialPath, Map<Long, List<TsFileResource>>>
getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 3af47c5731..650c297223 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -152,6 +152,44 @@ public class VirtualStorageGroupManager {
return processor;
}
+ /**
+ * get processor from loc
+ *
+ * @param virtualStorageGroupId virtual storage group id
+ * @return virtual storage group processor
+ */
+ @SuppressWarnings("java:S2445")
+ // actually storageGroupMNode is a unique object on the mtree, synchronize
it is reasonable
+ public StorageGroupProcessor getProcessor(
+ int virtualStorageGroupId, StorageGroupMNode storageGroupMNode)
+ throws StorageGroupProcessorException, StorageEngineException {
+
+ StorageGroupProcessor processor =
virtualStorageGroupProcessor[virtualStorageGroupId];
+ if (processor == null) {
+ // if finish recover
+ if (StorageEngine.getInstance().isAllSgReady()) {
+ synchronized (storageGroupMNode) {
+ processor = virtualStorageGroupProcessor[virtualStorageGroupId];
+ if (processor == null) {
+ processor =
+ StorageEngine.getInstance()
+ .buildNewStorageGroupProcessor(
+ storageGroupMNode.getPartialPath(),
+ storageGroupMNode,
+ String.valueOf(virtualStorageGroupId));
+ virtualStorageGroupProcessor[virtualStorageGroupId] = processor;
+ }
+ }
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageGroupNotReadyException(
+ storageGroupMNode.getFullPath(),
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+ }
+ }
+
+ return processor;
+ }
+
/**
* recover
*
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 1479a09bf9..ed256aa1de 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -61,7 +61,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static String[] insertSequenceSqls =
+ protected static String[] insertSequenceSqls =
new String[] {
"SET STORAGE GROUP TO root.vehicle",
"SET STORAGE GROUP TO root.test",
@@ -127,7 +127,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final String TEST_D0_S0_STR = "root.test.d0.s0";
private static final String TEST_D0_S1_STR = "root.test.d0.s1";
private static final String TEST_D1_STR = "root.test.d1.g0.s0";
- private static int virtualPartitionNum = 0;
+ protected static int virtualPartitionNum = 0;
private static String[] deleteSqls =
new String[] {"DELETE STORAGE GROUP root.vehicle", "DELETE STORAGE GROUP
root.test"};
@@ -137,11 +137,11 @@ public class IoTDBLoadExternalTsfileIT {
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ virtualPartitionNum =
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(1);
HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
- virtualPartitionNum =
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData(insertSequenceSqls);
}
@@ -812,7 +812,7 @@ public class IoTDBLoadExternalTsfileIT {
}
}
- private void prepareData(String[] sqls) {
+ protected void prepareData(String[] sqls) {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
new file mode 100644
index 0000000000..91647df890
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import
org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class IoTDBLoadExternalTsfileWithVirtualSGIT extends
IoTDBLoadExternalTsfileIT {
+ @Before
+ public void setUp() throws Exception {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ virtualPartitionNum =
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
+ IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(2);
+ HashVirtualPartitioner.getInstance().setStorageGroupNum(2);
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ StorageEngine.getInstance().reset();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData(insertSequenceSqls);
+ }
+
+ @Test
+ public void moveTsfileWithVSGTest() throws SQLException {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+
+ // move root.vehicle
+ File vehicleDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.vehicle");
+ List<File> vehicleFiles = getTsFilePaths(vehicleDir);
+ File tmpDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ "tmp" + File.separator + new PartialPath("root.vehicle"));
+ if (!tmpDir.exists()) {
+ tmpDir.mkdirs();
+ }
+ for (File tsFile : vehicleFiles) {
+ statement.execute(String.format("move \"%s\" \"%s\"",
tsFile.getAbsolutePath(), tmpDir));
+ }
+ assertEquals(0, getTsFilePaths(vehicleDir).size());
+ assertNotNull(tmpDir.listFiles());
+ assertEquals(2, tmpDir.listFiles().length >> 1);
+ //
+ // // move root.test
+ File testDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.test");
+ List<File> testFiles = getTsFilePaths(testDir);
+ tmpDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ "tmp" + File.separator + new PartialPath("root.test"));
+ if (!tmpDir.exists()) {
+ tmpDir.mkdirs();
+ }
+ for (File tsFile : testFiles) {
+ statement.execute(String.format("move \"%s\" \"%s\"",
tsFile.getAbsolutePath(), tmpDir));
+ }
+ assertEquals(0, getTsFilePaths(testDir).size());
+ assertNotNull(tmpDir.listFiles());
+ assertEquals(2, tmpDir.listFiles().length >> 1);
+ } catch (IllegalPathException e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void removeTsfileWithVSGTest() throws SQLException {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+
+ // remove root.vehicle
+ File vehicleDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.vehicle");
+ List<File> vehicleFiles = getTsFilePaths(vehicleDir);
+ for (File tsFile : vehicleFiles) {
+ statement.execute(String.format("remove \"%s\"",
tsFile.getAbsolutePath()));
+ }
+ assertEquals(0, getTsFilePaths(vehicleDir).size());
+ // remove root.test
+ File testDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.test");
+ List<File> testFiles = getTsFilePaths(testDir);
+ for (File tsFile : testFiles) {
+ statement.execute(String.format("remove \"%s\"",
tsFile.getAbsolutePath()));
+ }
+ assertEquals(0, getTsFilePaths(testDir).size());
+ }
+ }
+
+ /**
+ * scan parentDir and return all TsFile sorted by load sequence
+ *
+ * @param parentDir folder to scan
+ */
+ public static List<File> getTsFilePaths(File parentDir) {
+ List<File> res = new ArrayList<>();
+ if (!parentDir.exists()) {
+ Assert.fail();
+ return res;
+ }
+ scanDir(res, parentDir);
+ res.sort(
+ (f1, f2) -> {
+ int diffSg =
+ f1.getParentFile()
+ .getParentFile()
+ .getParentFile()
+ .getName()
+
.compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
+ if (diffSg != 0) {
+ return diffSg;
+ } else {
+ return (int)
+ (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
+ - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+ }
+ });
+ return res;
+ }
+
+ private static void scanDir(List<File> tsFiles, File parentDir) {
+ if (!parentDir.exists()) {
+ Assert.fail();
+ return;
+ }
+ File fa[] = parentDir.listFiles();
+ for (int i = 0; i < fa.length; i++) {
+ File fs = fa[i];
+ if (fs.isDirectory()) {
+ scanDir(tsFiles, fs);
+ } else if (fs.getName().endsWith(".resource")) {
+ // only add tsfile that has been flushed
+ tsFiles.add(new File(fs.getAbsolutePath().substring(0,
fs.getAbsolutePath().length() - 9)));
+ }
+ }
+ }
+}