This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new df6dddc fix hot compaction read bug (#1876)
df6dddc is described below
commit df6dddc198d13d83952c5ea20c23a7455315eaae
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Tue Oct 27 23:45:28 2020 +0800
fix hot compaction read bug (#1876)
---
.../resources/conf/iotdb-engine.properties | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 2 +-
.../level/LevelTsFileManagement.java | 187 +++++----------------
3 files changed, 45 insertions(+), 146 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 183ba86..82d83f0 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -265,7 +265,7 @@ default_fill_interval=-1
### Merge Configurations
####################
# TsFile manage strategy, define use which hot compaction strategy
-# now we have normal_strategy, level_strategy
+# now we have NORMAL_STRATEGY, LEVEL_STRATEGY
tsfile_manage_strategy=NORMAL_STRATEGY
# Work when tsfile_manage_strategy is level_strategy.
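For reference, a minimal sketch of selecting the level strategy via this property, using the values listed in the updated comment above (NORMAL_STRATEGY remains the shipped default):

    # now we have NORMAL_STRATEGY, LEVEL_STRATEGY
    tsfile_manage_strategy=LEVEL_STRATEGY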
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9374ccb..3cc67ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -743,7 +743,7 @@ public class TsFileProcessor {
Map<String, String> props, QueryContext context,
List<TsFileResource> tsfileResourcesForQuery) throws IOException, MetadataException {
if (logger.isDebugEnabled()) {
- logger.debug("{}: {} get flushQueryLock and vmMergeLock read lock", storageGroupName,
+ logger.debug("{}: {} get flushQueryLock and hotCompactionMergeLock read lock", storageGroupName,
tsFileResource.getTsFile().getName());
}
flushQueryLock.readLock().lock();
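For context, the renamed lock in this log message guards queries against concurrent flush and hot compaction. A minimal self-contained sketch of the pattern this hunk touches (not IoTDB code; the class name and method here are illustrative assumptions, only the field name flushQueryLock and the log message come from the hunk):

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: queries take the read lock of flushQueryLock so that flush and
    // hot compaction threads, which take the write lock, cannot swap TsFiles
    // out from under a running query.
    public class FlushQueryLockSketch {
      private static final Logger logger = LoggerFactory.getLogger(FlushQueryLockSketch.class);
      private final ReentrantReadWriteLock flushQueryLock = new ReentrantReadWriteLock();

      public void query(String storageGroupName, String tsFileName) {
        if (logger.isDebugEnabled()) {
          // The isDebugEnabled() guard skips message formatting when debug is off.
          logger.debug("{}: {} get flushQueryLock and hotCompactionMergeLock read lock",
              storageGroupName, tsFileName);
        }
        flushQueryLock.readLock().lock();
        try {
          // ... collect the TsFileResources visible to this query ...
        } finally {
          flushQueryLock.readLock().unlock();
        }
      }
    }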
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 9e95c63..575e645 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -72,8 +72,6 @@ public class LevelTsFileManagement extends TsFileManagement {
.getMaxUnseqLevelNum();
private final int maxUnseqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
.getMaxFileNumInEachLevel();
- private final int maxChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
- .getMergeChunkPointNumberThreshold();
private final boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig()
.isForceFullMerge();
// First map is partition list; Second list is level list; Third list is file list in level;
@@ -82,11 +80,6 @@ public class LevelTsFileManagement extends TsFileManagement {
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
- private double forkedSeqListPointNum = 0;
- private double forkedSeqListMeasurementSize = 0;
- private double forkedUnSeqListPointNum = 0;
- private double forkedUnSeqListMeasurementSize = 0;
-
public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
super(storageGroupName, storageGroupDir);
clear();
@@ -94,8 +87,8 @@ public class LevelTsFileManagement extends TsFileManagement {
private void deleteLevelFiles(long timePartitionId, Collection<TsFileResource> mergeTsFiles) {
logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
- for (TsFileResource vmMergeTsFile : mergeTsFiles) {
- deleteLevelFile(vmMergeTsFile);
+ for (TsFileResource mergeTsFile : mergeTsFiles) {
+ deleteLevelFile(mergeTsFile);
}
for (int i = 0; i < maxLevelNum; i++) {
sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
@@ -119,36 +112,6 @@ public class LevelTsFileManagement extends TsFileManagement {
}
}
- private void flushAllFilesToLastLevel(long timePartitionId,
- List<List<TsFileResource>> currMergeFiles,
- HotCompactionLogger hotCompactionLogger, boolean sequence) throws IOException {
- TsFileResource sourceFile = currMergeFiles.get(0).get(0);
- File newTargetFile = createNewTsFileName(sourceFile.getTsFile(),
- sequence ? (maxLevelNum - 1) : (maxUnseqLevelNum - 1));
- TsFileResource targetResource = new TsFileResource(newTargetFile);
- List<TsFileResource> mergeFiles = new ArrayList<>();
- for (int i = currMergeFiles.size() - 1; i >= 0; i--) {
- mergeFiles.addAll(sequenceTsFileResources.get(timePartitionId).get(i));
- }
- HotCompactionUtils.merge(targetResource, mergeFiles,
- storageGroupName, hotCompactionLogger, new HashSet<>(), sequence);
- hotCompactionLogger.logFullMerge();
- hotCompactionLogger.logSequence(sequence);
- hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
- writeLock();
- if (sequence) {
- for (int i = 0; i < maxLevelNum - 1; i++) {
- deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
- }
- } else {
- for (int i = 0; i < maxUnseqLevelNum - 1; i++) {
- deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
- }
- }
- writeUnlock();
- hotCompactionLogger.logMergeFinish();
- }
-
@Override
public List<TsFileResource> getStableTsFileList(boolean sequence) {
List<TsFileResource> result = new ArrayList<>();
@@ -415,13 +378,13 @@ public class LevelTsFileManagement extends TsFileManagement {
}
}
} catch (IOException e) {
- logger.error("recover vm error ", e);
+ logger.error("recover level tsfile management error ", e);
} finally {
if (logFile.exists()) {
try {
Files.delete(logFile.toPath());
} catch (IOException e) {
- logger.error("delete vm log file error ", e);
+ logger.error("delete level tsfile management log file error ", e);
}
}
}
@@ -429,88 +392,38 @@ public class LevelTsFileManagement extends TsFileManagement {
@Override
public void forkCurrentFileList(long timePartition) {
- Pair<Double, Double> seqStatisticsPair = forkTsFileList(
+ forkTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
maxLevelNum);
- forkedSeqListPointNum = seqStatisticsPair.left;
- forkedSeqListMeasurementSize = seqStatisticsPair.right;
- Pair<Double, Double> unSeqStatisticsPair = forkTsFileList(
+ forkTsFileList(
forkedUnSequenceTsFileResources,
unSequenceTsFileResources
.computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
maxUnseqLevelNum);
- forkedUnSeqListPointNum = unSeqStatisticsPair.left;
- forkedUnSeqListMeasurementSize = unSeqStatisticsPair.right;
}
- private Pair<Double, Double> forkTsFileList(
+ private void forkTsFileList(
List<List<TsFileResource>> forkedTsFileResources,
List rawTsFileResources, int currMaxLevel) {
forkedTsFileResources.clear();
- // just fork part of the TsFile list, controlled by max_merge_chunk_point
- long pointNum = 0;
- // all flush to target file
- ICardinality measurementSet = new HyperLogLog(13);
for (int i = 0; i < currMaxLevel - 1; i++) {
List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
.get(i);
- List<TsFileResource> allCurrLevelTsFileResources = new ArrayList<>(levelRawTsFileResources);
- for (TsFileResource tsFileResource : allCurrLevelTsFileResources) {
+ for (TsFileResource tsFileResource : levelRawTsFileResources) {
if (tsFileResource.isClosed()) {
- String path = tsFileResource.getTsFile().getAbsolutePath();
- if (tsFileResource.getTsFile().exists()) {
- try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) {
- List<String> devices = reader.getAllDevices();
- for (String device : devices) {
- Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap = reader
- .readChunkMetadataInDevice(device);
- for (Entry<String, List<ChunkMetadata>> measurementChunkMetadataList : measurementChunkMetadataListMap
- .entrySet()) {
- Path sensorPath = new Path(device, measurementChunkMetadataList.getKey());
- measurementSet.offer(sensorPath.getFullPath());
- List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath);
- for (ChunkMetadata chunkMetadata : chunkMetadataList) {
- pointNum += chunkMetadata.getNumOfPoints();
- }
- }
- }
- } catch (IOException e) {
- logger.error(
- "{} tsfile reader creates error", path, e);
- }
- } else {
- logger.info("{} tsfile does not exist", path);
- }
- }
- forkedLevelTsFileResources.add(tsFileResource);
- if (measurementSet.cardinality() > 0
- && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
- break;
+ forkedLevelTsFileResources.add(tsFileResource);
}
}
-
forkedTsFileResources.add(forkedLevelTsFileResources);
- if (measurementSet.cardinality() > 0
- && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
- break;
- }
}
-
- // fill in empty file
- while (forkedTsFileResources.size() < currMaxLevel) {
- List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
- forkedTsFileResources.add(emptyForkedLevelTsFileResources);
- }
-
- return new Pair<>((double) pointNum, (double) measurementSet.cardinality());
}
@Override
protected void merge(long timePartition) {
merge(forkedSequenceTsFileResources, true, timePartition, maxLevelNum, maxFileNumInEachLevel);
- if (maxUnseqLevelNum <= 1) {
+ if (maxUnseqLevelNum <= 1 && forkedUnSequenceTsFileResources.size() > 0) {
merge(isForceFullMerge, getTsFileList(true), forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
} else {
@@ -525,57 +438,43 @@ public class LevelTsFileManagement extends TsFileManagement {
long startTimeMillis = System.currentTimeMillis();
try {
logger.info("{} start to filter hot compaction condition", storageGroupName);
- double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum;
- double measurementSize =
- sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize;
- logger
- .info("{} current sg subLevel point num: {}, approximate measurement num: {}",
- storageGroupName, pointNum,
- measurementSize);
HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir,
storageGroupName);
- if (measurementSize > 0 && pointNum / measurementSize >= maxChunkPointNum) {
- // merge all tsfile to last level
- logger.info("{} merge {} level tsfiles to next level", storageGroupName,
- mergeResources.size());
- flushAllFilesToLastLevel(timePartition, mergeResources, hotCompactionLogger, sequence);
- } else {
- for (int i = 0; i < currMaxLevel - 1; i++) {
- if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
- //level is numbered from 0
- if (!sequence && i == currMaxLevel - 2) {
- // do not merge current unseq file level to upper level and just merge all of them to seq file
- merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
- } else {
- for (TsFileResource mergeResource : mergeResources.get(i)) {
- hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+ for (int i = 0; i < currMaxLevel - 1; i++) {
+ if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+ //level is numbered from 0
+ if (!sequence && i == currMaxLevel - 2) {
+ // do not merge current unseq file level to upper level and just merge all of them to seq file
+ merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
+ } else {
+ for (TsFileResource mergeResource : mergeResources.get(i)) {
+ hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+ }
+ File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
+ i + 1);
+ hotCompactionLogger.logSequence(sequence);
+ hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
+ logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level",
+ storageGroupName, i, mergeResources.get(i).size());
+
+ TsFileResource newResource = new TsFileResource(newLevelFile);
+ HotCompactionUtils
+ .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger,
+ new HashSet<>(), sequence);
+ writeLock();
+ try {
+ deleteLevelFiles(timePartition, mergeResources.get(i));
+ hotCompactionLogger.logMergeFinish();
+ if (sequence) {
+ sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
+ } else {
+ unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
}
- File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
- i + 1);
- hotCompactionLogger.logSequence(sequence);
- hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
- logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level vm",
- storageGroupName, i, mergeResources.get(i).size());
-
- TsFileResource newResource = new TsFileResource(newLevelFile);
- HotCompactionUtils
- .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger,
- new HashSet<>(), sequence);
- writeLock();
- try {
- deleteLevelFiles(timePartition, mergeResources.get(i));
- hotCompactionLogger.logMergeFinish();
- if (sequence) {
- sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
- } else {
- unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
- }
- if (mergeResources.size() > i + 1) {
- mergeResources.get(i + 1).add(newResource);
- }
- } finally {
- writeUnlock();
+ if (mergeResources.size() > i + 1) {
+ mergeResources.get(i + 1).add(newResource);
}
+ } finally {
+ writeUnlock();
}
}
}
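The size() > 0 guard added in the merge() hunk above appears to be a key part of the read-bug fix in this class: when no unsequence files have been forked for a time partition, forkedUnSequenceTsFileResources.get(0) would throw IndexOutOfBoundsException. A minimal self-contained sketch of that failure mode and the guard (the field and variable names are borrowed from the diff; everything else is assumed for illustration):

    import java.util.ArrayList;
    import java.util.List;

    public class ForkedListGuardSketch {
      public static void main(String[] args) {
        // Empty forked list: nothing was forked for this time partition.
        List<List<String>> forkedUnSequenceTsFileResources = new ArrayList<>();
        int maxUnseqLevelNum = 1;

        // Before the fix, maxUnseqLevelNum <= 1 alone fell through to get(0),
        // which throws IndexOutOfBoundsException on an empty list.
        if (maxUnseqLevelNum <= 1 && forkedUnSequenceTsFileResources.size() > 0) {
          List<String> level0 = forkedUnSequenceTsFileResources.get(0);
          System.out.println("merging " + level0.size() + " unseq files into seq files");
        } else {
          System.out.println("no unseq level-0 files to merge");
        }
      }
    }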