This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new f1cbc133986 [to rc/1.3.3] Optimize column batch selection when there are many value columns (#13457)
f1cbc133986 is described below

commit f1cbc133986c0f5fdde47b5e5dba00c2e32665cf
Author: shuwenwei <[email protected]>
AuthorDate: Tue Sep 10 15:00:57 2024 +0800

    [to rc/1.3.3] Optimize column batch selection when there are many value columns (#13457)
    
    * optimize column batch selection when there are many value columns
    
    * fix bug
    
    * use array dequeue
---
 ...BatchedFastAlignedSeriesCompactionExecutor.java |  41 +++---
 ...edReadChunkAlignedSeriesCompactionExecutor.java |  42 +++---
 .../utils/AlignedSeriesBatchCompactionUtils.java   | 161 +++++++++++++--------
 .../ReadChunkAlignedSeriesCompactionExecutor.java  |  42 +++---
 .../compaction/utils/BatchCompactionUtilsTest.java |  12 +-
 5 files changed, 162 insertions(+), 136 deletions(-)
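
For orientation before the diff: this commit replaces the measurement-name based selectColumnBatchToCompact() with an iterator-style BatchColumnSelection that hands out value-column indexes batch by batch, binary (BLOB/TEXT/STRING) columns first. The following is a minimal sketch of how the executors below drive it; BatchColumnSelection and its methods come from this diff, while valueMeasurementSchemas, batchSize and compactBatch() are illustrative stand-ins for the surrounding executor code:

    // Sketch only: compactBatch(...) is a hypothetical placeholder for the
    // first-batch / following-batch executors created in the diff below.
    AlignedSeriesBatchCompactionUtils.BatchColumnSelection selection =
        new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(valueMeasurementSchemas, batchSize);
    while (selection.hasNext()) {
      selection.next();
      // Each batch holds at most batchSize columns; binary columns are drained first.
      List<Integer> columnIndexes = selection.getSelectedColumnIndexList();
      List<IMeasurementSchema> columnSchemas = selection.getCurrentSelectedColumnSchemaList();
      compactBatch(columnIndexes, columnSchemas);
    }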

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
index 8047c00ba95..9a31990b11a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java
@@ -56,18 +56,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 public class BatchedFastAlignedSeriesCompactionExecutor
     extends FastAlignedSeriesCompactionExecutor {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private final Set<String> compactedMeasurements;
+  private AlignedSeriesBatchCompactionUtils.BatchColumnSelection batchColumnSelection;
   private final IMeasurementSchema timeSchema;
   private final List<IMeasurementSchema> valueMeasurementSchemas;
   private final List<TsFileResource> sortedSourceFiles;
@@ -100,7 +97,9 @@ public class BatchedFastAlignedSeriesCompactionExecutor
         summary);
     timeSchema = measurementSchemas.remove(0);
     valueMeasurementSchemas = measurementSchemas;
-    this.compactedMeasurements = new HashSet<>();
+    this.batchColumnSelection =
+        new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(
+            valueMeasurementSchemas, batchSize);
     this.sortedSourceFiles = sortedSourceFiles;
     this.alignedChunkMetadataCache = new HashMap<>();
     this.batchCompactionPlan = new BatchCompactionPlan();
@@ -122,13 +121,9 @@ public class BatchedFastAlignedSeriesCompactionExecutor
 
     List<AlignedChunkMetadata> filteredAlignedChunkMetadataList = new ArrayList<>();
     for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
-      List<String> selectedMeasurements =
-          selectedValueMeasurementSchemas.stream()
-              .map(IMeasurementSchema::getMeasurementId)
-              .collect(Collectors.toList());
       filteredAlignedChunkMetadataList.add(
-          AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadata(
-              alignedChunkMetadata, selectedMeasurements));
+          AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadataByIndex(
+              alignedChunkMetadata, batchColumnSelection.getSelectedColumnIndexList()));
     }
     return filteredAlignedChunkMetadataList;
   }
@@ -145,13 +140,16 @@ public class BatchedFastAlignedSeriesCompactionExecutor
 
   private void compactFirstBatch()
       throws PageException, IllegalPathException, IOException, WriteProcessException {
-    List<IMeasurementSchema> firstGroupMeasurements =
-        AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
-            valueMeasurementSchemas, compactedMeasurements, batchSize);
+    if (!batchColumnSelection.hasNext()) {
+      return;
+    }
+    batchColumnSelection.next();
+
     List<IMeasurementSchema> currentBatchMeasurementSchemas =
-        new ArrayList<>(firstGroupMeasurements.size() + 1);
+        new ArrayList<>(batchColumnSelection.getCurrentSelectedColumnSchemaList().size() + 1);
     currentBatchMeasurementSchemas.add(timeSchema);
-    currentBatchMeasurementSchemas.addAll(firstGroupMeasurements);
+    currentBatchMeasurementSchemas.addAll(
+        batchColumnSelection.getCurrentSelectedColumnSchemaList());
 
     FirstBatchFastAlignedSeriesCompactionExecutor executor =
         new FirstBatchFastAlignedSeriesCompactionExecutor(
@@ -173,14 +171,13 @@ public class BatchedFastAlignedSeriesCompactionExecutor
 
   private void compactLeftBatches()
       throws PageException, IllegalPathException, IOException, WriteProcessException {
-    while (compactedMeasurements.size() < valueMeasurementSchemas.size()) {
-      List<IMeasurementSchema> selectedValueColumnGroup =
-          AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
-              valueMeasurementSchemas, compactedMeasurements, batchSize);
+    while (batchColumnSelection.hasNext()) {
+      batchColumnSelection.next();
       List<IMeasurementSchema> currentBatchMeasurementSchemas =
-          new ArrayList<>(selectedValueColumnGroup.size() + 1);
+          new ArrayList<>(batchColumnSelection.getCurrentSelectedColumnSchemaList().size() + 1);
       currentBatchMeasurementSchemas.add(timeSchema);
-      currentBatchMeasurementSchemas.addAll(selectedValueColumnGroup);
+      currentBatchMeasurementSchemas.addAll(
+          batchColumnSelection.getCurrentSelectedColumnSchemaList());
       FollowingBatchFastAlignedSeriesCompactionExecutor executor =
           new FollowingBatchFastAlignedSeriesCompactionExecutor(
               compactionWriter,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
index fd84b37dc6c..a8245d62632 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedReadChunkAlignedSeriesCompactionExecutor.java
@@ -53,20 +53,17 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 public class BatchedReadChunkAlignedSeriesCompactionExecutor
     extends ReadChunkAlignedSeriesCompactionExecutor {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private final Set<String> compactedMeasurements;
   private final BatchCompactionPlan batchCompactionPlan = new BatchCompactionPlan();
   private final int batchSize =
       IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
+  private final AlignedSeriesBatchCompactionUtils.BatchColumnSelection batchColumnSelection;
   private final LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
       originReaderAndChunkMetadataList;
 
@@ -79,7 +76,8 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
       throws IOException {
     super(device, targetResource, readerAndChunkMetadataList, writer, summary);
     this.originReaderAndChunkMetadataList = readerAndChunkMetadataList;
-    compactedMeasurements = new HashSet<>();
+    this.batchColumnSelection =
+        new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(schemaList, batchSize);
   }
 
   @Override
@@ -97,17 +95,16 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
   }
 
   private void compactFirstBatch() throws IOException, PageException {
-    List<IMeasurementSchema> firstBatchMeasurements =
-        AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
-            schemaList, compactedMeasurements, batchSize);
+    if (!batchColumnSelection.hasNext()) {
+      return;
+    }
+    batchColumnSelection.next();
 
     LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
         batchedReaderAndChunkMetadataList =
             filterAlignedChunkMetadataList(
-                readerAndChunkMetadataList,
-                firstBatchMeasurements.stream()
-                    .map(IMeasurementSchema::getMeasurementId)
-                    .collect(Collectors.toList()));
+                readerAndChunkMetadataList, batchColumnSelection.getSelectedColumnIndexList());
+
     FirstBatchedReadChunkAlignedSeriesCompactionExecutor executor =
         new FirstBatchedReadChunkAlignedSeriesCompactionExecutor(
             device,
@@ -116,7 +113,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
             writer,
             summary,
             timeSchema,
-            firstBatchMeasurements);
+            batchColumnSelection.getCurrentSelectedColumnSchemaList());
     executor.execute();
     LOGGER.debug(
         "[Batch Compaction] current device is {}, first batch compacted time 
chunk is {}",
@@ -125,17 +122,12 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
   }
 
   private void compactLeftBatches() throws PageException, IOException {
-    while (compactedMeasurements.size() < schemaList.size()) {
-      List<IMeasurementSchema> selectedColumnBatch =
-          AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
-              schemaList, compactedMeasurements, batchSize);
+    while (batchColumnSelection.hasNext()) {
+      batchColumnSelection.next();
       LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
           groupReaderAndChunkMetadataList =
               filterAlignedChunkMetadataList(
-                  readerAndChunkMetadataList,
-                  selectedColumnBatch.stream()
-                      .map(IMeasurementSchema::getMeasurementId)
-                      .collect(Collectors.toList()));
+                  readerAndChunkMetadataList, batchColumnSelection.getSelectedColumnIndexList());
       FollowingBatchedReadChunkAlignedSeriesGroupCompactionExecutor executor =
           new FollowingBatchedReadChunkAlignedSeriesGroupCompactionExecutor(
               device,
@@ -144,7 +136,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
               writer,
               summary,
               timeSchema,
-              selectedColumnBatch);
+              batchColumnSelection.getCurrentSelectedColumnSchemaList());
       executor.execute();
     }
   }
@@ -152,7 +144,7 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
   private LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
       filterAlignedChunkMetadataList(
           List<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList,
-          List<String> selectedMeasurements) {
+          List<Integer> selectedMeasurementIndexs) {
     LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
         groupReaderAndChunkMetadataList = new LinkedList<>();
     for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair : readerAndChunkMetadataList) {
@@ -160,8 +152,8 @@ public class BatchedReadChunkAlignedSeriesCompactionExecutor
       List<AlignedChunkMetadata> selectedColumnAlignedChunkMetadataList = new LinkedList<>();
       for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
         selectedColumnAlignedChunkMetadataList.add(
-            AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadata(
-                alignedChunkMetadata, selectedMeasurements));
+            AlignedSeriesBatchCompactionUtils.filterAlignedChunkMetadataByIndex(
+                alignedChunkMetadata, selectedMeasurementIndexs));
       }
       groupReaderAndChunkMetadataList.add(
           new Pair<>(pair.getLeft(), selectedColumnAlignedChunkMetadataList));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
index c1e83101a12..79e7b72a8ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/AlignedSeriesBatchCompactionUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
 
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
 
-import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -29,61 +28,19 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 
 public class AlignedSeriesBatchCompactionUtils {
 
   private AlignedSeriesBatchCompactionUtils() {}
 
-  public static List<IMeasurementSchema> selectColumnBatchToCompact(
-      List<IMeasurementSchema> schemaList, Set<String> compactedMeasurements, int batchSize) {
-    // TODO: select batch by allocated memory and chunk size to perform more strict memory control
-    List<IMeasurementSchema> selectedColumnBatch = new ArrayList<>(batchSize);
-    for (IMeasurementSchema schema : schemaList) {
-      if (!isLargeDataType(schema.getType())) {
-        continue;
-      }
-      if (compactedMeasurements.contains(schema.getMeasurementId())) {
-        continue;
-      }
-      compactedMeasurements.add(schema.getMeasurementId());
-      selectedColumnBatch.add(schema);
-      if (selectedColumnBatch.size() >= batchSize) {
-        return selectedColumnBatch;
-      }
-      if (compactedMeasurements.size() == schemaList.size()) {
-        return selectedColumnBatch;
-      }
-    }
-    for (IMeasurementSchema schema : schemaList) {
-      if (compactedMeasurements.contains(schema.getMeasurementId())) {
-        continue;
-      }
-      selectedColumnBatch.add(schema);
-      compactedMeasurements.add(schema.getMeasurementId());
-      if (selectedColumnBatch.size() >= batchSize) {
-        break;
-      }
-      if (compactedMeasurements.size() == schemaList.size()) {
-        break;
-      }
-    }
-    return selectedColumnBatch;
-  }
-
-  private static boolean isLargeDataType(TSDataType dataType) {
-    return dataType.equals(TSDataType.BLOB)
-        || dataType.equals(TSDataType.TEXT)
-        || dataType.equals(TSDataType.STRING);
-  }
-
   public static void markAlignedChunkHasDeletion(
       LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
           readerAndChunkMetadataList) {
@@ -106,27 +63,45 @@ public class AlignedSeriesBatchCompactionUtils {
     }
   }
 
-  public static AlignedChunkMetadata filterAlignedChunkMetadata(
-      AlignedChunkMetadata alignedChunkMetadata, List<String> selectedMeasurements) {
-    List<IChunkMetadata> valueChunkMetadataList =
-        Arrays.asList(new IChunkMetadata[selectedMeasurements.size()]);
-
-    Map<String, Integer> measurementIndex = new HashMap<>();
+  public static AlignedChunkMetadata filterAlignedChunkMetadataByIndex(
+      AlignedChunkMetadata alignedChunkMetadata, List<Integer> selectedMeasurements) {
+    IChunkMetadata[] valueChunkMetadataArr = new IChunkMetadata[selectedMeasurements.size()];
+    List<IChunkMetadata> originValueChunkMetadataList =
+        alignedChunkMetadata.getValueChunkMetadataList();
     for (int i = 0; i < selectedMeasurements.size(); i++) {
-      measurementIndex.put(selectedMeasurements.get(i), i);
+      int columnIndex = selectedMeasurements.get(i);
+      valueChunkMetadataArr[i] = originValueChunkMetadataList.get(columnIndex);
     }
+    return new AlignedChunkMetadata(
+        alignedChunkMetadata.getTimeChunkMetadata(), Arrays.asList(valueChunkMetadataArr));
+  }
 
-    for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
-      if (chunkMetadata == null) {
-        continue;
+  public static AlignedChunkMetadata fillAlignedChunkMetadataBySchemaList(
+      AlignedChunkMetadata originAlignedChunkMetadata, List<IMeasurementSchema> schemaList) {
+    List<IChunkMetadata> originValueChunkMetadataList =
+        originAlignedChunkMetadata.getValueChunkMetadataList();
+    if (originValueChunkMetadataList.size() == schemaList.size()) {
+      return originAlignedChunkMetadata;
+    }
+    IChunkMetadata[] newValueChunkMetadataArr = new IChunkMetadata[schemaList.size()];
+    int currentValueChunkMetadataIndex = 0;
+    for (int i = 0; i < schemaList.size(); i++) {
+      IMeasurementSchema currentSchema = schemaList.get(i);
+      if (currentValueChunkMetadataIndex >= originValueChunkMetadataList.size()) {
+        break;
       }
-      Integer idx = measurementIndex.get(chunkMetadata.getMeasurementUid());
-      if (idx != null) {
-        valueChunkMetadataList.set(idx, chunkMetadata);
+      IChunkMetadata currentValueChunkMetadata =
+          originValueChunkMetadataList.get(currentValueChunkMetadataIndex);
+      if (currentValueChunkMetadata != null
+          && currentSchema
+              .getMeasurementId()
+              .equals(currentValueChunkMetadata.getMeasurementUid())) {
+        newValueChunkMetadataArr[i] = currentValueChunkMetadata;
+        currentValueChunkMetadataIndex++;
       }
     }
     return new AlignedChunkMetadata(
-        alignedChunkMetadata.getTimeChunkMetadata(), valueChunkMetadataList);
+        originAlignedChunkMetadata.getTimeChunkMetadata(), Arrays.asList(newValueChunkMetadataArr));
   }
 
   public static ModifiedStatus calculateAlignedPageModifiedStatus(
@@ -173,4 +148,70 @@ public class AlignedSeriesBatchCompactionUtils {
     }
     return status;
   }
+
+  public static class BatchColumnSelection {
+    private final List<IMeasurementSchema> schemaList;
+    private final Queue<Integer> normalTypeSortedColumnIndexList;
+    private final Queue<Integer> binaryTypeSortedColumnIndexList;
+    private final int batchSize;
+    private int selectedColumnNum;
+
+    private List<Integer> columnIndexList;
+    private List<IMeasurementSchema> currentSelectedColumnSchemaList;
+
+    public BatchColumnSelection(List<IMeasurementSchema> valueSchemas, int batchSize) {
+      this.schemaList = valueSchemas;
+      this.normalTypeSortedColumnIndexList = new ArrayDeque<>(valueSchemas.size());
+      this.binaryTypeSortedColumnIndexList = new ArrayDeque<>();
+      for (int i = 0; i < valueSchemas.size(); i++) {
+        IMeasurementSchema schema = valueSchemas.get(i);
+        if (schema.getType().isBinary()) {
+          this.binaryTypeSortedColumnIndexList.add(i);
+        } else {
+          this.normalTypeSortedColumnIndexList.add(i);
+        }
+      }
+      this.batchSize = batchSize;
+      this.selectedColumnNum = 0;
+    }
+
+    public boolean hasNext() {
+      return selectedColumnNum < schemaList.size();
+    }
+
+    public void next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      selectColumnBatchToCompact();
+    }
+
+    public List<Integer> getSelectedColumnIndexList() {
+      return this.columnIndexList;
+    }
+
+    public List<IMeasurementSchema> getCurrentSelectedColumnSchemaList() {
+      return this.currentSelectedColumnSchemaList;
+    }
+
+    private void selectColumnBatchToCompact() {
+      // TODO: select batch by allocated memory and chunk size to perform more strict memory control
+      this.columnIndexList = new ArrayList<>(batchSize);
+      this.currentSelectedColumnSchemaList = new ArrayList<>(batchSize);
+      while (!normalTypeSortedColumnIndexList.isEmpty()
+          || !binaryTypeSortedColumnIndexList.isEmpty()) {
+        Queue<Integer> sourceTypeSortedList =
+            binaryTypeSortedColumnIndexList.isEmpty()
+                ? normalTypeSortedColumnIndexList
+                : binaryTypeSortedColumnIndexList;
+        Integer columnIndex = sourceTypeSortedList.poll();
+        this.columnIndexList.add(columnIndex);
+        this.currentSelectedColumnSchemaList.add(this.schemaList.get(columnIndex));
+        selectedColumnNum++;
+        if (this.columnIndexList.size() >= batchSize) {
+          break;
+        }
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index 2404c99f114..61c85a4bd47 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.ChunkLoader;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.InstantChunkLoader;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.loader.InstantPageLoader;
@@ -57,8 +58,6 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -96,6 +95,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     this.targetResource = targetResource;
     this.summary = summary;
     collectValueColumnSchemaList();
+    fillAlignedChunkMetadataToMatchSchemaList();
     int compactionFileLevel =
         Integer.parseInt(this.targetResource.getTsFile().getName().split("-")[2]);
     flushController = new ReadChunkAlignedSeriesCompactionFlushController(compactionFileLevel);
@@ -120,10 +120,6 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     flushController = new ReadChunkAlignedSeriesCompactionFlushController(compactionFileLevel);
     this.timeSchema = timeSchema;
     this.schemaList = valueSchemaList;
-    this.measurementSchemaListIndexMap = new HashMap<>();
-    for (int i = 0; i < schemaList.size(); i++) {
-      measurementSchemaListIndexMap.put(schemaList.get(i).getMeasurementId(), i);
-    }
     this.chunkWriter = constructAlignedChunkWriter();
   }
 
@@ -172,10 +168,18 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
         measurementSchemaMap.values().stream()
             .sorted(Comparator.comparing(IMeasurementSchema::getMeasurementId))
             .collect(Collectors.toList());
+  }
 
-    this.measurementSchemaListIndexMap = new HashMap<>();
-    for (int i = 0; i < schemaList.size(); i++) {
-      this.measurementSchemaListIndexMap.put(schemaList.get(i).getMeasurementId(), i);
+  private void fillAlignedChunkMetadataToMatchSchemaList() {
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> pair : readerAndChunkMetadataList) {
+      List<AlignedChunkMetadata> alignedChunkMetadataList = pair.getRight();
+      for (int i = 0; i < alignedChunkMetadataList.size(); i++) {
+        AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(i);
+        alignedChunkMetadataList.set(
+            i,
+            AlignedSeriesBatchCompactionUtils.fillAlignedChunkMetadataBySchemaList(
+                alignedChunkMetadata, schemaList));
+      }
     }
   }
 
@@ -210,17 +214,17 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
       throws IOException, PageException {
     ChunkLoader timeChunk =
         getChunkLoader(reader, (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
-    List<ChunkLoader> valueChunks = Arrays.asList(new ChunkLoader[schemaList.size()]);
-    Collections.fill(valueChunks, getChunkLoader(reader, null));
+    List<ChunkLoader> valueChunks = new ArrayList<>(schemaList.size());
     long pointNum = 0;
-    for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
-      if (chunkMetadata == null || !isValueChunkDataTypeMatchSchema(chunkMetadata)) {
+    for (int i = 0; i < alignedChunkMetadata.getValueChunkMetadataList().size(); i++) {
+      IChunkMetadata chunkMetadata = alignedChunkMetadata.getValueChunkMetadataList().get(i);
+      if (chunkMetadata == null
+          || !chunkMetadata.getDataType().equals(schemaList.get(i).getType())) {
+        valueChunks.add(getChunkLoader(reader, null));
         continue;
       }
       pointNum += chunkMetadata.getStatistics().getCount();
-      ChunkLoader valueChunk = getChunkLoader(reader, (ChunkMetadata) chunkMetadata);
-      int idx = measurementSchemaListIndexMap.get(chunkMetadata.getMeasurementUid());
-      valueChunks.set(idx, valueChunk);
+      valueChunks.add(getChunkLoader(reader, (ChunkMetadata) chunkMetadata));
     }
     summary.increaseProcessPointNum(pointNum);
     if (flushController.canFlushCurrentChunkWriter()) {
@@ -233,12 +237,6 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
     }
   }
 
-  private boolean isValueChunkDataTypeMatchSchema(IChunkMetadata valueChunkMetadata) {
-    String measurement = valueChunkMetadata.getMeasurementUid();
-    IMeasurementSchema schema = schemaList.get(measurementSchemaListIndexMap.get(measurement));
-    return schema.getType() == valueChunkMetadata.getDataType();
-  }
-
   private ChunkLoader getChunkLoader(TsFileSequenceReader reader, ChunkMetadata chunkMetadata)
       throws IOException {
     if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
index 813ee8cf1de..673f83f2a3d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/BatchCompactionUtilsTest.java
@@ -61,9 +61,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 public class BatchCompactionUtilsTest extends AbstractCompactionTest {
 
@@ -88,11 +86,11 @@ public class BatchCompactionUtilsTest extends AbstractCompactionTest {
         measurementSchemas.add(new MeasurementSchema("s" + i, TSDataType.INT32));
       }
     }
-    Set<String> selectedColumns = new HashSet<>();
-    List<IMeasurementSchema> iMeasurementSchemas =
-        AlignedSeriesBatchCompactionUtils.selectColumnBatchToCompact(
-            measurementSchemas, selectedColumns, 10);
-    Assert.assertEquals(10, iMeasurementSchemas.size());
+    AlignedSeriesBatchCompactionUtils.BatchColumnSelection batchColumnSelection =
+        new AlignedSeriesBatchCompactionUtils.BatchColumnSelection(measurementSchemas, 10);
+    Assert.assertTrue(batchColumnSelection.hasNext());
+    batchColumnSelection.next();
+    Assert.assertEquals(10, batchColumnSelection.getCurrentSelectedColumnSchemaList().size());
   }
 
   @Test
