This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1193fca  [IOTDB-692] Fix merge update bug (#1262)
1193fca is described below

commit 1193fca6a42d937916f5b9a4d070bfa56f67547d
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Thu May 28 23:10:35 2020 +0800

    [IOTDB-692] Fix merge update bug (#1262)
    
    * fix merge bug and add IT test
---
 .../db/engine/merge/task/MergeMultiChunkTask.java  | 87 ++++++++++++----------
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  | 87 ++++++++++++++++++++++
 2 files changed, 134 insertions(+), 40 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index f63a5cc..5d483c7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -223,19 +223,21 @@ class MergeMultiChunkTask {
     int[] ptWrittens = new int[seqChunkMeta.length];
     int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
         .getMergeChunkSubThreadNum();
-    PriorityQueue<MetaListEntry>[] chunkMetaHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
+    MetaListEntry[] metaListEntries = new 
MetaListEntry[currMergingPaths.size()];
+    PriorityQueue<Integer>[] chunkIdxHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      chunkMetaHeaps[i] = new PriorityQueue<>();
+      chunkIdxHeaps[i] = new PriorityQueue<>();
     }
     int idx = 0;
     for (int i = 0; i < currMergingPaths.size(); i++) {
+      chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
       if (seqChunkMeta[i].isEmpty()) {
         continue;
       }
+
       MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]);
       entry.next();
-
-      chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry);
+      metaListEntries[i] = entry;
       idx++;
       ptWrittens[i] = 0;
     }
@@ -247,7 +249,9 @@ class MergeMultiChunkTask {
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       int finalI = i;
       futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkMetaHeaps[finalI], ptWrittens, reader, 
mergeFileWriter, unseqReaders,
+        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens,
+            reader,
+            mergeFileWriter, unseqReaders,
             currFile,
             isLastFile);
         return null;
@@ -270,49 +274,52 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<MetaListEntry> chunkMetaHeap, 
int[] ptWrittens,
+  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, 
MetaListEntry[] metaListEntries,
+      int[] ptWrittens,
       TsFileSequenceReader reader,
       RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
       TsFileResource currFile,
       boolean isLastFile) throws IOException {
-    while (!chunkMetaHeap.isEmpty()) {
-      MetaListEntry metaListEntry = chunkMetaHeap.poll();
-      ChunkMetadata currMeta = metaListEntry.current();
-      int pathIdx = metaListEntry.getPathId();
-      boolean isLastChunk = !metaListEntry.hasNext();
+    while (!chunkIdxHeap.isEmpty()) {
+      int pathIdx = chunkIdxHeap.poll();
       Path path = currMergingPaths.get(pathIdx);
       MeasurementSchema measurementSchema = resource.getSchema(path);
       IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
-
-      boolean chunkOverflowed = 
MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-      boolean chunkTooSmall = MergeUtils
-          .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
-
-      Chunk chunk;
-      synchronized (reader) {
-        chunk = reader.readMemChunk(currMeta);
-      }
-      ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, 
chunkTooSmall, chunk,
-          ptWrittens[pathIdx], pathIdx, mergeFileWriter, 
unseqReaders[pathIdx], chunkWriter,
-          currFile);
-
-      if (!isLastChunk) {
-        metaListEntry.next();
-        chunkMetaHeap.add(metaListEntry);
-      } else {
-        // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
-        // data will be merged with the last chunk in the seqFiles
-        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, 
unseqReaders[pathIdx],
-              Long.MAX_VALUE,
-              pathIdx);
-          mergedChunkNum.incrementAndGet();
+      if (metaListEntries[pathIdx] != null) {
+        MetaListEntry metaListEntry = metaListEntries[pathIdx];
+        ChunkMetadata currMeta = metaListEntry.current();
+        boolean isLastChunk = !metaListEntry.hasNext();
+        boolean chunkOverflowed = MergeUtils
+            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+        boolean chunkTooSmall = MergeUtils
+            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
+
+        Chunk chunk;
+        synchronized (reader) {
+          chunk = reader.readMemChunk(currMeta);
+        }
+        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, 
chunkTooSmall, chunk,
+            ptWrittens[pathIdx], pathIdx, mergeFileWriter, 
unseqReaders[pathIdx], chunkWriter,
+            currFile);
+
+        if (!isLastChunk) {
+          metaListEntry.next();
+          chunkIdxHeap.add(pathIdx);
+          continue;
         }
-        // the last merged chunk may still be smaller than the threshold, 
flush it anyway
-        if (ptWrittens[pathIdx] > 0) {
-          synchronized (mergeFileWriter) {
-            chunkWriter.writeToFileWriter(mergeFileWriter);
-          }
+      }
+      // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
+      // data will be merged with the last chunk in the seqFiles
+      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, 
unseqReaders[pathIdx],
+            Long.MAX_VALUE,
+            pathIdx);
+        mergedChunkNum.incrementAndGet();
+      }
+      // the last merged chunk may still be smaller than the threshold, flush 
it anyway
+      if (ptWrittens[pathIdx] > 0) {
+        synchronized (mergeFileWriter) {
+          chunkWriter.writeToFileWriter(mergeFileWriter);
         }
       }
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
new file mode 100644
index 0000000..c404297
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBSensorUpdateIT {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testMerge()
+      throws SQLException, InterruptedException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.demo");
+      statement.execute("create timeseries root.demo.d1.s1 with 
datatype=INT64,encoding=RLE");
+      statement.execute("create timeseries root.demo.d1.s2 with 
datatype=INT64,encoding=RLE");
+      statement.execute("create timeseries root.demo.d1.s3 with 
datatype=INT64,encoding=RLE");
+      statement.execute("insert into root.demo.d1(time,s1,s2) values(1,1,2)");
+      statement.execute("flush");
+      statement.execute("insert into root.demo.d1(time,s3) values(1,1)");
+      statement.execute("flush");
+      try (ResultSet set = statement.executeQuery("SELECT * FROM root")) {
+        int cnt = 0;
+        while (set.next()) {
+          cnt++;
+          assertEquals(1, set.getLong("root.demo.d1.s3"));
+        }
+        assertEquals(1, cnt);
+      }
+      statement.execute("merge");
+      Thread.sleep(1000);
+      // before merge completes
+      try (ResultSet set = statement.executeQuery("SELECT * FROM root")) {
+        int cnt = 0;
+        while (set.next()) {
+          cnt++;
+          assertEquals(1, set.getLong("root.demo.d1.s3"));
+        }
+        assertEquals(1, cnt);
+      }
+
+      // after merge completes
+      statement.execute("DELETE FROM root.demo.d1");
+    }
+  }
+}

Reply via email to