This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 338c7f0 [To rel/0.11] cherry pick inplace merge and fix windows ci
bug (#2481)
338c7f0 is described below
commit 338c7f0563c509056be89a80460b132d6456d7bb
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Wed Jan 13 13:58:07 2021 +0800
[To rel/0.11] cherry pick inplace merge and fix windows ci bug (#2481)
* cherry pick inplace merge and fix windows ci bug
* update wait to sleep
Co-authored-by: zhanglingzhe <[email protected]>
---
.../compaction/CompactionMergeTaskPoolManager.java | 2 +-
.../db/engine/compaction/TsFileManagement.java | 11 ++
.../level/LevelCompactionTsFileManagement.java | 3 +
.../iotdb/db/engine/merge/task/MergeFileTask.java | 158 +++++++++++++++++----
.../apache/iotdb/db/integration/IoTDBFillIT.java | 6 +
.../db/integration/IoTDBLevelCompactionIT.java | 1 -
.../iotdb/db/integration/IoTDBMergeTest.java | 5 +
.../iotdb/db/integration/IoTDBSensorUpdateIT.java | 3 -
8 files changed, 154 insertions(+), 35 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 218be58..8abe359 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -87,7 +87,7 @@ public class CompactionMergeTaskPoolManager implements
IService {
FilePathUtils.regularizePath(IoTDBDescriptor.getInstance().getConfig().getSystemDir())
+ "storage_groups");
File[] subDirList = sgDir.listFiles();
- if(subDirList!=null) {
+ if (subDirList != null) {
for (File subDir : subDirList) {
while (FSFactoryProducer.getFSFactory().getFile(
subDir.getAbsoluteFile() + File.separator + subDir.getName() +
COMPACTION_LOG_NAME)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 0c728c7..0522066 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -61,6 +61,7 @@ public abstract class TsFileManagement {
private final ReadWriteLock compactionMergeLock = new
ReentrantReadWriteLock();
public volatile boolean isUnseqMerging = false;
+ public volatile boolean isSeqMerging = false;
/**
* This is the modification file of the result of the current merge. Because
the merged file may
* be invisible at this moment, without this, deletion/update during merge
could be lost.
@@ -202,6 +203,16 @@ public abstract class TsFileManagement {
}
return;
}
+ // wait until seq merge has finished
+ while (isSeqMerging) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("{} [Compaction] shutdown", storageGroupName, e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
isUnseqMerging = true;
if (seqMergeList.isEmpty()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 66d7d71..aaa6f8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -526,6 +526,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
return;
}
}
+ isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
try {
logger.info("{} start to filter compaction condition", storageGroupName);
@@ -534,6 +535,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
// level is numbered from 0
if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
// do not merge current unseq file level to upper level and just
merge all of them to seq file
+ isSeqMerging = false;
merge(isForceFullMerge, getTsFileList(true),
mergeResources.get(i), Long.MAX_VALUE);
} else {
CompactionLogger compactionLogger = new
CompactionLogger(storageGroupDir,
@@ -589,6 +591,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
} catch (Exception e) {
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
+ isSeqMerging = false;
// reset the merge working state to false
logger.info("{} [Compaction] merge end time isSeq = {}, consumption: {}
ms",
storageGroupName, sequence,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index a83c36c..673ac34 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -19,17 +19,12 @@
package org.apache.iotdb.db.engine.merge.task;
-import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
@@ -95,13 +90,23 @@ class MergeFileTask {
int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile,
0);
int unmergedChunkNum =
context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
-
- if (logger.isInfoEnabled()) {
- logger.info("{} moving unmerged data of {} to the merged file, {}
merged chunks, {} "
- + "unmerged chunks", taskName, seqFile.getTsFile().getName(),
mergedChunkNum,
- unmergedChunkNum);
+ if (mergedChunkNum >= unmergedChunkNum) {
+ // move the unmerged data to the new file
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving unmerged data of {} to the merged file, {}
merged chunks, {} "
+ + "unmerged chunks", taskName,
seqFile.getTsFile().getName(), mergedChunkNum,
+ unmergedChunkNum);
+ }
+ moveUnmergedToNew(seqFile);
+ } else {
+ // move the merged data to the old file
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving merged data of {} to the old file {} merged
chunks, {} "
+ + "unmerged chunks", taskName,
seqFile.getTsFile().getName(), mergedChunkNum,
+ unmergedChunkNum);
+ }
+ moveMergedToOld(seqFile);
}
- moveUnmergedToNew(seqFile);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -118,7 +123,7 @@ class MergeFileTask {
}
private void logProgress() {
- if (logger.isInfoEnabled()) {
+ if (logger.isDebugEnabled()) {
logger.debug("{} has merged {}, processed {}/{} files", taskName,
currMergeFile,
currentMergeIndex + 1, unmergedFiles.size());
}
@@ -129,18 +134,120 @@ class MergeFileTask {
currentMergeIndex + 1, unmergedFiles.size());
}
- private void updateHistoricalVersions(TsFileResource seqFile) {
- // as the new file contains data of other files, track their versions in
the new file
- // so that we will be able to compare data across different IoTDBs that
share the same file
+ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
+ int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+ if (mergedChunkNum == 0) {
+ resource.removeFileAndWriter(seqFile);
+ return;
+ }
+
+ seqFile.writeLock();
+ try {
+ ChunkMetadataCache.getInstance().remove(seqFile);
+
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+
+ resource.removeFileReader(seqFile);
+ TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
+
+ // filter the chunks that have been merged
+ oldFileWriter.filterChunks(new
HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile))
+ );
+
+ RestorableTsFileIOWriter newFileWriter =
resource.getMergeFileWriter(seqFile);
+ newFileWriter.close();
+ try (TsFileSequenceReader newFileReader =
+ new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
+ Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
+ newFileWriter.getDeviceChunkMetadataMap();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} find {} merged chunk groups", taskName,
+ chunkMetadataListInChunkGroups.size());
+ }
+ for (Map.Entry<String, List<ChunkMetadata>> entry :
chunkMetadataListInChunkGroups
+ .entrySet()) {
+ String deviceId = entry.getKey();
+ List<ChunkMetadata> chunkMetadataList = entry.getValue();
+ writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader,
oldFileWriter);
+
+ if (Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ oldFileWriter.close();
+ restoreOldFile(seqFile);
+ return;
+ }
+ }
+ }
+ oldFileWriter.endFile();
+
+ updatePlanIndexes(seqFile);
+ seqFile.serialize();
+ mergeLogger.logFileMergeEnd();
+ logger.debug("{} moved merged chunks of {} to the old file", taskName,
seqFile);
+ } catch (Exception e) {
+ restoreOldFile(seqFile);
+ throw e;
+ } finally {
+ seqFile.writeUnlock();
+ }
+ }
+
+ /**
+ * Restore an old seq file which is being written new chunks when exceptions
occur or the task is
+ * aborted.
+ */
+ private void restoreOldFile(TsFileResource seqFile) throws IOException {
+ RestorableTsFileIOWriter oldFileRecoverWriter = new
RestorableTsFileIOWriter(
+ seqFile.getTsFile());
+ if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
+ oldFileRecoverWriter.endFile();
+ } else {
+ oldFileRecoverWriter.close();
+ }
+ }
+
+ /**
+ * Open an appending writer for an old seq file so we can add new chunks to
it.
+ */
+ private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws
IOException {
+ TsFileIOWriter oldFileWriter;
+ try {
+ oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
+ mergeLogger.logFileMergeStart(seqFile.getTsFile(),
+ ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+ logger.debug("{} moving merged chunks of {} to the old file", taskName,
seqFile);
+ ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+ } catch (TsFileNotCompleteException e) {
+ // this file may already be truncated if this merge is a system reboot
merge
+ oldFileWriter = new RestorableTsFileIOWriter(seqFile.getTsFile());
+ }
+ return oldFileWriter;
+ }
+
+ private void updatePlanIndexes(TsFileResource seqFile) {
+ // as the new file contains data of other files, track their plan indexes
in the new file
+ // so that we will be able to compare data across different IoTDBs that
share the same index
// generation policy
// however, since the data of unseq files are mixed together, we won't be
able to know
// which files are exactly contained in the new file, so we have to record
all unseq files
// in the new file
- Set<Long> newHistoricalVersions = new
HashSet<>(seqFile.getHistoricalVersions());
- for (TsFileResource unseqFiles : resource.getUnseqFiles()) {
- newHistoricalVersions.addAll(unseqFiles.getHistoricalVersions());
+ for (TsFileResource unseqFile : resource.getUnseqFiles()) {
+ seqFile.updatePlanIndexes(unseqFile);
}
- seqFile.setHistoricalVersions(newHistoricalVersions);
+ }
+
+ private void writeMergedChunkGroup(List<ChunkMetadata> chunkMetadataList,
String device,
+ TsFileSequenceReader reader, TsFileIOWriter fileWriter)
+ throws IOException {
+ fileWriter.startChunkGroup(device);
+ long maxVersion = 0;
+ for (ChunkMetadata chunkMetaData : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetaData);
+ fileWriter.writeChunk(chunk, chunkMetaData);
+ maxVersion = Math.max(chunkMetaData.getVersion(), maxVersion);
+ context.incTotalPointWritten(chunkMetaData.getNumOfPoints());
+ }
+ fileWriter.writeVersion(maxVersion);
+ fileWriter.endChunkGroup();
}
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
@@ -183,7 +290,7 @@ class MergeFileTask {
fileWriter.endFile();
- updateHistoricalVersions(seqFile);
+ updatePlanIndexes(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName,
seqFile);
@@ -192,7 +299,6 @@ class MergeFileTask {
try {
resource.removeFileReader(seqFile);
ChunkMetadataCache.getInstance().remove(seqFile);
-
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
File newMergeFile = seqFile.getTsFile();
newMergeFile.delete();
@@ -237,12 +343,4 @@ class MergeFileTask {
return maxVersion;
}
- private File getNextMergeVersionFile(File seqFile) {
- String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
- .split(IoTDBConstant.FILE_NAME_SEPARATOR);
- int mergeVersion = Integer.parseInt(splits[2]) + 1;
- return fsFactory.getFile(seqFile.getParentFile(),
- splits[0] + IoTDBConstant.FILE_NAME_SEPARATOR + splits[1]
- + IoTDBConstant.FILE_NAME_SEPARATOR + mergeVersion +
TSFILE_SUFFIX);
- }
-}
+}
\ No newline at end of file
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index 4eb87ab..cc4bd3b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -129,9 +130,13 @@ public class IoTDBFillIT {
private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
+ boolean prevEnableUnseqCompaction;
+
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ prevEnableUnseqCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData();
@@ -140,6 +145,7 @@ public class IoTDBFillIT {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
+
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction);
}
@Test
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
index 761aa26..327dbfd 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
@@ -25,7 +25,6 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Random;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 9f67377..3c86a2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -95,6 +95,11 @@ public class IoTDBMergeTest {
}
statement.execute("FLUSH");
statement.execute("MERGE");
+ try{
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT * FROM
root.mergeTest")) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
index c404297..b35afbd 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -79,9 +79,6 @@ public class IoTDBSensorUpdateIT {
}
assertEquals(1, cnt);
}
-
- // after merge completes
- statement.execute("DELETE FROM root.demo.d1");
}
}
}