This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 7eb74c7520d [To rel/1.2] Fix compaction force decoding check (#10802)
7eb74c7520d is described below

commit 7eb74c7520dce4c68a707ba3de2ae5ff10499919
Author: shuwenwei <[email protected]>
AuthorDate: Tue Aug 8 14:20:48 2023 +0800

    [To rel/1.2] Fix compaction force decoding check (#10802)
---
 .../fast/AlignedSeriesCompactionExecutor.java      | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 1c03d23f092..6104bf3aadf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +60,8 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
   private final Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap;
 
   private final List<IMeasurementSchema> measurementSchemas;
+  private final IMeasurementSchema timeColumnMeasurementSchema;
+  private final Map<String, IMeasurementSchema> measurementSchemaMap;
 
   @SuppressWarnings("squid:S107")
   public AlignedSeriesCompactionExecutor(
@@ -75,6 +78,10 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
         compactionWriter, readerCacheMap, modificationCacheMap, deviceId, 
true, subTaskId, summary);
     this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap;
     this.measurementSchemas = measurementSchemas;
+    this.timeColumnMeasurementSchema = measurementSchemas.get(0);
+    this.measurementSchemaMap = new HashMap<>();
+    this.measurementSchemas.forEach(
+        schema -> measurementSchemaMap.put(schema.getMeasurementId(), schema));
     // get source files which are sorted by the startTime of current device 
from old to new,
     // files that do not contain the current device have been filtered out as 
well.
     sortedSourceFiles.forEach(x -> fileList.add(new FileElement(x)));
@@ -349,18 +356,25 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
   }
 
   void setForceDecoding(ChunkMetadataElement chunkMetadataElement) {
-    IMeasurementSchema timeChunkSchema = measurementSchemas.get(0);
-    if (timeChunkSchema.getCompressor()
+    if (timeColumnMeasurementSchema.getCompressor()
             != chunkMetadataElement.chunk.getHeader().getCompressionType()
-        || timeChunkSchema.getEncodingType()
+        || timeColumnMeasurementSchema.getEncodingType()
             != chunkMetadataElement.chunk.getHeader().getEncodingType()) {
       chunkMetadataElement.needForceDecoding = true;
       return;
     }
-    for (int i = 1; i < measurementSchemas.size(); i++) {
-      ChunkHeader header = chunkMetadataElement.valueChunks.get(i - 
1).getHeader();
-      if (header.getCompressionType() != 
measurementSchemas.get(i).getCompressor()
-          || header.getEncodingType() != 
measurementSchemas.get(i).getEncodingType()) {
+    for (Chunk chunk : chunkMetadataElement.valueChunks) {
+      if (chunk == null) {
+        continue;
+      }
+      ChunkHeader header = chunk.getHeader();
+      String measurementId = header.getMeasurementID();
+      IMeasurementSchema measurementSchema = 
measurementSchemaMap.get(measurementId);
+      if (measurementSchema == null) {
+        continue;
+      }
+      if (measurementSchema.getCompressor() != header.getCompressionType()
+          || measurementSchema.getEncodingType() != header.getEncodingType()) {
         chunkMetadataElement.needForceDecoding = true;
         return;
       }

Reply via email to