This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ed20fccc2b [IOTDB-3392] File doesn't exist when move tsfile when
virtual_storage_group_num > 1 (#6310)
ed20fccc2b is described below
commit ed20fccc2bbca84e0cf2b00cf8f48b6996064f20
Author: Chen YZ <[email protected]>
AuthorDate: Sun Jun 19 22:19:03 2022 +0800
[IOTDB-3392] File doesn't exist when move tsfile when
virtual_storage_group_num > 1 (#6310)
---
integration/checkstyle.xml | 2 +-
.../db/integration/IoTDBLoadExternalTsfileIT.java | 8 +-
.../IoTDBLoadExternalTsfileWithVirtualSGIT.java | 138 +++++++++++++++++++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 73 ++++++++++-
.../dataregion/StorageGroupManager.java | 19 ++-
5 files changed, 231 insertions(+), 9 deletions(-)
diff --git a/integration/checkstyle.xml b/integration/checkstyle.xml
index 6070cd880d..578af5a865 100644
--- a/integration/checkstyle.xml
+++ b/integration/checkstyle.xml
@@ -25,7 +25,7 @@
<property name="severity" value="error"/>
<property name="fileExtensions" value="java"/>
<module name="BeforeExecutionExclusionFileFilter">
- <property name="fileNamePattern"
value="(^.*([\\/]src[\\/]main|[\\/]src[\\/]test[\\/]java[\\/]org[\\/]apache[\\/]iotdb[\\/]session)[\\/].*$)|
|^.*IoTDBSchemaTemplateIT\.java$|
|^.*IoTDBSeriesReaderIT\.java$| |^.*IoTDBCompactionIT\.java$|
|^.*IoTDBAggregationByLevelIT\.java$|
|^.*IoTDBUDFNestAggregationIT\.java$|
|^.*IoTDBAggregationDeleteIT\.java$| |^.*IoTDBUserDefinedA [...]
+ <property name="fileNamePattern"
value="(^.*([\\/]src[\\/]main|[\\/]src[\\/]test[\\/]java[\\/]org[\\/]apache[\\/]iotdb[\\/]session)[\\/].*$)|
|^.*IoTDBSchemaTemplateIT\.java$|
|^.*IoTDBSeriesReaderIT\.java$| |^.*IoTDBCompactionIT\.java$|
|^.*IoTDBAggregationByLevelIT\.java$|
|^.*IoTDBUDFNestAggregationIT\.java$|
|^.*IoTDBAggregationDeleteIT\.java$| |^.*IoTDBUserDefinedA [...]
</module>
<!-- <module name="RegexpOnFilename">-->
<!-- <property name="fileNamePattern" value="^.*IT\.java$"/>-->
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 36bca5e033..852b1484ac 100644
---
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -62,7 +62,7 @@ public class IoTDBLoadExternalTsfileIT {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static String[] insertSequenceSqls =
+ protected static String[] insertSequenceSqls =
new String[] {
"SET STORAGE GROUP TO root.vehicle",
"SET STORAGE GROUP TO root.test",
@@ -132,8 +132,8 @@ public class IoTDBLoadExternalTsfileIT {
private static final String TEST_D0_S1_STR = "root.test.d0.s1";
private static final String TEST_D1_STR = "root.test.d1.g0.s0";
- private int prevVirtualPartitionNum;
- private int prevCompactionThread;
+ protected int prevVirtualPartitionNum;
+ protected int prevCompactionThread;
private static String[] deleteSqls =
new String[] {"DELETE STORAGE GROUP root.vehicle", "DELETE STORAGE GROUP
root.test"};
@@ -1016,7 +1016,7 @@ public class IoTDBLoadExternalTsfileIT {
}
}
- private void prepareData(String[] sqls) {
+ protected void prepareData(String[] sqls) {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
diff --git
a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
new file mode 100644
index 0000000000..36ab81f9a6
--- /dev/null
+++
b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.integration.sync.SyncTestUtil;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@Category({LocalStandaloneTest.class})
+public class IoTDBLoadExternalTsfileWithVirtualSGIT extends
IoTDBLoadExternalTsfileIT {
+ @Before
+ public void setUp() throws Exception {
+ prevVirtualPartitionNum =
IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
+ IoTDBDescriptor.getInstance().getConfig().setDataRegionNum(2);
+ prevCompactionThread =
+
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
+ EnvironmentUtils.envSetUp();
+ StorageEngine.getInstance().reset();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData(insertSequenceSqls);
+ }
+
+ @Test
+ public void unloadTsfileWithVSGTest() throws SQLException {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+
+ // move root.vehicle
+ File vehicleDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.vehicle");
+ List<File> vehicleFiles = SyncTestUtil.getTsFilePaths(vehicleDir);
+ File tmpDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ "tmp" + File.separator + new PartialPath("root.vehicle"));
+ if (!tmpDir.exists()) {
+ tmpDir.mkdirs();
+ }
+ for (File tsFile : vehicleFiles) {
+ statement.execute(String.format("unload \"%s\" \"%s\"",
tsFile.getAbsolutePath(), tmpDir));
+ }
+ assertEquals(0, SyncTestUtil.getTsFilePaths(vehicleDir).size());
+ assertNotNull(tmpDir.listFiles());
+ assertEquals(2, tmpDir.listFiles().length >> 1);
+
+ // move root.test
+ File testDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.test");
+ List<File> testFiles = SyncTestUtil.getTsFilePaths(testDir);
+ tmpDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ "tmp" + File.separator + new PartialPath("root.test"));
+ if (!tmpDir.exists()) {
+ tmpDir.mkdirs();
+ }
+ for (File tsFile : testFiles) {
+ statement.execute(String.format("unload \"%s\" \"%s\"",
tsFile.getAbsolutePath(), tmpDir));
+ }
+ assertEquals(0, SyncTestUtil.getTsFilePaths(testDir).size());
+ assertNotNull(tmpDir.listFiles());
+ assertEquals(2, tmpDir.listFiles().length >> 1);
+ } catch (IllegalPathException e) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void removeTsfileWithVSGTest() throws SQLException {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement()) {
+
+ // remove root.vehicle
+ File vehicleDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.vehicle");
+ List<File> vehicleFiles = SyncTestUtil.getTsFilePaths(vehicleDir);
+ for (File tsFile : vehicleFiles) {
+ statement.execute(String.format("remove \"%s\"",
tsFile.getAbsolutePath()));
+ }
+ assertEquals(0, SyncTestUtil.getTsFilePaths(vehicleDir).size());
+ // remove root.test
+ File testDir =
+ new File(
+ IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+ IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator +
"root.test");
+ List<File> testFiles = SyncTestUtil.getTsFilePaths(testDir);
+ for (File tsFile : testFiles) {
+ statement.execute(String.format("remove \"%s\"",
tsFile.getAbsolutePath()));
+ }
+ assertEquals(0, SyncTestUtil.getTsFilePaths(testDir).size());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 9643efb22a..e41870df46 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -450,6 +450,24 @@ public class StorageEngine implements IService {
}
}
+ /**
+ * This method is for sync, deleting tsfiles, or similar operations; it gets the storage group
+ * processor directly by dataRegionId.
+ *
+ * @param path storage group path
+ * @param dataRegionId dataRegionId
+ * @return storage group processor
+ */
+ public DataRegion getProcessorDirectly(PartialPath path, int dataRegionId)
+ throws StorageEngineException {
+ try {
+ IStorageGroupMNode storageGroupMNode =
IoTDB.schemaProcessor.getStorageGroupNodeByPath(path);
+ return getStorageGroupProcessorById(dataRegionId, storageGroupMNode);
+ } catch (DataRegionException | MetadataException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+
/**
* This method is for insert and query or sth like them, this may get a
virtual storage group
*
@@ -510,6 +528,19 @@ public class StorageEngine implements IService {
return getStorageGroupManager(storageGroupMNode).getProcessor(devicePath,
storageGroupMNode);
}
+ /**
+ * get storage group processor by dataRegionId
+ *
+ * @param dataRegionId dataRegionId
+ * @param storageGroupMNode mnode of the storage group; we need to synchronize on this to avoid
+ * modification in the mtree
+ * @return found or new storage group processor
+ */
+ private DataRegion getStorageGroupProcessorById(
+ int dataRegionId, IStorageGroupMNode storageGroupMNode)
+ throws DataRegionException, StorageEngineException {
+ return
getStorageGroupManager(storageGroupMNode).getProcessor(dataRegionId,
storageGroupMNode);
+ }
/**
* get storage group manager by storage group mnode
*
@@ -830,12 +861,14 @@ public class StorageEngine implements IService {
}
/** delete all data of storage groups' timeseries. */
+ @TestOnly
public synchronized boolean deleteAll() {
logger.info("Start deleting all storage groups' timeseries");
syncCloseAllProcessor();
for (PartialPath storageGroup :
IoTDB.schemaProcessor.getAllStorageGroupPaths()) {
this.deleteAllDataFilesInOneStorageGroup(storageGroup);
}
+ processorMap.clear();
return true;
}
@@ -883,13 +916,17 @@ public class StorageEngine implements IService {
public boolean deleteTsfile(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new
PartialPath(getSgByEngineFile(deletedTsfile, true)))
+ return getProcessorDirectly(
+ new PartialPath(getSgByEngineFile(deletedTsfile, true)),
+ getDataRegionIdByEngineFile(deletedTsfile, true))
.deleteTsfile(deletedTsfile);
}
public boolean unloadTsfile(File tsfileToBeUnloaded, File targetDir)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new
PartialPath(getSgByEngineFile(tsfileToBeUnloaded, true)))
+ return getProcessorDirectly(
+ new PartialPath(getSgByEngineFile(tsfileToBeUnloaded, true)),
+ getDataRegionIdByEngineFile(tsfileToBeUnloaded, true))
.unloadTsfile(tsfileToBeUnloaded, targetDir);
}
@@ -925,6 +962,38 @@ public class StorageEngine implements IService {
}
}
+ /**
+ * An internal file is a file that is in the engine, as opposed to external files which are not
+ * loaded.
+ *
+ * @param file internal file
+ * @param needCheck whether to check that the tsfile is an internal TsFile; if you are sure it
+ * is internal, there is no need to check
+ * @return dataRegionId
+ * @throws IllegalPathException throw if tsfile is not an internal TsFile
+ */
+ public int getDataRegionIdByEngineFile(File file, boolean needCheck) throws
IllegalPathException {
+ if (needCheck) {
+ File dataDir =
+
file.getParentFile().getParentFile().getParentFile().getParentFile().getParentFile();
+ if (dataDir.exists()) {
+ String[] dataDirs =
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ for (String dir : dataDirs) {
+ try {
+ if (Files.isSameFile(Paths.get(dir), dataDir.toPath())) {
+ return
Integer.parseInt(file.getParentFile().getParentFile().getName());
+ }
+ } catch (IOException e) {
+ throw new IllegalPathException(file.getAbsolutePath(),
e.getMessage());
+ }
+ }
+ }
+ throw new IllegalPathException(file.getAbsolutePath(), "it's not an
internal tsfile.");
+ } else {
+ return Integer.parseInt(file.getParentFile().getParentFile().getName());
+ }
+ }
+
/**
* Get all the closed tsfiles of each storage group.
*
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
index 9af9b4ee84..345c995332 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java
@@ -146,6 +146,17 @@ public class StorageGroupManager {
return getProcessor(storageGroupMNode, dataRegionId);
}
+ /**
+ * get processor from data region id
+ *
+ * @param dataRegionId dataRegionId
+ * @return virtual storage group processor
+ */
+ public DataRegion getProcessor(int dataRegionId, IStorageGroupMNode
storageGroupMNode)
+ throws DataRegionException, StorageEngineException {
+ return getProcessor(storageGroupMNode, dataRegionId);
+ }
+
@SuppressWarnings("java:S2445")
// actually storageGroupMNode is a unique object on the mtree, synchronize
it is reasonable
public DataRegion getProcessor(IStorageGroupMNode storageGroupMNode, int
dataRegionId)
@@ -483,13 +494,17 @@ public class StorageGroupManager {
public void setAllowCompaction(boolean allowCompaction) {
for (DataRegion processor : dataRegion) {
- processor.setAllowCompaction(allowCompaction);
+ if (processor != null) {
+ processor.setAllowCompaction(allowCompaction);
+ }
}
}
public void abortCompaction() {
for (DataRegion processor : dataRegion) {
- processor.abortCompaction();
+ if (processor != null) {
+ processor.abortCompaction();
+ }
}
}