This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch speed_up_readChunkPerfomer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 245fca085fe80f2313775febbea79ad5792c029e Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Aug 8 16:14:09 2023 +0800 optimize the ReadChunkPerformer for inner compaction by getting rid of new PartialPath() --- .../performer/impl/ReadChunkCompactionPerformer.java | 5 ++--- .../execute/utils/MultiTsFileDeviceIterator.java | 6 ++++-- .../readchunk/SingleSeriesCompactionExecutor.java | 19 ++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 78c1d0bc965..dbf12840029 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl; import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTargetFileCountExceededException; @@ -164,7 +163,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { while (seriesIterator.hasNextSeries()) { checkThreadInterrupted(); // TODO: we can provide a configuration item to enable concurrent between each series - PartialPath p = new PartialPath(device, seriesIterator.nextSeries()); + String measurementUID = seriesIterator.nextSeries(); // TODO: seriesIterator needs to be refactor. // This statement must be called before next hasNextSeries() called, or it may be trapped in a // dead-loop. @@ -172,7 +171,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { seriesIterator.getMetadataListForCurrentSeries(); SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries = new SingleSeriesCompactionExecutor( - p, readerAndChunkMetadataList, writer, targetResource, summary); + device, measurementUID, readerAndChunkMetadataList, writer, targetResource, summary); compactionExecutorOfCurrentTimeSeries.execute(); } writer.endChunkGroup(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 09fa3a85bcd..038df0a48fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -543,10 +543,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { if (currentCompactingSeries == null) { return new LinkedList<>(); } - + PartialPath path = null; LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataForThisSeries = new LinkedList<>(); - PartialPath path = new PartialPath(device, currentCompactingSeries); for (TsFileResource resource : tsFileResourcesSortedByAsc) { TsFileSequenceReader reader = readerMap.get(resource); @@ -563,6 +562,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { resource, r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications())); LinkedList<Modification> modificationForCurrentSeries = new LinkedList<>(); + if (modificationsInThisResource.size() > 0 && path == null) { + path = new PartialPath(device, currentCompactingSeries); + } // collect the modifications for current series for (Modification modification : modificationsInThisResource) { if (modification.getPath().matchFullPath(path)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index c3ddb229c1b..65ab953a9c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; @@ -45,7 +44,7 @@ import java.util.List; @SuppressWarnings("squid:S1319") public class SingleSeriesCompactionExecutor { private String device; - private PartialPath series; + private String measurementUID; private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList; private CompactionTsFileWriter fileWriter; private TsFileResource targetResource; @@ -70,13 +69,14 @@ public class SingleSeriesCompactionExecutor { IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction(); public SingleSeriesCompactionExecutor( - PartialPath series, + String device, + String measurementUID, IMeasurementSchema measurementSchema, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, CompactionTsFileWriter fileWriter, TsFileResource targetResource) { - this.device = series.getDevice(); - this.series = series; + this.device = device; + this.measurementUID = measurementUID; this.readerAndChunkMetadataList = readerAndChunkMetadataList; this.fileWriter = fileWriter; this.schema = measurementSchema; @@ -88,13 +88,14 @@ public class SingleSeriesCompactionExecutor { } public SingleSeriesCompactionExecutor( - PartialPath series, + String device, + String measurementUID, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, CompactionTsFileWriter fileWriter, TsFileResource targetResource, CompactionTaskSummary summary) { - this.device = series.getDevice(); - this.series = series; + this.device = device; + this.measurementUID = measurementUID; this.readerAndChunkMetadataList = readerAndChunkMetadataList; this.fileWriter = fileWriter; this.schema = null; @@ -162,7 +163,7 @@ public class SingleSeriesCompactionExecutor { ChunkHeader chunkHeader = chunk.getHeader(); this.schema = new MeasurementSchema( - series.getMeasurement(), + measurementUID, chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType());
