This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel11 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e011a7aab3273f99be9787b1eaebef3937da9951 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Jul 26 13:20:14 2023 +0800 add compression and encoding type check for FastCompactionPerformer --- .../fast/AlignedSeriesCompactionExecutor.java | 20 ++++++++++++++++++++ .../fast/NonAlignedSeriesCompactionExecutor.java | 14 ++++++++++++++ .../executor/fast/SeriesCompactionExecutor.java | 12 ++++++++---- .../executor/fast/element/ChunkMetadataElement.java | 2 ++ .../utils/executor/fast/element/PageElement.java | 4 ++++ 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java index cd7adbb1adc..a6c11dd5a2a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java @@ -311,6 +311,26 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { .readMemChunk((ChunkMetadata) valueChunkMetadata)); } chunkMetadataElement.valueChunks = valueChunks; + setForceDecoding(chunkMetadataElement); + } + + void setForceDecoding(ChunkMetadataElement chunkMetadataElement) { + IMeasurementSchema timeChunkSchema = measurementSchemas.get(0); + if (timeChunkSchema.getCompressor() + != chunkMetadataElement.chunk.getHeader().getCompressionType() + || timeChunkSchema.getEncodingType() + != chunkMetadataElement.chunk.getHeader().getEncodingType()) { + chunkMetadataElement.needForceDecoding = true; + return; + } + for (int i = 1; i < measurementSchemas.size(); i++) { + ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 1).getHeader(); + if (header.getCompressionType() != measurementSchemas.get(i).getCompressor() + || header.getEncodingType() != measurementSchemas.get(i).getEncodingType()) { + chunkMetadataElement.needForceDecoding = true; + return; + } + } } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java index 6efa8850673..a8dda6fca5e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java @@ -35,6 +35,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; @@ -50,6 +52,10 @@ import java.util.Map; public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { private boolean hasStartMeasurement = false; + private CompressionType seriesCompressionType = null; + + private TSEncoding seriesTSEncoding = null; + // tsfile resource -> timeseries metadata <startOffset, endOffset> // used to get the chunk metadatas from tsfile directly according to timeseries metadata offset. private Map<TsFileResource, Pair<Long, Long>> timeseriesMetadataOffsetMap; @@ -203,6 +209,14 @@ public class NonAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor header.getCompressionType()); compactionWriter.startMeasurement(Collections.singletonList(schema), subTaskId); hasStartMeasurement = true; + seriesCompressionType = header.getCompressionType(); + seriesTSEncoding = header.getEncodingType(); + chunkMetadataElement.needForceDecoding = false; + } else { + ChunkHeader header = chunkMetadataElement.chunk.getHeader(); + chunkMetadataElement.needForceDecoding = + header.getCompressionType() != seriesCompressionType + || header.getEncodingType() != seriesTSEncoding; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index f80f9a45724..ccb8af61950 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -141,7 +141,11 @@ public abstract class SeriesCompactionExecutor { firstChunkMetadataElement.chunkMetadata.getEndTime() >= nextChunkStartTime; boolean isModified = firstChunkMetadataElement.chunkMetadata.isModified(); - if (isChunkOverlap || isModified) { + // read current chunk + readChunk(firstChunkMetadataElement); + boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding; + + if (isChunkOverlap || isModified || forceDecodingChunk) { // has overlap or modified chunk, then deserialize it summary.CHUNK_OVERLAP_OR_MODIFIED++; compactWithOverlapChunks(firstChunkMetadataElement); @@ -161,7 +165,6 @@ public abstract class SeriesCompactionExecutor { */ private void compactWithOverlapChunks(ChunkMetadataElement overlappedChunkMetadata) throws IOException, PageException, WriteProcessException, IllegalPathException { - readChunk(overlappedChunkMetadata); deserializeChunkIntoPageQueue(overlappedChunkMetadata); compactPages(); @@ -173,7 +176,6 @@ public abstract class SeriesCompactionExecutor { */ private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElement) throws IOException, PageException, WriteProcessException, IllegalPathException { - readChunk(chunkMetadataElement); boolean success; if (isAligned) { success = @@ -229,7 +231,9 @@ public abstract class SeriesCompactionExecutor { firstPageElement.pageHeader.getEndTime() >= nextPageStartTime || firstPageElement.pageHeader.getEndTime() >= nextChunkStartTime; - if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) { + if (isPageOverlap + || modifiedStatus == ModifiedStatus.PARTIAL_DELETED + || firstPageElement.needForceDecoding) { // has overlap or modified pages, then deserialize it summary.PAGE_OVERLAP_OR_MODIFIED += 1; pointPriorityReader.addNewPage(firstPageElement); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java index a51fbaebdaf..c7fe9e65fe2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java @@ -38,6 +38,8 @@ public class ChunkMetadataElement { public List<Chunk> valueChunks; + public boolean needForceDecoding; + public ChunkMetadataElement( IChunkMetadata chunkMetadata, long priority, boolean isLastChunk, FileElement fileElement) { this.chunkMetadata = chunkMetadata; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java index 0e2954ce78d..3031c5c919d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java @@ -53,6 +53,8 @@ public class PageElement { public ChunkMetadataElement chunkMetadataElement; + public boolean needForceDecoding; + public PageElement( PageHeader pageHeader, ByteBuffer pageData, @@ -67,6 +69,7 @@ public class PageElement { this.startTime = pageHeader.getStartTime(); this.chunkMetadataElement = chunkMetadataElement; this.isLastPage = isLastPage; + this.needForceDecoding = chunkMetadataElement.needForceDecoding; } public PageElement( @@ -87,6 +90,7 @@ public class PageElement { this.startTime = pageHeader.getStartTime(); this.chunkMetadataElement = chunkMetadataElement; this.isLastPage = isLastPage; + this.needForceDecoding = chunkMetadataElement.needForceDecoding; } public void deserializePage() throws IOException {
