This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dev_new_merge_rebase
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 64063bde262b36a1f145548a0a088052ea1bb1ad
Author: EJTTianyu <[email protected]>
AuthorDate: Wed Jun 10 16:38:19 2020 +0800

    fix merge bug
---
 .../seqMerge/inplace/task/MergeMultiChunkTask.java | 90 +++++++++++-----------
 .../seqMerge/squeeze/task/SqueezeMergeTask.java    |  4 +-
 .../task/RegularizationMergeSeriesTask.java        |  8 ++
 .../task/RegularizationMergeTask.java              |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 53 +++++--------
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  4 +-
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  |  3 +
 7 files changed, 84 insertions(+), 82 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
index c2bc33f..67381f4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java
@@ -213,19 +213,21 @@ class MergeMultiChunkTask {
     int[] ptWrittens = new int[seqChunkMeta.length];
     int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
         .getMergeChunkSubThreadNum();
-    PriorityQueue<MetaListEntry>[] chunkMetaHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
+    MetaListEntry[] metaListEntries = new 
MetaListEntry[currMergingPaths.size()];
+    PriorityQueue<Integer>[] chunkIdxHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      chunkMetaHeaps[i] = new PriorityQueue<>();
+      chunkIdxHeaps[i] = new PriorityQueue<>();
     }
     int idx = 0;
     for (int i = 0; i < currMergingPaths.size(); i++) {
+      chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
       if (seqChunkMeta[i].isEmpty()) {
         continue;
       }
       MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]);
       entry.next();
 
-      chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry);
+      metaListEntries[i] = entry;
       idx++;
       ptWrittens[i] = 0;
     }
@@ -237,9 +239,8 @@ class MergeMultiChunkTask {
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       int finalI = i;
       futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkMetaHeaps[finalI], ptWrittens, reader, 
mergeFileWriter, unseqReaders,
-            currFile,
-            isLastFile);
+        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens, 
reader, mergeFileWriter,
+            unseqReaders, currFile, isLastFile);
         return null;
       }));
     }
@@ -260,48 +261,49 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<MetaListEntry> chunkMetaHeap, 
int[] ptWrittens,
-      TsFileSequenceReader reader,
-      RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
-      TsFileResource currFile,
-      boolean isLastFile) throws IOException {
-    while (!chunkMetaHeap.isEmpty()) {
-      MetaListEntry metaListEntry = chunkMetaHeap.poll();
-      ChunkMetadata currMeta = metaListEntry.current();
-      int pathIdx = metaListEntry.getPathId();
-      boolean isLastChunk = !metaListEntry.hasNext();
+  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, 
MetaListEntry[] metaListEntries,
+      int[] ptWrittens, TsFileSequenceReader reader, RestorableTsFileIOWriter 
mergeFileWriter,
+      IPointReader[] unseqReaders, TsFileResource currFile, boolean 
isLastFile) throws IOException {
+    while (!chunkIdxHeap.isEmpty()) {
+      int pathIdx = chunkIdxHeap.poll();
       Path path = currMergingPaths.get(pathIdx);
       IChunkWriter chunkWriter = resource.getChunkWriter(path);
 
-      boolean chunkOverflowed = 
MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-      boolean chunkTooSmall = MergeUtils
-          .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
-
-      Chunk chunk;
-      synchronized (reader) {
-        chunk = reader.readMemChunk(currMeta);
-      }
-      ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, 
chunkTooSmall, chunk,
-          ptWrittens[pathIdx], pathIdx, mergeFileWriter, 
unseqReaders[pathIdx], chunkWriter,
-          currFile);
-
-      if (!isLastChunk) {
-        metaListEntry.next();
-        chunkMetaHeap.add(metaListEntry);
-      } else {
-        // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
-        // data will be merged with the last chunk in the seqFiles
-        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, 
unseqReaders[pathIdx],
-              Long.MAX_VALUE,
-              pathIdx);
-          mergedChunkNum.incrementAndGet();
+      if (metaListEntries[pathIdx] != null) {
+        MetaListEntry metaListEntry = metaListEntries[pathIdx];
+        ChunkMetadata currMeta = metaListEntry.current();
+        boolean isLastChunk = !metaListEntry.hasNext();
+        boolean chunkOverflowed = MergeUtils
+            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+        boolean chunkTooSmall = MergeUtils
+            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
+
+        Chunk chunk;
+        synchronized (reader) {
+          chunk = reader.readMemChunk(currMeta);
+        }
+        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, 
chunkTooSmall, chunk,
+            ptWrittens[pathIdx], pathIdx, mergeFileWriter, 
unseqReaders[pathIdx], chunkWriter,
+            currFile);
+
+        if (!isLastChunk) {
+          metaListEntry.next();
+          chunkIdxHeap.add(pathIdx);
+          continue;
         }
-        // the last merged chunk may still be smaller than the threshold, 
flush it anyway
-        if (ptWrittens[pathIdx] > 0) {
-          synchronized (mergeFileWriter) {
-            chunkWriter.writeToFileWriter(mergeFileWriter);
-          }
+      }
+      // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
+      // data will be merged with the last chunk in the seqFiles
+      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, 
unseqReaders[pathIdx],
+            Long.MAX_VALUE,
+            pathIdx);
+        mergedChunkNum.incrementAndGet();
+      }
+      // the last merged chunk may still be smaller than the threshold, flush 
it anyway
+      if (ptWrittens[pathIdx] > 0) {
+        synchronized (mergeFileWriter) {
+          chunkWriter.writeToFileWriter(mergeFileWriter);
         }
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
index 9303d12..13c1b11 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java
@@ -152,7 +152,7 @@ public class SqueezeMergeTask implements Callable<Void> {
   }
 
   private void deleteFile(TsFileResource seqFile) {
-    seqFile.getWriteQueryLock().writeLock().lock();
+    seqFile.writeLock();
     try {
       resource.removeFileReader(seqFile);
       ChunkMetadataCache.getInstance().remove(seqFile);
@@ -163,7 +163,7 @@ public class SqueezeMergeTask implements Callable<Void> {
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
     } finally {
-      seqFile.getWriteQueryLock().writeLock().unlock();
+      seqFile.writeUnlock();
     }
   }
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
index 0ee3d00..51783bc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java
@@ -53,6 +53,14 @@ public class RegularizationMergeSeriesTask extends 
BaseMergeSeriesTask {
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
     String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
         .split(IoTDBConstant.TSFILE_NAME_SEPARATOR);
+
+    //TODO: for test only
+    try {
+      Long.parseLong(items1[0]);
+    } catch (NumberFormatException e) {
+      return items1[0].compareTo(items2[0]);
+    }
+
     return Long.compare(Long.parseLong(items1[0]), Long.parseLong(items2[0]));
   }));
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
index 5ff1a38..62f3ca9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java
@@ -161,7 +161,7 @@ public class RegularizationMergeTask implements 
Callable<Void> {
   }
 
   private void deleteFile(TsFileResource seqFile) {
-    seqFile.getWriteQueryLock().writeLock().lock();
+    seqFile.writeLock();
     try {
       resource.removeFileReader(seqFile);
       ChunkMetadataCache.getInstance().remove(seqFile);
@@ -170,7 +170,7 @@ public class RegularizationMergeTask implements 
Callable<Void> {
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
     } finally {
-      seqFile.getWriteQueryLock().writeLock().unlock();
+      seqFile.writeUnlock();
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 90351c7..b2b1e54 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1708,13 +1708,6 @@ public class StorageGroupProcessor {
   }
 
   private void removeSeqFiles(List<TsFileResource> seqFiles) {
-    mergeLock.writeLock().lock();
-    try {
-      sequenceFileTreeSet.removeAll(seqFiles);
-    } finally {
-      mergeLock.writeLock().unlock();
-    }
-
     for (TsFileResource seqFile : seqFiles) {
       seqFile.writeLock();
       try {
@@ -1726,13 +1719,6 @@ public class StorageGroupProcessor {
   }
 
   private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
-    mergeLock.writeLock().lock();
-    try {
-      unSequenceFileList.removeAll(unseqFiles);
-    } finally {
-      mergeLock.writeLock().unlock();
-    }
-
     for (TsFileResource unseqFile : unseqFiles) {
       unseqFile.writeLock();
       try {
@@ -1808,29 +1794,34 @@ public class StorageGroupProcessor {
 
   private void handleInplaceMerge(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, File mergeLog) {
-
+    mergeLock.writeLock().lock();
+    try {
+      unSequenceFileList.removeAll(unseqFiles);
+    } finally {
+      mergeLock.writeLock().unlock();
+    }
     removeUnseqFiles(unseqFiles);
     endMerge(mergeLog, seqFiles);
   }
 
   private void handleOtherMerge(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, File mergeLog, List<TsFileResource> 
newFile) {
-    // make sure no queries are holding the seqFiles
-    for (TsFileResource seqFile : seqFiles) {
-      seqFile.writeLock();
-    }
-    for (TsFileResource unseqFile : unseqFiles) {
-      unseqFile.writeLock();
-    }
     // block new queries and insertions to prevent the seqFiles from changing
+    writeLock();
     mergeLock.writeLock().lock();
     try {
-      removeUnseqFiles(unseqFiles);
-      removeSeqFiles(seqFiles);
+
+      unSequenceFileList.removeAll(unseqFiles);
+      sequenceFileTreeSet.removeAll(seqFiles);
+
       // move modifications generated during merge into the new file
       for (TsFileResource tsFileResource : newFile) {
-        tsFileResource
-            
.setProcessor(getOrCreateTsFileProcessor(tsFileResource.getTimePartition(), 
true));
+        TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName,
+            tsFileResource.getFile().getAbsoluteFile(),
+            
getVersionControllerByTimePartitionId(tsFileResource.getTimePartition()),
+            this::closeUnsealedTsFileProcessorCallBack,
+            this::updateLatestFlushTimeCallback, true);
+        tsFileResource.setProcessor(tsFileProcessor);
         if (mergingModification != null) {
           logger.info("{} is updating the merged file's modification file", 
storageGroupName);
           for (Modification modification : 
mergingModification.getModifications()) {
@@ -1847,15 +1838,13 @@ public class StorageGroupProcessor {
       logger.error("{} fails to do the after merge action,", storageGroupName, 
e);
     } finally {
       isMerging = false;
+      writeUnlock();
       mergeLock.writeLock().unlock();
       logger.info("{} a merge task ends", storageGroupName);
-      for (TsFileResource seqFile : seqFiles) {
-        seqFile.writeUnlock();
-      }
-      for (TsFileResource unseqFile : unseqFiles) {
-        unseqFile.writeUnlock();
-      }
     }
+
+    removeUnseqFiles(unseqFiles);
+    removeSeqFiles(seqFiles);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 1e5e1e4..3350e24 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.InternalMNode;
-import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -271,7 +271,7 @@ public class MergeUtils {
     for (String device : devices) {
       InternalMNode deviceNode = (InternalMNode) 
MManager.getInstance().getNodeByPath(device);
       for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
-        MeasurementSchema measurementSchema = ((LeafMNode) 
entry.getValue()).getSchema();
+        MeasurementSchema measurementSchema = ((MeasurementMNode) 
entry.getValue()).getSchema();
         chunkWriterCacheMap
             .put(new Path(device, entry.getKey()), new 
ChunkWriterImpl(measurementSchema));
       }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
index c404297..9999f9b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -25,6 +25,8 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.seqMerge.SeqMergeFileStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -37,6 +39,7 @@ public class IoTDBSensorUpdateIT {
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
+    
IoTDBDescriptor.getInstance().getConfig().setSeqMergeFileStrategy(SeqMergeFileStrategy.INPLACE);
     Class.forName(Config.JDBC_DRIVER_NAME);
   }
 

Reply via email to