This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 75d7d4c [To rel/0.11] Cherry pick
add_continuous_compaction_in_level_compaction_strategy (#3187)
75d7d4c is described below
commit 75d7d4c3d33e321b1781ffcf736ece0e9d66fb27
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri May 14 14:07:16 2021 +0800
[To rel/0.11] Cherry pick
add_continuous_compaction_in_level_compaction_strategy (#3187)
---
.../resources/conf/iotdb-engine.properties | 4 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 21 +-
.../db/engine/compaction/TsFileManagement.java | 9 +-
.../level/LevelCompactionTsFileManagement.java | 56 ++---
.../engine/storagegroup/StorageGroupProcessor.java | 18 +-
.../compaction/LevelCompactionCacheTest.java | 3 +-
.../engine/compaction/LevelCompactionLogTest.java | 7 +-
.../compaction/LevelCompactionMergeTest.java | 74 ++++++-
.../compaction/LevelCompactionMoreDataTest.java | 238 +++++++++++++++++++++
.../db/engine/compaction/LevelCompactionTest.java | 3 +-
.../NoCompactionTsFileManagementTest.java | 7 +-
.../org/apache/iotdb/db/script/EnvScriptIT.java | 6 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 6 +-
14 files changed, 416 insertions(+), 57 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2bae439..26dc1ed 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -300,6 +300,10 @@ compaction_strategy=LEVEL_COMPACTION
enable_unseq_compaction = true
# Works when the compaction_strategy is LEVEL_COMPACTION.
+# Whether to start the next compaction task automatically after finishing one
compaction task
+# enable_continuous_compaction=true
+
+# Works when the compaction_strategy is LEVEL_COMPACTION.
# The max seq file num of each level.
# When the num of files in one level exceeds this,
# the files in this level will merge to one and put to upper level.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4a6fec5..1f706f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -318,10 +318,15 @@ public class IoTDBConfig {
private boolean enableUnseqCompaction = true;
/**
- * Works when the compaction_strategy is LEVEL_COMPACTION.
- * The max seq file num of each level.
- * When the num of files in one level exceeds this,
- * the files in this level will merge to one and put to upper level.
+ * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to start
the next compaction task
+ * automatically after finishing one compaction task
+ */
+ private boolean enableContinuousCompaction = true;
+
+ /**
+ * Works when the compaction_strategy is LEVEL_COMPACTION. The max seq file
num of each level.
+ * When the num of files in one level exceeds this, the files in this level
will merge to one and
+ * put to upper level.
*/
private int seqFileNumInEachLevel = 6;
@@ -1479,6 +1484,14 @@ public class IoTDBConfig {
this.enableUnseqCompaction = enableUnseqCompaction;
}
+ public boolean isEnableContinuousCompaction() {
+ return enableContinuousCompaction;
+ }
+
+ public void setEnableContinuousCompaction(boolean
enableContinuousCompaction) {
+ this.enableContinuousCompaction = enableContinuousCompaction;
+ }
+
public int getSeqFileNumInEachLevel() {
return seqFileNumInEachLevel;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 0aa0c19..98f5ab0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -279,9 +279,17 @@ public class IoTDBDescriptor {
conf.setMemtableSizeThreshold(memTableSizeThreshold);
}
- conf.setAvgSeriesPointNumberThreshold(Integer.parseInt(properties
- .getProperty("avg_series_point_number_threshold",
- Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+ conf.setAvgSeriesPointNumberThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "avg_series_point_number_threshold",
+ Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+
+ conf.setEnableContinuousCompaction(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_continuous_compaction",
+ Boolean.toString(conf.isEnableContinuousCompaction()))));
conf.setCheckPeriodWhenInsertBlocked(Integer.parseInt(properties
.getProperty("check_period_when_insert_blocked",
@@ -760,6 +768,13 @@ public class IoTDBDescriptor {
// update debug_state
conf.setDebugState(Boolean.parseBoolean(properties.getProperty("debug_state")));
+ // update enable_continuous_compaction
+ conf.setEnableContinuousCompaction(
+
Boolean.parseBoolean(properties.getProperty("enable_continuous_compaction")));
+
+ // update merge_write_throughput_mb_per_sec
+ conf.setMergeWriteThroughputMbPerSec(
+
Integer.parseInt(properties.getProperty("merge_write_throughput_mb_per_sec")));
} catch (Exception e) {
throw new QueryProcessException(
String.format("Fail to reload configuration because %s", e));
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 226f686..0cf1e74 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -68,6 +68,9 @@ public abstract class TsFileManagement {
private long mergeStartTime;
+ /** whether a chunk merge was executed in this task */
+ protected boolean isMergeExecutedInCurrentTask = false;
+
protected boolean isForceFullMerge =
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
public TsFileManagement(String storageGroupName, String storageGroupDir) {
@@ -161,7 +164,7 @@ public abstract class TsFileManagement {
@Override
public void run() {
merge(timePartitionId);
- closeCompactionMergeCallBack.call();
+ closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask,
timePartitionId);
}
}
@@ -176,7 +179,9 @@ public abstract class TsFileManagement {
@Override
public void run() {
recover();
- closeCompactionMergeCallBack.call();
+ // in the recover logic we do not have to start the next compaction task, and
in this case the
+ // time partition param is unused, so we can just pass 0L
+ closeCompactionMergeCallBack.call(false, 0L);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 61ba9b4..7a41518 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -533,25 +533,21 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
forkTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition,
this::newSequenceTsFileResources),
- seqLevelNum,
- seqFileNumInEachLevel);
+ seqLevelNum);
}
// we have to copy all unseq file
synchronized (unSequenceTsFileResources) {
+ // we have to copy all unseq file
forkTsFileList(
forkedUnSequenceTsFileResources,
unSequenceTsFileResources.computeIfAbsent(
timePartition, this::newUnSequenceTsFileResources),
- unseqLevelNum + 1,
- unseqFileNumInEachLevel);
+ unseqLevelNum + 1);
}
}
private void forkTsFileList(
- List<List<TsFileResource>> forkedTsFileResources,
- List rawTsFileResources,
- int currMaxLevel,
- int currFileNumInEachLevel) {
+ List<List<TsFileResource>> forkedTsFileResources, List
rawTsFileResources, int currMaxLevel) {
forkedTsFileResources.clear();
for (int i = 0; i < currMaxLevel - 1; i++) {
List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
@@ -560,9 +556,6 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
for (TsFileResource tsFileResource : levelRawTsFileResources) {
if (tsFileResource.isClosed()) {
forkedLevelTsFileResources.add(tsFileResource);
- if (forkedLevelTsFileResources.size() > currFileNumInEachLevel) {
- break;
- }
}
}
forkedTsFileResources.add(forkedLevelTsFileResources);
@@ -571,25 +564,31 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
@Override
protected void merge(long timePartition) {
- merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum,
seqFileNumInEachLevel);
- if (enableUnseqCompaction && unseqLevelNum <= 1 &&
forkedUnSequenceTsFileResources.size() > 0) {
+ isMergeExecutedInCurrentTask =
+ merge(
+ forkedSequenceTsFileResources, true, timePartition, seqLevelNum,
seqFileNumInEachLevel);
+ if (enableUnseqCompaction
+ && unseqLevelNum <= 1
+ && forkedUnSequenceTsFileResources.get(0).size() > 0) {
+ isMergeExecutedInCurrentTask = true;
merge(
isForceFullMerge,
- getTsFileList(true),
+ getTsFileListByTimePartition(true, timePartition),
forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
} else {
- merge(
- forkedUnSequenceTsFileResources,
- false,
- timePartition,
- unseqLevelNum,
- unseqFileNumInEachLevel);
+ isMergeExecutedInCurrentTask =
+ merge(
+ forkedUnSequenceTsFileResources,
+ false,
+ timePartition,
+ unseqLevelNum,
+ unseqFileNumInEachLevel);
}
}
@SuppressWarnings("squid:S3776")
- private void merge(
+ private boolean merge(
List<List<TsFileResource>> mergeResources,
boolean sequence,
long timePartition,
@@ -602,21 +601,26 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
} catch (InterruptedException e) {
logger.error("{} [Compaction] shutdown", storageGroupName, e);
Thread.currentThread().interrupt();
- return;
+ return false;
}
}
isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
+ // whether a chunk merge was executed in the loop below
+ boolean isMergeExecutedInCurrentTask = false;
try {
logger.info("{} start to filter compaction condition", storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
- if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+ List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
+ if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
+ // just merge part of the file
+ isMergeExecutedInCurrentTask = true;
// level is numbered from 0
if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
// do not merge current unseq file level to upper level and just
merge all of them to
// seq file
isSeqMerging = false;
- merge(isForceFullMerge, getTsFileList(true),
mergeResources.get(i), Long.MAX_VALUE);
+ merge(isForceFullMerge, getTsFileListByTimePartition(true,
timePartition), mergeResources.get(i), Long.MAX_VALUE);
} else {
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
@@ -628,7 +632,8 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
i + 1);
compactionLogger.logSequence(sequence);
compactionLogger.logFile(TARGET_NAME, newLevelFile);
- List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+ List<TsFileResource> toMergeTsFiles =
+ mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
logger.info(
"{} [Compaction] merge level-{}'s {} TsFiles to next level",
storageGroupName,
@@ -692,6 +697,7 @@ public class LevelCompactionTsFileManagement extends
TsFileManagement {
sequence,
System.currentTimeMillis() - startTimeMillis);
}
+ return isMergeExecutedInCurrentTask;
}
/** if level < maxLevel-1, the file need compaction else, the file can be
merged later */
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 4fd2d0f..1440a51 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -354,7 +354,8 @@ public class StorageGroupProcessor {
throw new StorageGroupProcessorException(e);
}
- for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+ List<TsFileResource> seqTsFileResources =
tsFileManagement.getTsFileList(true);
+ for (TsFileResource resource : seqTsFileResources) {
long timePartitionId = resource.getTimePartition();
Map<String, Long> endTimeMap = new HashMap<>();
for (Entry<String, Integer> entry :
resource.getDeviceToIndexMap().entrySet()) {
@@ -382,7 +383,7 @@ public class StorageGroupProcessor {
.submitTask(
tsFileManagement.new
CompactionRecoverTask(this::closeCompactionMergeCallBack));
} catch (RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
+ this.closeCompactionMergeCallBack(false, 0);
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
@@ -1872,7 +1873,7 @@ public class StorageGroupProcessor {
tsFileManagement
.new CompactionMergeTask(this::closeCompactionMergeCallBack,
timePartition));
} catch (IOException | RejectedExecutionException e) {
- this.closeCompactionMergeCallBack();
+ this.closeCompactionMergeCallBack(false, timePartition);
logger.error("{} compaction submit task failed", storageGroupName);
}
} else {
@@ -1881,8 +1882,13 @@ public class StorageGroupProcessor {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
- this.compactionMergeWorking = false;
+ private void closeCompactionMergeCallBack(boolean isMerge, long
timePartitionId) {
+ if (isMerge &&
IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+ executeCompaction(
+ timePartitionId,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ } else {
+ this.compactionMergeWorking = false;
+ }
}
/**
@@ -2737,7 +2743,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseCompactionMergeCallBack {
- void call();
+ void call(boolean isMergeExecutedInCurrentTask, long timePartitionId);
}
@FunctionalInterface
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 4169816..6c03e4f 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -119,7 +119,8 @@ public class LevelCompactionCacheTest extends
LevelCompactionTest {
}
/** close compaction merge callback, to release some locks */
- private void closeCompactionMergeCallBack() {
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index a6d1647..8bb4e9c 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -73,10 +73,9 @@ public class LevelCompactionLogTest extends
LevelCompactionTest {
assertFalse(logFile.exists());
}
- /**
- * close compaction merge callback, to release some locks
- */
- private void closeCompactionMergeCallBack() {
+ /** close compaction merge callback, to release some locks */
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 8970a4c..4a6632c 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -24,11 +24,16 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -133,10 +138,71 @@ public class LevelCompactionMergeTest extends
LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
}
- /**
- * close compaction merge callback, to release some locks
- */
- private void closeCompactionMergeCallBack() {
+ @Test
+ public void testCompactionModsByOffsetAfterMerge() throws
IllegalPathException, IOException {
+ int prevPageLimit =
+
IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
+
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);
+
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ TsFileResource forthSeqTsFileResource = seqResources.get(3);
+ PartialPath path =
+ new PartialPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ try (ModificationFile sourceModificationFile =
+ new ModificationFile(
+ forthSeqTsFileResource.getTsFilePath() +
ModificationFile.FILE_SUFFIX)) {
+ Modification modification =
+ new Deletion(path, forthSeqTsFileResource.getTsFileSize() / 10, 300,
310);
+ sourceModificationFile.write(modification);
+ }
+ levelCompactionTsFileManagement.addAll(seqResources, true);
+ levelCompactionTsFileManagement.addAll(unseqResources, false);
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionMergeTask compactionMergeTask =
+ levelCompactionTsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionMergeTask.run();
+ while (compactionMergeWorking) {
+ // wait
+ }
+ QueryContext context = new QueryContext();
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+
+ long count = 0L;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ System.out.println(batchData.getTimeByIndex(i));
+ }
+ count += batchData.length();
+ }
+ assertEquals(489, count);
+
+ List<TsFileResource> tsFileResourceList =
levelCompactionTsFileManagement.getTsFileList(true);
+ for (TsFileResource tsFileResource : tsFileResourceList) {
+ tsFileResource.getModFile().remove();
+ tsFileResource.remove();
+ }
+
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
+ }
+
+ /** close compaction merge callback, to release some locks */
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
this.compactionMergeWorking = false;
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
new file mode 100644
index 0000000..565741a
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.constant.TestConstant;
+import
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
+import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+
+public class LevelCompactionMoreDataTest extends LevelCompactionTest {
+
+ protected int measurementNum = 3000;
+
+ File tempSGDir;
+ boolean compactionMergeWorking = false;
+
+ @Override
+ protected void prepareSeries() throws MetadataException {
+ measurementSchemas = new MeasurementSchema[measurementNum];
+ for (int i = 0; i < measurementNum; i++) {
+ measurementSchemas[i] =
+ new MeasurementSchema(
+ "sensor" + i, TSDataType.DOUBLE, encoding,
CompressionType.UNCOMPRESSED);
+ }
+ deviceIds = new String[deviceNum];
+ for (int i = 0; i < deviceNum; i++) {
+ deviceIds[i] = COMPACTION_TEST_SG + PATH_SEPARATOR + "device" + i;
+ }
+ IoTDB.metaManager.setStorageGroup(new PartialPath(COMPACTION_TEST_SG));
+ for (String device : deviceIds) {
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ PartialPath devicePath = new PartialPath(device);
+ IoTDB.metaManager.createTimeseries(
+ devicePath.concatNode(measurementSchema.getMeasurementId()),
+ measurementSchema.getType(),
+ measurementSchema.getEncodingType(),
+ measurementSchema.getCompressor(),
+ Collections.emptyMap());
+ }
+ }
+ }
+
+ @Override
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException,
WriteProcessException {
+ for (int i = 0; i < seqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.updatePlanIndexes((long) i);
+ seqResources.add(tsFileResource);
+ prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+ }
+ for (int i = 0; i < unseqFileNum; i++) {
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ (10000 + i)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + (10000 + i)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.updatePlanIndexes((long) (i + seqFileNum));
+ unseqResources.add(tsFileResource);
+ prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum,
10000);
+ }
+
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ unseqFileNum
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + unseqFileNum
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.updatePlanIndexes((long) (seqFileNum + unseqFileNum));
+ unseqResources.add(tsFileResource);
+ prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
+ }
+
+ @Override
+ void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
long valueOffset)
+ throws IOException, WriteProcessException {
+ TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
+ for (String deviceId : deviceIds) {
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ fileWriter.registerTimeseries(
+ new Path(deviceId, measurementSchema.getMeasurementId()),
measurementSchema);
+ }
+ }
+ for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+ for (int j = 0; j < deviceNum; j++) {
+ TSRecord record = new TSRecord(i, deviceIds[j]);
+ for (int k = 0; k < measurementNum; k++) {
+ record.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchemas[k].getType(),
+ measurementSchemas[k].getMeasurementId(),
+ String.valueOf(i + valueOffset + k)));
+ }
+ fileWriter.write(record);
+ tsFileResource.updateStartTime(deviceIds[j], i);
+ tsFileResource.updateEndTime(deviceIds[j], i);
+ }
+ if ((i + 1) % flushInterval == 0) {
+ fileWriter.flushAllChunkGroups();
+ }
+ }
+ fileWriter.close();
+ }
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException,
MetadataException {
+ super.setUp();
+ tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
+ tempSGDir.mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
+ }
+
+ // test file compaction larger than 1024 sensor
+ @Test
+ public void testSensorWithTwoOrThreeNode() throws IllegalPathException,
IOException {
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG,
tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(seqResources, true);
+ levelCompactionTsFileManagement.addAll(unseqResources, false);
+ levelCompactionTsFileManagement.forkCurrentFileList(0);
+ CompactionMergeTask compactionMergeTask =
+ levelCompactionTsFileManagement
+ .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+ compactionMergeWorking = true;
+ compactionMergeTask.run();
+ while (compactionMergeWorking) {
+ // wait
+ }
+ QueryContext context = new QueryContext();
+ PartialPath path =
+ new PartialPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[2688].getMeasurementId());
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[2688].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i) + 2688,
batchData.getDoubleByIndex(i), 0.001);
+ }
+ }
+ }
+
+ /** close compaction merge callback, to release some locks */
+ private void closeCompactionMergeCallBack(
+ boolean isMergeExecutedInCurrentTask, long timePartitionId) {
+ this.compactionMergeWorking = false;
+ }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
index 7fbc509..dc71ea4 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -97,7 +96,7 @@ abstract class LevelCompactionTest {
EnvironmentUtils.cleanAllDir();
}
- private void prepareSeries() throws MetadataException {
+ protected void prepareSeries() throws MetadataException {
measurementSchemas = new MeasurementSchema[measurementNum];
for (int i = 0; i < measurementNum; i++) {
measurementSchemas[i] = new MeasurementSchema("sensor" + i,
TSDataType.DOUBLE,
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index f99ec45..e3f21f3 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -113,9 +113,10 @@ public class NoCompactionTsFileManagementTest extends
LevelCompactionTest {
+ ".tsfile"))), false);
noCompactionTsFileManagement.forkCurrentFileList(0);
noCompactionTsFileManagement.recover();
- CompactionMergeTask compactionMergeTask = noCompactionTsFileManagement.new
CompactionMergeTask(
- () -> {
- }, 0);
+ CompactionMergeTask compactionMergeTask =
+ noCompactionTsFileManagement
+ .new CompactionMergeTask(
+ (boolean isMergeExecutedInCurrentTask, long timePartitionId) ->
{}, 0);
compactionMergeTask.run();
assertEquals(1, noCompactionTsFileManagement.size(true));
assertEquals(1, noCompactionTsFileManagement.size(false));
diff --git a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
index 604a305..90142a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
@@ -21,7 +21,11 @@ package org.apache.iotdb.db.script;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7ad89b2..5ac96f4 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -135,8 +135,10 @@ public class TsFileIOWriter {
}
protected void startFile() throws IOException {
- out.write(magicStringBytes);
- out.write(versionNumberBytes);
+ if (out != null) {
+ out.write(magicStringBytes);
+ out.write(versionNumberBytes);
+ }
}
public void startChunkGroup(String deviceId) throws IOException {