This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel12 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c3723b632817b118d3fc8c9add66e153137d76b Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Jul 26 13:20:14 2023 +0800 add compression and encoding type check for FastCompactionPerformer --- .../fast/AlignedSeriesCompactionExecutor.java | 20 ++++++++++++++++++++ .../fast/NonAlignedSeriesCompactionExecutor.java | 14 ++++++++++++++ .../executor/fast/SeriesCompactionExecutor.java | 12 ++++++++---- .../executor/fast/element/ChunkMetadataElement.java | 2 ++ .../utils/executor/fast/element/PageElement.java | 4 ++++ 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java index 8dd65f28505..1c03d23f092 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java @@ -345,6 +345,26 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { .readMemChunk((ChunkMetadata) valueChunkMetadata)); } chunkMetadataElement.valueChunks = valueChunks; + setForceDecoding(chunkMetadataElement); + } + + void setForceDecoding(ChunkMetadataElement chunkMetadataElement) { + IMeasurementSchema timeChunkSchema = measurementSchemas.get(0); + if (timeChunkSchema.getCompressor() + != chunkMetadataElement.chunk.getHeader().getCompressionType() + || timeChunkSchema.getEncodingType() + != chunkMetadataElement.chunk.getHeader().getEncodingType()) { + chunkMetadataElement.needForceDecoding = true; + return; + } + for (int i = 1; i < measurementSchemas.size(); i++) { + ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 1).getHeader(); + if (header.getCompressionType() != measurementSchemas.get(i).getCompressor() + || header.getEncodingType() != measurementSchemas.get(i).getEncodingType()) { + chunkMetadataElement.needForceDecoding = true; + return; + } + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java index 1a075fef39c..3cf146b61f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java @@ -36,6 +36,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; @@ -51,6 +53,10 @@ import java.util.Map; public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { private boolean hasStartMeasurement = false; + private CompressionType seriesCompressionType = null; + + private TSEncoding seriesTSEncoding = null; + // tsfile resource -> timeseries metadata <startOffset, endOffset> // used to get the chunk metadatas from tsfile directly according to timeseries metadata offset. private Map<TsFileResource, Pair<Long, Long>> timeseriesMetadataOffsetMap; @@ -211,6 +217,14 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor header.getCompressionType()); compactionWriter.startMeasurement(Collections.singletonList(schema), subTaskId); hasStartMeasurement = true; + seriesCompressionType = header.getCompressionType(); + seriesTSEncoding = header.getEncodingType(); + chunkMetadataElement.needForceDecoding = false; + } else { + ChunkHeader header = chunkMetadataElement.chunk.getHeader(); + chunkMetadataElement.needForceDecoding = + header.getCompressionType() != seriesCompressionType + || header.getEncodingType() != seriesTSEncoding; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index 9cfdabc7f55..1290287c0d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -144,7 +144,11 @@ public abstract class SeriesCompactionExecutor { firstChunkMetadataElement.chunkMetadata.getEndTime() >= nextChunkStartTime; boolean isModified = firstChunkMetadataElement.chunkMetadata.isModified(); - if (isChunkOverlap || isModified) { + // read current chunk + readChunk(firstChunkMetadataElement); + boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding; + + if (isChunkOverlap || isModified || forceDecodingChunk) { // has overlap or modified chunk, then deserialize it summary.chunkOverlapOrModified++; compactWithOverlapChunks(firstChunkMetadataElement); @@ -164,7 +168,6 @@ public abstract class SeriesCompactionExecutor { */ private void compactWithOverlapChunks(ChunkMetadataElement overlappedChunkMetadata) throws IOException, PageException, WriteProcessException, IllegalPathException { - readChunk(overlappedChunkMetadata); deserializeChunkIntoPageQueue(overlappedChunkMetadata); compactPages(); @@ -176,7 +179,6 @@ public abstract class SeriesCompactionExecutor { */ private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElement) throws IOException, PageException, WriteProcessException, IllegalPathException { - readChunk(chunkMetadataElement); boolean success; if (isAligned) { success = @@ -237,7 +239,9 @@ public abstract class SeriesCompactionExecutor { firstPageElement.pageHeader.getEndTime() >= nextPageStartTime || firstPageElement.pageHeader.getEndTime() >= nextChunkStartTime; - if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) { + if (isPageOverlap + || modifiedStatus == ModifiedStatus.PARTIAL_DELETED + || firstPageElement.needForceDecoding) { // has overlap or modified pages, then deserialize it summary.pageOverlapOrModified += 1; pointPriorityReader.addNewPage(firstPageElement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java index bcebf074e9f..fafdef50f68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java @@ -40,6 +40,8 @@ public class ChunkMetadataElement { public List<Chunk> valueChunks; + public boolean needForceDecoding; + public ChunkMetadataElement( IChunkMetadata chunkMetadata, long priority, boolean isLastChunk, FileElement fileElement) { this.chunkMetadata = chunkMetadata; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java index c86744ddec5..0977c9d6a44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/PageElement.java @@ -56,6 +56,8 @@ public class PageElement { public ChunkMetadataElement chunkMetadataElement; + public boolean needForceDecoding; + public PageElement( PageHeader pageHeader, ByteBuffer pageData, @@ -70,6 +72,7 @@ public class PageElement { this.startTime = pageHeader.getStartTime(); this.chunkMetadataElement = chunkMetadataElement; this.isLastPage = isLastPage; + this.needForceDecoding = chunkMetadataElement.needForceDecoding; } @SuppressWarnings("squid:S107") @@ -91,6 +94,7 @@ public class PageElement { this.startTime = pageHeader.getStartTime(); this.chunkMetadataElement = chunkMetadataElement; this.isLastPage = isLastPage; + this.needForceDecoding = chunkMetadataElement.needForceDecoding; } public void deserializePage() throws IOException {
