This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d4492c  add vm level test and fix bug (#1522)
0d4492c is described below

commit 0d4492c066e7b806d9bd0ec59660f08f6687edce
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Mon Jul 20 14:24:42 2020 +0800

    add vm level test and fix bug (#1522)
    
    * fix vm recover bug
    Co-authored-by: 张凌哲 <[email protected]>
---
 .../engine/storagegroup/StorageGroupProcessor.java |  2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    | 26 ++++++++++++++++++----
 .../writelog/recover/TsFileRecoverPerformer.java   |  3 ++-
 .../storagegroup/TsFileProcessorEnableVmTest.java  |  7 ++++++
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6a30c5f..db890bc 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -585,7 +585,7 @@ public class StorageGroupProcessor {
   public static int getVmLevel(File file) {
     String vmLevelStr = file.getPath()
         
.substring(file.getPath().lastIndexOf(TSFILE_SUFFIX)).replaceAll(TSFILE_SUFFIX, 
"")
-        .split(IoTDBConstant.FILE_NAME_SEPARATOR)[0];
+        .split(IoTDBConstant.FILE_NAME_SEPARATOR)[1];
     return Integer.parseInt(vmLevelStr);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9031941..c5b687c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,7 +605,7 @@ public class TsFileProcessor {
     }
   }
 
-  public File createNewVMFile(TsFileResource tsFileResource, int level) {
+  public File createNewVMFileWithLock(TsFileResource tsFileResource, int 
level) {
     vmFileCreateLock.writeLock().lock();
     try {
       TimeUnit.MILLISECONDS.sleep(1);
@@ -624,6 +624,22 @@ public class TsFileProcessor {
     }
   }
 
+  public static File createNewVMFile(TsFileResource tsFileResource, int level) 
{
+    try {
+      TimeUnit.MILLISECONDS.sleep(1);
+      File parent = tsFileResource.getTsFile().getParentFile();
+      return FSFactoryProducer.getFSFactory().getFile(parent,
+          tsFileResource.getTsFile().getName() + 
IoTDBConstant.FILE_NAME_SEPARATOR + level
+              + IoTDBConstant.FILE_NAME_SEPARATOR + System
+              .currentTimeMillis() + VM_SUFFIX);
+    } catch (InterruptedException e) {
+      logger.error("{}: {}, closing task is interrupted.",
+          tsFileResource.getTsFile().getParent(), 
tsFileResource.getTsFile().getName(), e);
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
   private void deleteVmFiles(List<TsFileResource> vmMergeTsFiles,
       List<RestorableTsFileIOWriter> vmMergeWriters) throws IOException {
     logger.debug("{}: {} vm merge starts to delete file", storageGroupName,
@@ -693,7 +709,7 @@ public class TsFileProcessor {
           }
           int level = getVmLevel(sourceFileList.get(0));
           if (isMergeFinished) {
-            File newVmFile = createNewVMFile(tsFileResource, level + 1);
+            File newVmFile = createNewVMFileWithLock(tsFileResource, level + 
1);
             if (!targetFile.renameTo(newVmFile)) {
               logger.error("Failed to rename {} to {}", targetFile, newVmFile);
             } else {
@@ -760,7 +776,7 @@ public class TsFileProcessor {
         if (config.isEnableVm()) {
           logger.info("{}: {} [Flush] start to flush a memtable to a vm", 
storageGroupName,
               tsFileResource.getTsFile().getName());
-          File newVmFile = createNewVMFile(tsFileResource, 0);
+          File newVmFile = createNewVMFileWithLock(tsFileResource, 0);
           if (vmWriters.isEmpty()) {
             vmWriters.add(new ArrayList<>());
             vmTsFileResources.add(new ArrayList<>());
@@ -1111,9 +1127,11 @@ public class TsFileProcessor {
       List<List<TsFileResource>> currMergeVmFiles, VmLogger vmLogger) throws 
IOException {
     VmMergeUtils.merge(writer, packVmWritersToSequenceList(currMergeVmWriters),
         storageGroupName, vmLogger, new HashSet<>(), sequence);
+    vmMergeLock.writeLock().lock();
     for (int i = 0; i < currMergeVmFiles.size(); i++) {
       deleteVmFiles(currMergeVmFiles.get(i), currMergeVmWriters.get(i));
     }
+    vmMergeLock.writeLock().unlock();
   }
 
   private List<RestorableTsFileIOWriter> packVmWritersToSequenceList(
@@ -1187,7 +1205,7 @@ public class TsFileProcessor {
               for (RestorableTsFileIOWriter vmWriter : vmMergeWriters.get(i)) {
                 vmLogger.logFile(SOURCE_NAME, vmWriter.getFile());
               }
-              File newVmFile = createNewVMFile(tsFileResource, i + 1);
+              File newVmFile = createNewVMFileWithLock(tsFileResource, i + 1);
               vmLogger.logFile(TARGET_NAME, newVmFile);
               logger.info("{}: {} [Hot Compaction] merge level-{}'s {} vms to 
next level vm",
                   storageGroupName, tsFileResource.getTsFile().getName(), i,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 1a146d6..d75de25 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.writelog.recover;
 
 import static 
org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
 import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
+import static 
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
 import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
 
 import java.io.File;
@@ -176,7 +177,7 @@ public class TsFileRecoverPerformer {
             if (tsFileNotCrashed) {
 
               // if wal exists, we should open a new vmfile to replay it
-              File newVmFile = 
resource.getProcessor().createNewVMFile(resource, 0);
+              File newVmFile = createNewVMFile(resource, 0);
               TsFileResource newVmTsFileResource = new 
TsFileResource(newVmFile);
               RestorableTsFileIOWriter newVMWriter = new 
RestorableTsFileIOWriter(newVmFile);
               if (redoLogs(newVMWriter, newVmTsFileResource)) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
index 1af2794..7b8044d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import static junit.framework.TestCase.assertTrue;
+import static 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.getVmLevel;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -127,6 +128,12 @@ public class TsFileProcessorEnableVmTest {
     tsfileResourcesForQuery.clear();
     processor.query(deviceId, measurementId, dataType, encoding, props, 
context,
         tsfileResourcesForQuery);
+    List<List<TsFileResource>> tsfileResources = 
processor.getVmTsFileResources();
+    for (List<TsFileResource> levelResources : tsfileResources) {
+      for (TsFileResource resource : levelResources) {
+        assertEquals(0, getVmLevel(resource.getTsFile()));
+      }
+    }
 
     assertEquals(1, tsfileResourcesForQuery.size());
     assertEquals(1, 
tsfileResourcesForQuery.get(0).getChunkMetadataList().size());

Reply via email to