This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel11
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e011a7aab3273f99be9787b1eaebef3937da9951
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jul 26 13:20:14 2023 +0800

    add compression and encoding type check for FastCompactionPerformer
---
 .../fast/AlignedSeriesCompactionExecutor.java        | 20 ++++++++++++++++++++
 .../fast/NonAlignedSeriesCompactionExecutor.java     | 14 ++++++++++++++
 .../executor/fast/SeriesCompactionExecutor.java      | 12 ++++++++----
 .../executor/fast/element/ChunkMetadataElement.java  |  2 ++
 .../utils/executor/fast/element/PageElement.java     |  4 ++++
 5 files changed, 48 insertions(+), 4 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index cd7adbb1adc..a6c11dd5a2a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -311,6 +311,26 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
               .readMemChunk((ChunkMetadata) valueChunkMetadata));
     }
     chunkMetadataElement.valueChunks = valueChunks;
+    setForceDecoding(chunkMetadataElement);
+  }
+
+  void setForceDecoding(ChunkMetadataElement chunkMetadataElement) {
+    IMeasurementSchema timeChunkSchema = measurementSchemas.get(0);
+    if (timeChunkSchema.getCompressor()
+            != chunkMetadataElement.chunk.getHeader().getCompressionType()
+        || timeChunkSchema.getEncodingType()
+            != chunkMetadataElement.chunk.getHeader().getEncodingType()) {
+      chunkMetadataElement.needForceDecoding = true;
+      return;
+    }
+    for (int i = 1; i < measurementSchemas.size(); i++) {
+      ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 
1).getHeader();
+      if (header.getCompressionType() != 
measurementSchemas.get(i).getCompressor()
+          || header.getEncodingType() != 
measurementSchemas.get(i).getEncodingType()) {
+        chunkMetadataElement.needForceDecoding = true;
+        return;
+      }
+    }
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index 6efa8850673..a8dda6fca5e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
@@ -50,6 +52,10 @@ import java.util.Map;
 public class NonAlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
   private boolean hasStartMeasurement = false;
 
+  private CompressionType seriesCompressionType = null;
+
+  private TSEncoding seriesTSEncoding = null;
+
   // tsfile resource -> timeseries metadata <startOffset, endOffset>
   // used to get the chunk metadatas from tsfile directly according to 
timeseries metadata offset.
   private Map<TsFileResource, Pair<Long, Long>> timeseriesMetadataOffsetMap;
@@ -203,6 +209,14 @@ public class NonAlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor
               header.getCompressionType());
       compactionWriter.startMeasurement(Collections.singletonList(schema), 
subTaskId);
       hasStartMeasurement = true;
+      seriesCompressionType = header.getCompressionType();
+      seriesTSEncoding = header.getEncodingType();
+      chunkMetadataElement.needForceDecoding = false;
+    } else {
+      ChunkHeader header = chunkMetadataElement.chunk.getHeader();
+      chunkMetadataElement.needForceDecoding =
+          header.getCompressionType() != seriesCompressionType
+              || header.getEncodingType() != seriesTSEncoding;
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
index f80f9a45724..ccb8af61950 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java
@@ -141,7 +141,11 @@ public abstract class SeriesCompactionExecutor {
           firstChunkMetadataElement.chunkMetadata.getEndTime() >= 
nextChunkStartTime;
       boolean isModified = 
firstChunkMetadataElement.chunkMetadata.isModified();
 
-      if (isChunkOverlap || isModified) {
+      // read current chunk
+      readChunk(firstChunkMetadataElement);
+      boolean forceDecodingChunk = firstChunkMetadataElement.needForceDecoding;
+
+      if (isChunkOverlap || isModified || forceDecodingChunk) {
         // has overlap or modified chunk, then deserialize it
         summary.CHUNK_OVERLAP_OR_MODIFIED++;
         compactWithOverlapChunks(firstChunkMetadataElement);
@@ -161,7 +165,6 @@ public abstract class SeriesCompactionExecutor {
    */
   private void compactWithOverlapChunks(ChunkMetadataElement 
overlappedChunkMetadata)
       throws IOException, PageException, WriteProcessException, 
IllegalPathException {
-    readChunk(overlappedChunkMetadata);
     deserializeChunkIntoPageQueue(overlappedChunkMetadata);
 
     compactPages();
@@ -173,7 +176,6 @@ public abstract class SeriesCompactionExecutor {
    */
   private void compactWithNonOverlapChunk(ChunkMetadataElement 
chunkMetadataElement)
       throws IOException, PageException, WriteProcessException, 
IllegalPathException {
-    readChunk(chunkMetadataElement);
     boolean success;
     if (isAligned) {
       success =
@@ -229,7 +231,9 @@ public abstract class SeriesCompactionExecutor {
           firstPageElement.pageHeader.getEndTime() >= nextPageStartTime
               || firstPageElement.pageHeader.getEndTime() >= 
nextChunkStartTime;
 
-      if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) {
+      if (isPageOverlap
+          || modifiedStatus == ModifiedStatus.PARTIAL_DELETED
+          || firstPageElement.needForceDecoding) {
         // has overlap or modified pages, then deserialize it
         summary.PAGE_OVERLAP_OR_MODIFIED += 1;
         pointPriorityReader.addNewPage(firstPageElement);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
index a51fbaebdaf..c7fe9e65fe2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
@@ -38,6 +38,8 @@ public class ChunkMetadataElement {
 
   public List<Chunk> valueChunks;
 
+  public boolean needForceDecoding;
+
   public ChunkMetadataElement(
       IChunkMetadata chunkMetadata, long priority, boolean isLastChunk, 
FileElement fileElement) {
     this.chunkMetadata = chunkMetadata;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java
index 0e2954ce78d..3031c5c919d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/element/PageElement.java
@@ -53,6 +53,8 @@ public class PageElement {
 
   public ChunkMetadataElement chunkMetadataElement;
 
+  public boolean needForceDecoding;
+
   public PageElement(
       PageHeader pageHeader,
       ByteBuffer pageData,
@@ -67,6 +69,7 @@ public class PageElement {
     this.startTime = pageHeader.getStartTime();
     this.chunkMetadataElement = chunkMetadataElement;
     this.isLastPage = isLastPage;
+    this.needForceDecoding = chunkMetadataElement.needForceDecoding;
   }
 
   public PageElement(
@@ -87,6 +90,7 @@ public class PageElement {
     this.startTime = pageHeader.getStartTime();
     this.chunkMetadataElement = chunkMetadataElement;
     this.isLastPage = isLastPage;
+    this.needForceDecoding = chunkMetadataElement.needForceDecoding;
   }
 
   public void deserializePage() throws IOException {

Reply via email to