This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new fbdbd40eda [To rel/0.12][IOTDB-3392] File doesn't exist when move 
tsfile when virtual_storage_group_num > 1 (#6283)
fbdbd40eda is described below

commit fbdbd40edadf572ad50c9aa5b869aec63fe917e2
Author: Chen YZ <[email protected]>
AuthorDate: Wed Jun 15 17:27:19 2022 +0800

    [To rel/0.12][IOTDB-3392] File doesn't exist when move tsfile when 
virtual_storage_group_num > 1 (#6283)
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  73 +++++++-
 .../virtualSg/VirtualStorageGroupManager.java      |  38 +++++
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |   8 +-
 .../IoTDBLoadExternalTsfileWithVirtualSGIT.java    | 189 +++++++++++++++++++++
 4 files changed, 302 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 4e2e81dd4c..fb19d1aced 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,6 +498,24 @@ public class StorageEngine implements IService {
     }
   }
 
+  /**
+   * Returns the processor of one specific virtual storage group, addressed by its id rather than
+   * by a device path. Intended for sync, tsfile deletion and similar file-level operations.
+   *
+   * @param path storage group path
+   * @param virtualStorageGroupId virtual storage group partition id
+   * @return storage group processor
+   * @throws StorageEngineException wrapping any {@link StorageGroupProcessorException} or {@link
+   *     MetadataException} raised while resolving the storage group or its processor
+   */
+  public StorageGroupProcessor getProcessorDirectly(PartialPath path, int virtualStorageGroupId)
+      throws StorageEngineException {
+    try {
+      // resolve the storage group mnode first, then pick the processor by virtual sg id
+      return getStorageGroupProcessorByPath(
+          IoTDB.metaManager.getStorageGroupNodeByPath(path), virtualStorageGroupId);
+    } catch (StorageGroupProcessorException | MetadataException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
   /**
    * This method is for insert and query or sth like them, this may get a 
virtual storage group
    *
@@ -567,6 +585,40 @@ public class StorageEngine implements IService {
     return virtualStorageGroupManager.getProcessor(devicePath, 
storageGroupMNode);
   }
 
+  /**
+   * Get a storage group processor by virtualStorageGroupId.
+   *
+   * <p>Looks up the {@link VirtualStorageGroupManager} registered under the storage group's
+   * partial path. When none is registered yet and recovery has finished, a new manager is created
+   * and registered while holding the lock on this engine; when recovery has not finished the
+   * request is refused.
+   *
+   * @param storageGroupMNode mnode of the storage group; its partial path is the key into the
+   *     processor map
+   * @param virtualStorageGroupId virtual storage group partition id
+   * @return found or new storage group processor
+   * @throws StorageGroupNotReadyException if no manager exists yet and recovery is still running
+   */
+  @SuppressWarnings("java:S2445")
+  // NOTE(review): the lock below is taken on this engine, not on storageGroupMNode, so the
+  // suppression above looks unnecessary here - confirm before removing it
+  private StorageGroupProcessor getStorageGroupProcessorByPath(
+      StorageGroupMNode storageGroupMNode, int virtualStorageGroupId)
+      throws StorageGroupProcessorException, StorageEngineException {
+    PartialPath storageGroupPath = storageGroupMNode.getPartialPath();
+    VirtualStorageGroupManager manager = processorMap.get(storageGroupPath);
+    if (manager != null) {
+      return manager.getProcessor(virtualStorageGroupId, storageGroupMNode);
+    }
+
+    if (!isAllSgReady.get()) {
+      // not finished recover, refuse the request
+      throw new StorageGroupNotReadyException(
+          storageGroupMNode.getFullPath(), TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+    }
+
+    synchronized (this) {
+      // re-check under the lock: another thread may have registered the manager meanwhile
+      manager = processorMap.get(storageGroupPath);
+      if (manager == null) {
+        manager = new VirtualStorageGroupManager();
+        processorMap.put(storageGroupPath, manager);
+      }
+    }
+    return manager.getProcessor(virtualStorageGroupId, storageGroupMNode);
+  }
+
   /**
    * build a new storage group processor
    *
@@ -881,12 +933,14 @@ public class StorageEngine implements IService {
   }
 
   /**
    * Delete all data of storage groups' timeseries.
    *
    * <p>Calls {@code syncCloseAllProcessor()}, then invokes {@code
    * deleteAllDataFilesInOneStorageGroup} for every storage group path reported by the meta
    * manager, and finally clears {@code processorMap}.
    *
    * @return always {@code true}
    */
+  @TestOnly
   public synchronized boolean deleteAll() {
     logger.info("Start deleting all storage groups' timeseries");
     syncCloseAllProcessor();
     for (PartialPath storageGroup : 
IoTDB.metaManager.getAllStorageGroupPaths()) {
       this.deleteAllDataFilesInOneStorageGroup(storageGroup);
     }
+    processorMap.clear();
     return true;
   }
 
@@ -938,13 +992,17 @@ public class StorageEngine implements IService {
 
   public boolean deleteTsfile(File deletedTsfile)
       throws StorageEngineException, IllegalPathException {
-    return getProcessorDirectly(new 
PartialPath(getSgByEngineFile(deletedTsfile)))
+    return getProcessorDirectly(
+            new PartialPath(getSgByEngineFile(deletedTsfile)),
+            getVirtualSgIdByEngineFile(deletedTsfile))
         .deleteTsfile(deletedTsfile);
   }
 
   public boolean moveTsfile(File tsfileToBeMoved, File targetDir)
       throws StorageEngineException, IllegalPathException {
-    return getProcessorDirectly(new 
PartialPath(getSgByEngineFile(tsfileToBeMoved)))
+    return getProcessorDirectly(
+            new PartialPath(getSgByEngineFile(tsfileToBeMoved)),
+            getVirtualSgIdByEngineFile(tsfileToBeMoved))
         .moveTsfile(tsfileToBeMoved, targetDir);
   }
 
@@ -959,6 +1017,17 @@ public class StorageEngine implements IService {
     return file.getParentFile().getParentFile().getParentFile().getName();
   }
 
+  /**
+   * The internal file means that the file is in the engine, which is 
different from those external
+   * files which are not loaded.
+   *
+   * @param file internal file
+   * @return virtual storage group partition id
+   */
+  public int getVirtualSgIdByEngineFile(File file) {
+    return Integer.parseInt(file.getParentFile().getParentFile().getName());
+  }
+
   /** @return TsFiles (seq or unseq) grouped by their storage group and 
partition number. */
   public Map<PartialPath, Map<Long, List<TsFileResource>>> 
getAllClosedStorageGroupTsFile() {
     Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 3af47c5731..650c297223 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -152,6 +152,44 @@ public class VirtualStorageGroupManager {
     return processor;
   }
 
+  /**
+   * Get the processor stored at the given virtual storage group id, building and caching it on
+   * first use.
+   *
+   * @param virtualStorageGroupId virtual storage group id, used as index into the processor array
+   * @param storageGroupMNode mnode of the storage group; also the monitor held while a missing
+   *     processor is built
+   * @return virtual storage group processor
+   * @throws StorageGroupNotReadyException if the processor is missing and the storage engine
+   *     reports that not all storage groups are ready yet
+   */
+  @SuppressWarnings("java:S2445")
+  // actually storageGroupMNode is a unique object on the mtree, synchronizing on it is reasonable
+  public StorageGroupProcessor getProcessor(
+      int virtualStorageGroupId, StorageGroupMNode storageGroupMNode)
+      throws StorageGroupProcessorException, StorageEngineException {
+
+    StorageGroupProcessor existing = virtualStorageGroupProcessor[virtualStorageGroupId];
+    if (existing != null) {
+      return existing;
+    }
+
+    if (!StorageEngine.getInstance().isAllSgReady()) {
+      // not finished recover, refuse the request
+      throw new StorageGroupNotReadyException(
+          storageGroupMNode.getFullPath(), TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
+    }
+
+    synchronized (storageGroupMNode) {
+      // re-read under the lock so that only one caller builds the processor
+      StorageGroupProcessor processor = virtualStorageGroupProcessor[virtualStorageGroupId];
+      if (processor == null) {
+        processor =
+            StorageEngine.getInstance()
+                .buildNewStorageGroupProcessor(
+                    storageGroupMNode.getPartialPath(),
+                    storageGroupMNode,
+                    String.valueOf(virtualStorageGroupId));
+        virtualStorageGroupProcessor[virtualStorageGroupId] = processor;
+      }
+      return processor;
+    }
+  }
+
   /**
    * recover
    *
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 1479a09bf9..ed256aa1de 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -61,7 +61,7 @@ public class IoTDBLoadExternalTsfileIT {
 
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private static String[] insertSequenceSqls =
+  protected static String[] insertSequenceSqls =
       new String[] {
         "SET STORAGE GROUP TO root.vehicle",
         "SET STORAGE GROUP TO root.test",
@@ -127,7 +127,7 @@ public class IoTDBLoadExternalTsfileIT {
   private static final String TEST_D0_S0_STR = "root.test.d0.s0";
   private static final String TEST_D0_S1_STR = "root.test.d0.s1";
   private static final String TEST_D1_STR = "root.test.d1.g0.s0";
-  private static int virtualPartitionNum = 0;
+  protected static int virtualPartitionNum = 0;
 
   private static String[] deleteSqls =
       new String[] {"DELETE STORAGE GROUP root.vehicle", "DELETE STORAGE GROUP 
root.test"};
@@ -137,11 +137,11 @@ public class IoTDBLoadExternalTsfileIT {
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+    virtualPartitionNum = 
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
     IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(1);
     HashVirtualPartitioner.getInstance().setStorageGroupNum(1);
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
-    virtualPartitionNum = 
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
     Class.forName(Config.JDBC_DRIVER_NAME);
     prepareData(insertSequenceSqls);
   }
@@ -812,7 +812,7 @@ public class IoTDBLoadExternalTsfileIT {
     }
   }
 
-  private void prepareData(String[] sqls) {
+  protected void prepareData(String[] sqls) {
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
new file mode 100644
index 0000000000..91647df890
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileWithVirtualSGIT.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import 
org.apache.iotdb.db.engine.storagegroup.virtualSg.HashVirtualPartitioner;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class IoTDBLoadExternalTsfileWithVirtualSGIT extends 
IoTDBLoadExternalTsfileIT {
+  @Before
+  public void setUp() throws Exception {
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+    virtualPartitionNum = 
IoTDBDescriptor.getInstance().getConfig().getVirtualStorageGroupNum();
+    IoTDBDescriptor.getInstance().getConfig().setVirtualStorageGroupNum(2);
+    HashVirtualPartitioner.getInstance().setStorageGroupNum(2);
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    StorageEngine.getInstance().reset();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData(insertSequenceSqls);
+  }
+
+  @Test
+  public void moveTsfileWithVSGTest() throws SQLException {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+
+      // move root.vehicle
+      File vehicleDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator + 
"root.vehicle");
+      List<File> vehicleFiles = getTsFilePaths(vehicleDir);
+      File tmpDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              "tmp" + File.separator + new PartialPath("root.vehicle"));
+      if (!tmpDir.exists()) {
+        tmpDir.mkdirs();
+      }
+      for (File tsFile : vehicleFiles) {
+        statement.execute(String.format("move \"%s\" \"%s\"", 
tsFile.getAbsolutePath(), tmpDir));
+      }
+      assertEquals(0, getTsFilePaths(vehicleDir).size());
+      assertNotNull(tmpDir.listFiles());
+      assertEquals(2, tmpDir.listFiles().length >> 1);
+      //
+      //            // move root.test
+      File testDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator + 
"root.test");
+      List<File> testFiles = getTsFilePaths(testDir);
+      tmpDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              "tmp" + File.separator + new PartialPath("root.test"));
+      if (!tmpDir.exists()) {
+        tmpDir.mkdirs();
+      }
+      for (File tsFile : testFiles) {
+        statement.execute(String.format("move \"%s\" \"%s\"", 
tsFile.getAbsolutePath(), tmpDir));
+      }
+      assertEquals(0, getTsFilePaths(testDir).size());
+      assertNotNull(tmpDir.listFiles());
+      assertEquals(2, tmpDir.listFiles().length >> 1);
+    } catch (IllegalPathException e) {
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void removeTsfileWithVSGTest() throws SQLException {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+
+      // remove root.vehicle
+      File vehicleDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator + 
"root.vehicle");
+      List<File> vehicleFiles = getTsFilePaths(vehicleDir);
+      for (File tsFile : vehicleFiles) {
+        statement.execute(String.format("remove \"%s\"", 
tsFile.getAbsolutePath()));
+      }
+      assertEquals(0, getTsFilePaths(vehicleDir).size());
+      // remove root.test
+      File testDir =
+          new File(
+              IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0],
+              IoTDBConstant.SEQUENCE_FLODER_NAME + File.separator + 
"root.test");
+      List<File> testFiles = getTsFilePaths(testDir);
+      for (File tsFile : testFiles) {
+        statement.execute(String.format("remove \"%s\"", 
tsFile.getAbsolutePath()));
+      }
+      assertEquals(0, getTsFilePaths(testDir).size());
+    }
+  }
+
+  /**
+   * scan parentDir and return all TsFile sorted by load sequence
+   *
+   * @param parentDir folder to scan
+   */
+  public static List<File> getTsFilePaths(File parentDir) {
+    List<File> res = new ArrayList<>();
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return res;
+    }
+    scanDir(res, parentDir);
+    res.sort(
+        (f1, f2) -> {
+          int diffSg =
+              f1.getParentFile()
+                  .getParentFile()
+                  .getParentFile()
+                  .getName()
+                  
.compareTo(f2.getParentFile().getParentFile().getParentFile().getName());
+          if (diffSg != 0) {
+            return diffSg;
+          } else {
+            return (int)
+                (FilePathUtils.splitAndGetTsFileVersion(f1.getName())
+                    - FilePathUtils.splitAndGetTsFileVersion(f2.getName()));
+          }
+        });
+    return res;
+  }
+
+  private static void scanDir(List<File> tsFiles, File parentDir) {
+    if (!parentDir.exists()) {
+      Assert.fail();
+      return;
+    }
+    File fa[] = parentDir.listFiles();
+    for (int i = 0; i < fa.length; i++) {
+      File fs = fa[i];
+      if (fs.isDirectory()) {
+        scanDir(tsFiles, fs);
+      } else if (fs.getName().endsWith(".resource")) {
+        // only add tsfile that has been flushed
+        tsFiles.add(new File(fs.getAbsolutePath().substring(0, 
fs.getAbsolutePath().length() - 9)));
+      }
+    }
+  }
+}

Reply via email to