This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1193fca [IOTDB-692] Fix merge update bug (#1262)
1193fca is described below
commit 1193fca6a42d937916f5b9a4d070bfa56f67547d
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Thu May 28 23:10:35 2020 +0800
[IOTDB-692] Fix merge update bug (#1262)
* fix merge bug and add IT test
---
.../db/engine/merge/task/MergeMultiChunkTask.java | 87 ++++++++++++----------
.../iotdb/db/integration/IoTDBSensorUpdateIT.java | 87 ++++++++++++++++++++++
2 files changed, 134 insertions(+), 40 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index f63a5cc..5d483c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -223,19 +223,21 @@ class MergeMultiChunkTask {
     int[] ptWrittens = new int[seqChunkMeta.length];
     int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
         .getMergeChunkSubThreadNum();
-    PriorityQueue<MetaListEntry>[] chunkMetaHeaps = new PriorityQueue[mergeChunkSubTaskNum];
+    MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()];
+    PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum];
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      chunkMetaHeaps[i] = new PriorityQueue<>();
+      chunkIdxHeaps[i] = new PriorityQueue<>();
     }
     int idx = 0;
     for (int i = 0; i < currMergingPaths.size(); i++) {
+      chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
       if (seqChunkMeta[i].isEmpty()) {
         continue;
       }
+
       MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]);
       entry.next();
-
-      chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry);
+      metaListEntries[i] = entry;
       idx++;
       ptWrittens[i] = 0;
     }
@@ -247,7 +249,9 @@ class MergeMultiChunkTask {
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       int finalI = i;
       futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkMetaHeaps[finalI], ptWrittens, reader, mergeFileWriter, unseqReaders,
+        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens,
+            reader,
+            mergeFileWriter, unseqReaders,
             currFile,
             isLastFile);
         return null;
@@ -270,49 +274,52 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<MetaListEntry> chunkMetaHeap, int[] ptWrittens,
+  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, MetaListEntry[] metaListEntries,
+      int[] ptWrittens,
       TsFileSequenceReader reader,
       RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
       TsFileResource currFile,
       boolean isLastFile) throws IOException {
-    while (!chunkMetaHeap.isEmpty()) {
-      MetaListEntry metaListEntry = chunkMetaHeap.poll();
-      ChunkMetadata currMeta = metaListEntry.current();
-      int pathIdx = metaListEntry.getPathId();
-      boolean isLastChunk = !metaListEntry.hasNext();
+    while (!chunkIdxHeap.isEmpty()) {
+      int pathIdx = chunkIdxHeap.poll();
       Path path = currMergingPaths.get(pathIdx);
       MeasurementSchema measurementSchema = resource.getSchema(path);
       IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
-
-      boolean chunkOverflowed = MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-      boolean chunkTooSmall = MergeUtils
-          .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
-
-      Chunk chunk;
-      synchronized (reader) {
-        chunk = reader.readMemChunk(currMeta);
-      }
-      ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
-          ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
-          currFile);
-
-      if (!isLastChunk) {
-        metaListEntry.next();
-        chunkMetaHeap.add(metaListEntry);
-      } else {
-        // this only happens when the seqFiles do not contain this series, otherwise the remaining
-        // data will be merged with the last chunk in the seqFiles
-        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
-              Long.MAX_VALUE,
-              pathIdx);
-          mergedChunkNum.incrementAndGet();
+      if (metaListEntries[pathIdx] != null) {
+        MetaListEntry metaListEntry = metaListEntries[pathIdx];
+        ChunkMetadata currMeta = metaListEntry.current();
+        boolean isLastChunk = !metaListEntry.hasNext();
+        boolean chunkOverflowed = MergeUtils
+            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+        boolean chunkTooSmall = MergeUtils
+            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+
+        Chunk chunk;
+        synchronized (reader) {
+          chunk = reader.readMemChunk(currMeta);
+        }
+        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
+            ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
+            currFile);
+
+        if (!isLastChunk) {
+          metaListEntry.next();
+          chunkIdxHeap.add(pathIdx);
+          continue;
         }
-        // the last merged chunk may still be smaller than the threshold, flush it anyway
-        if (ptWrittens[pathIdx] > 0) {
-          synchronized (mergeFileWriter) {
-            chunkWriter.writeToFileWriter(mergeFileWriter);
-          }
+      }
+      // this only happens when the seqFiles do not contain this series, otherwise the remaining
+      // data will be merged with the last chunk in the seqFiles
+      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
+            Long.MAX_VALUE,
+            pathIdx);
+        mergedChunkNum.incrementAndGet();
+      }
+      // the last merged chunk may still be smaller than the threshold, flush it anyway
+      if (ptWrittens[pathIdx] > 0) {
+        synchronized (mergeFileWriter) {
+          chunkWriter.writeToFileWriter(mergeFileWriter);
         }
       }
     }
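
Restated, the change above is a scheduling fix: before, a series entered a sub-task's heap only if it had chunks in the sequence file, so a series whose points existed solely in unsequence files was never visited and its data vanished from the merge result. Now every path index is enqueued round-robin into chunkIdxHeaps, metaListEntries[pathIdx] stays null for series absent from the seq file, and such series fall straight through to the writeRemainingUnseq step. Below is a minimal, self-contained sketch of that control flow in plain Java; all names (MergeSchedulingSketch, seqChunkCounts, hasUnseqData, remainingChunks) are hypothetical stand-ins, not IoTDB code:

import java.util.PriorityQueue;

// Sketch only: chunk merging is modeled as printouts.
public class MergeSchedulingSketch {

  public static void main(String[] args) {
    // seqChunkCounts[i]: chunks series i has in the seq file;
    // hasUnseqData[i]: whether series i has pending unsequence updates.
    int[] seqChunkCounts = {3, 0, 1};          // series 1 is absent from the seq file
    boolean[] hasUnseqData = {false, true, true};
    int subTaskNum = 2;

    @SuppressWarnings("unchecked")
    PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[subTaskNum];
    for (int i = 0; i < subTaskNum; i++) {
      chunkIdxHeaps[i] = new PriorityQueue<>();
    }
    // Like metaListEntries: null marks a series with no seq chunks.
    Integer[] remainingChunks = new Integer[seqChunkCounts.length];
    for (int i = 0; i < seqChunkCounts.length; i++) {
      chunkIdxHeaps[i % subTaskNum].add(i);    // enqueue every series unconditionally
      if (seqChunkCounts[i] > 0) {
        remainingChunks[i] = seqChunkCounts[i];
      }
    }

    for (int task = 0; task < subTaskNum; task++) {
      mergeChunkHeap(chunkIdxHeaps[task], remainingChunks, hasUnseqData);
    }
  }

  private static void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap,
      Integer[] remainingChunks, boolean[] hasUnseqData) {
    while (!chunkIdxHeap.isEmpty()) {
      int pathIdx = chunkIdxHeap.poll();
      if (remainingChunks[pathIdx] != null) {
        System.out.println("series " + pathIdx + ": merge one seq chunk");
        if (--remainingChunks[pathIdx] > 0) {
          chunkIdxHeap.add(pathIdx);           // more seq chunks: requeue, like the continue above
          continue;
        }
      }
      // Reached for the last seq chunk or, crucially, for a series with no seq
      // chunks at all; the old heap construction never reached this branch.
      if (hasUnseqData[pathIdx]) {
        System.out.println("series " + pathIdx + ": write remaining unseq data");
      }
    }
  }
}

Running the sketch shows series 1, which has no seq chunks, still reaching the unseq step, which is exactly the branch the old heap construction skipped.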
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
new file mode 100644
index 0000000..c404297
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBSensorUpdateIT {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testMerge()
+      throws SQLException, InterruptedException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("set storage group to root.demo");
+      statement.execute("create timeseries root.demo.d1.s1 with datatype=INT64,encoding=RLE");
+      statement.execute("create timeseries root.demo.d1.s2 with datatype=INT64,encoding=RLE");
+      statement.execute("create timeseries root.demo.d1.s3 with datatype=INT64,encoding=RLE");
+      statement.execute("insert into root.demo.d1(time,s1,s2) values(1,1,2)");
+      statement.execute("flush");
+      statement.execute("insert into root.demo.d1(time,s3) values(1,1)");
+      statement.execute("flush");
+      try (ResultSet set = statement.executeQuery("SELECT * FROM root")) {
+        int cnt = 0;
+        while (set.next()) {
+          cnt++;
+          assertEquals(1, set.getLong("root.demo.d1.s3"));
+        }
+        assertEquals(1, cnt);
+      }
+      statement.execute("merge");
+      Thread.sleep(1000);
+      // before merge completes
+      try (ResultSet set = statement.executeQuery("SELECT * FROM root")) {
+        int cnt = 0;
+        while (set.next()) {
+          cnt++;
+          assertEquals(1, set.getLong("root.demo.d1.s3"));
+        }
+        assertEquals(1, cnt);
+      }
+
+      // after merge completes
+      statement.execute("DELETE FROM root.demo.d1");
+    }
+  }
+}
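
The test encodes the failure scenario directly: both inserts target time 1, and because time 1 is already covered by d1's flushed sequence file, the insert of s3 lands in an unsequence file, producing a series with unseq data but no chunk in the seq file. Before the fix, the post-merge query lost s3. For emphasis, a sketch of a fuller per-row assertion one might add around either query; the helper is hypothetical and not part of the commit, and the expected values come from the two inserts above:

import static org.junit.Assert.assertEquals;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch only: pins down every sensor of the single expected row.
final class RowAssertions {

  static void assertSingleRowKeepsAllSensors(Statement statement) throws SQLException {
    int cnt = 0;
    try (ResultSet set = statement.executeQuery("SELECT * FROM root")) {
      while (set.next()) {
        cnt++;
        // the row at time 1 must keep all three sensors, s3 included
        assertEquals(1, set.getLong("root.demo.d1.s1"));
        assertEquals(2, set.getLong("root.demo.d1.s2"));
        assertEquals(1, set.getLong("root.demo.d1.s3"));
      }
    }
    assertEquals(1, cnt);
  }
}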