This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/optimize_window_partition
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 441f29d92a335e6787ab413e73d6b6c20008d29e
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Mon Mar 9 19:02:55 2026 +0800

    Reuse table function's component.
---
 .../process/function/partition/PartitionCache.java |   9 +
 .../rowpattern/PatternPartitionExecutor.java       |  36 +--
 .../process/window/TableWindowOperator.java        | 248 ++++++---------------
 .../process/window/partition/Partition.java        | 213 +++++++++---------
 .../window/partition/PartitionExecutor.java        |  49 ++--
 5 files changed, 228 insertions(+), 327 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
index 0591de961e2..7da3d4821a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.process.function.part
 import org.apache.tsfile.block.column.Column;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /** Used to manage the slices of the partition. It is all in memory now. */
@@ -97,6 +98,14 @@ public class PartitionCache {
         && passThroughIndex < getSliceOffset(sliceIndex) + 
slices.get(sliceIndex).getSize();
   }
 
+  public List<Slice> getSlices() {
+    return Collections.unmodifiableList(slices);
+  }
+
+  public boolean isEmpty() {
+    return slices.isEmpty();
+  }
+
   public long getEstimatedSize() {
     return estimatedSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
index d76fe1232c5..402cc7e8312 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowsPerMatch
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SkipToPosition;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
@@ -205,18 +204,12 @@ public final class PatternPartitionExecutor {
   // for ALL ROWS PER MATCH WITH UNMATCHED ROWS
   // the output for unmatched row refers to no pattern match.
   private void outputUnmatchedRow(TsBlockBuilder builder) {
-    // Copy origin data
     int index = currentPosition - partitionStart;
-    Partition.PartitionIndex partitionIndex = 
partition.getPartitionIndex(index);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-    TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
 
     int channel = 0;
     for (int i = 0; i < outputChannels.size(); i++) {
-      Column column = tsBlock.getColumn(outputChannels.get(i));
       ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
-      columnBuilder.write(column, offsetInTsBlock);
+      partition.writeTo(columnBuilder, outputChannels.get(i), index);
       channel++;
     }
 
@@ -231,18 +224,12 @@ public final class PatternPartitionExecutor {
 
   // the output for empty match refers to empty pattern match.
   private void outputEmptyMatch(TsBlockBuilder builder) {
-    // Copy origin data
     int index = currentPosition - partitionStart;
-    Partition.PartitionIndex partitionIndex = 
partition.getPartitionIndex(index);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-    TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
 
     int channel = 0;
     for (int i = 0; i < outputChannels.size(); i++) {
-      Column column = tsBlock.getColumn(outputChannels.get(i));
       ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
-      columnBuilder.write(column, offsetInTsBlock);
+      partition.writeTo(columnBuilder, outputChannels.get(i), index);
       channel++;
     }
 
@@ -268,20 +255,13 @@ public final class PatternPartitionExecutor {
       int patternStart,
       int searchStart,
       int searchEnd) {
-    // Copy origin data
     int index = currentPosition - partitionStart;
-    Partition.PartitionIndex partitionIndex = 
partition.getPartitionIndex(index);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-    TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
 
     int channel = 0;
     // PARTITION BY
     for (int i = 0; i < outputChannels.size(); i++) {
-      Column column = tsBlock.getColumn(outputChannels.get(i));
       ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
-      columnBuilder.write(column, offsetInTsBlock);
-
+      partition.writeTo(columnBuilder, outputChannels.get(i), index);
       channel++;
     }
 
@@ -348,18 +328,10 @@ public final class PatternPartitionExecutor {
   // Called by method outputAllRowsPerMatch
   private void outputRow(
       TsBlockBuilder builder, ArrayView labels, int position, int searchStart, 
int searchEnd) {
-    // Copy origin data
-    Partition.PartitionIndex partitionIndex = 
partition.getPartitionIndex(position);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-    TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
-
-    // map the column data of the current row from the input table to the 
output table
     int channel = 0;
     for (int i = 0; i < outputChannels.size(); i++) {
-      Column column = tsBlock.getColumn(outputChannels.get(i));
       ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
-      columnBuilder.write(column, offsetInTsBlock);
+      partition.writeTo(columnBuilder, outputChannels.get(i), position);
       channel++;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
index b6fb47d601f..9309ee10479 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
@@ -23,15 +23,18 @@ import 
org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.PartitionRecognizer;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionCache;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.frame.FrameInfo;
-import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
@@ -65,10 +68,8 @@ public class TableWindowOperator implements ProcessOperator {
   private final List<FrameInfo> frameInfoList;
 
   // Partition
-  private final List<Integer> partitionChannels;
-  private final RowComparator partitionComparator;
-  private final List<TsBlock> cachedTsBlocks;
-  private int startIndexInFirstBlock;
+  private final PartitionRecognizer partitionRecognizer;
+  private final PartitionCache partitionCache;
 
   // Sort
   private final List<Integer> sortChannels;
@@ -80,6 +81,7 @@ public class TableWindowOperator implements ProcessOperator {
   private long totalMemorySize;
   private long maxUsedMemory;
   private final long maxRuntime;
+  private boolean noMoreDataSignaled;
 
   public TableWindowOperator(
       OperatorContext operatorContext,
@@ -91,38 +93,32 @@ public class TableWindowOperator implements ProcessOperator 
{
       List<FrameInfo> frameInfoList,
       List<Integer> partitionChannels,
       List<Integer> sortChannels) {
-    // Common part(among all other operators)
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
     this.inputDataTypes = ImmutableList.copyOf(inputDataTypes);
     this.outputChannels = ImmutableList.copyOf(outputChannels);
     this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
 
-    // Basic information part
     this.windowFunctions = ImmutableList.copyOf(windowFunctions);
     this.frameInfoList = ImmutableList.copyOf(frameInfoList);
 
-    // Partition Part
-    this.partitionChannels = ImmutableList.copyOf(partitionChannels);
-    // Acquire partition channels' data types
-    List<TSDataType> partitionDataTypes = new ArrayList<>();
-    for (Integer channel : partitionChannels) {
-      partitionDataTypes.add(inputDataTypes.get(channel));
+    List<Integer> requiredChannels = new ArrayList<>(inputDataTypes.size());
+    for (int i = 0; i < inputDataTypes.size(); i++) {
+      requiredChannels.add(i);
     }
-    this.partitionComparator = new RowComparator(partitionDataTypes);
+    this.partitionRecognizer =
+        new PartitionRecognizer(
+            partitionChannels, requiredChannels, Collections.emptyList(), 
inputDataTypes);
+    this.partitionCache = new PartitionCache();
 
-    // Ordering part
     this.sortChannels = ImmutableList.copyOf(sortChannels);
 
-    // Transformation part
     this.cachedPartitionExecutors = new LinkedList<>();
 
-    // Misc
-    this.cachedTsBlocks = new ArrayList<>();
-    this.startIndexInFirstBlock = -1;
     this.maxRuntime = 
this.operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     this.totalMemorySize = 0;
     this.maxUsedMemory = 0;
+    this.noMoreDataSignaled = false;
     this.memoryReservationManager =
         operatorContext
             .getDriverContext()
@@ -144,178 +140,82 @@ public class TableWindowOperator implements 
ProcessOperator {
   public TsBlock next() throws Exception {
     long startTime = System.nanoTime();
 
-    // Transform is not finished
     if (!cachedPartitionExecutors.isEmpty()) {
       TsBlock tsBlock = transform(startTime);
       if (tsBlock != null) {
         return tsBlock;
       }
-      // Receive more data when result TsBlock builder is not full
-      // In this case, all partition executors are done
     }
 
     if (inputOperator.hasNextWithTimer()) {
-      // This TsBlock is pre-sorted with PARTITION BY and ORDER BY channels
       TsBlock preSortedBlock = inputOperator.nextWithTimer();
-      // StreamSort Operator sometimes returns null
       if (preSortedBlock == null || preSortedBlock.isEmpty()) {
         return null;
       }
 
-      cachedPartitionExecutors = partition(preSortedBlock);
-      if (cachedPartitionExecutors.isEmpty()) {
-        // No partition found
-        // i.e., partition crosses multiple TsBlocks
-        return null;
-      }
-
-      // May return null if builder is not full
-      return transform(startTime);
-    } else if (!cachedTsBlocks.isEmpty()) {
-      // Form last partition
-      TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1);
-      int endIndexOfLastTsBlock = lastTsBlock.getPositionCount();
-      PartitionExecutor partitionExecutor =
-          new PartitionExecutor(
-              cachedTsBlocks,
-              inputDataTypes,
-              startIndexInFirstBlock,
-              endIndexOfLastTsBlock,
-              outputChannels,
-              windowFunctions,
-              frameInfoList,
-              sortChannels);
-      cachedPartitionExecutors.addLast(partitionExecutor);
-      cachedTsBlocks.clear();
-      releaseAllCachedTsBlockMemory();
+      partitionRecognizer.addTsBlock(preSortedBlock);
+      processRecognizerStates();
 
-      TsBlock tsBlock = transform(startTime);
-      if (tsBlock == null) {
-        // TsBlockBuilder is not full
-        // Force build since this is the last partition
-        tsBlock =
-            tsBlockBuilder.build(
-                new RunLengthEncodedColumn(
-                    TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount()));
-        tsBlockBuilder.reset();
+      if (!cachedPartitionExecutors.isEmpty()) {
+        return transform(startTime);
+      }
+      return null;
+    } else if (!noMoreDataSignaled) {
+      partitionRecognizer.noMoreData();
+      noMoreDataSignaled = true;
+      processRecognizerStates();
+
+      if (!cachedPartitionExecutors.isEmpty()) {
+        TsBlock tsBlock = transform(startTime);
+        if (tsBlock == null && !tsBlockBuilder.isEmpty()) {
+          tsBlock = getTsBlockFromTsBlockBuilder();
+        }
+        return tsBlock;
       }
-
-      return tsBlock;
     } else if (!tsBlockBuilder.isEmpty()) {
-      // Return remaining data in result TsBlockBuilder
-      // This happens when last partition is too large
-      // And TsBlockBuilder is not full at the end of transform
       return getTsBlockFromTsBlockBuilder();
     }
 
     return null;
   }
 
-  private LinkedList<PartitionExecutor> partition(TsBlock tsBlock) {
-    LinkedList<PartitionExecutor> partitionExecutors = new LinkedList<>();
-
-    int partitionStartInCurrentBlock = 0;
-    int partitionEndInCurrentBlock = partitionStartInCurrentBlock + 1;
-
-    // In this stage, we only consider partition channels
-    List<Column> partitionColumns = extractPartitionColumns(tsBlock);
-
-    // Previous TsBlocks forms a partition
-    if (!cachedTsBlocks.isEmpty()) {
-      TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1);
-      int endIndexOfLastTsBlock = lastTsBlock.getPositionCount();
-
-      // Whether the first row of current TsBlock is not equal to
-      // last row of previous cached TsBlocks
-      List<Column> lastPartitionColumns = extractPartitionColumns(lastTsBlock);
-      if (!partitionComparator.equal(
-          partitionColumns, 0, lastPartitionColumns, endIndexOfLastTsBlock - 
1)) {
-        PartitionExecutor partitionExecutor =
-            new PartitionExecutor(
-                cachedTsBlocks,
-                inputDataTypes,
-                startIndexInFirstBlock,
-                endIndexOfLastTsBlock,
-                outputChannels,
-                windowFunctions,
-                frameInfoList,
-                sortChannels);
-
-        partitionExecutors.addLast(partitionExecutor);
-        cachedTsBlocks.clear();
-        releaseAllCachedTsBlockMemory();
-        startIndexInFirstBlock = -1;
+  private void processRecognizerStates() {
+    while (true) {
+      PartitionState state = partitionRecognizer.nextState();
+      switch (state.getStateType()) {
+        case INIT:
+        case NEED_MORE_DATA:
+          return;
+        case FINISHED:
+          finalizeCurrentPartition();
+          return;
+        case NEW_PARTITION:
+          finalizeCurrentPartition();
+          addSliceToCache(state.getSlice());
+          break;
+        case ITERATING:
+          addSliceToCache(state.getSlice());
+          break;
       }
     }
+  }
 
-    // Try to find all partitions
-    int count = tsBlock.getPositionCount();
-    while (count == 1 || partitionEndInCurrentBlock < count) {
-      // Try to find one partition
-      while (partitionEndInCurrentBlock < count
-          && partitionComparator.equalColumns(
-              partitionColumns, partitionStartInCurrentBlock, 
partitionEndInCurrentBlock)) {
-        partitionEndInCurrentBlock++;
-      }
-
-      if (partitionEndInCurrentBlock != count) {
-        // Find partition
-        PartitionExecutor partitionExecutor;
-        if (partitionStartInCurrentBlock != 0 || startIndexInFirstBlock == -1) 
{
-          // Small partition within this TsBlock
-          partitionExecutor =
-              new PartitionExecutor(
-                  Collections.singletonList(tsBlock),
-                  inputDataTypes,
-                  partitionStartInCurrentBlock,
-                  partitionEndInCurrentBlock,
-                  outputChannels,
-                  windowFunctions,
-                  frameInfoList,
-                  sortChannels);
-        } else {
-          // Large partition crosses multiple TsBlocks
-          reserveOneTsBlockMemory(tsBlock);
-          cachedTsBlocks.add(tsBlock);
-          partitionExecutor =
-              new PartitionExecutor(
-                  cachedTsBlocks,
-                  inputDataTypes,
-                  startIndexInFirstBlock,
-                  partitionEndInCurrentBlock,
-                  outputChannels,
-                  windowFunctions,
-                  frameInfoList,
-                  sortChannels);
-          // Clear TsBlock of last partition
-          cachedTsBlocks.clear();
-          releaseAllCachedTsBlockMemory();
-        }
-        partitionExecutors.addLast(partitionExecutor);
-
-        partitionStartInCurrentBlock = partitionEndInCurrentBlock;
-        // Reset cross-TsBlock tracking after partition completion
-        startIndexInFirstBlock = -1;
-      } else {
-        // Last partition of TsBlock
-        // The beginning of next TsBlock may have rows in this partition
-        if (startIndexInFirstBlock == -1) {
-          startIndexInFirstBlock = partitionStartInCurrentBlock;
-        }
-        reserveOneTsBlockMemory(tsBlock);
-        cachedTsBlocks.add(tsBlock);
-        // For count == 1
-        break;
-      }
+  private void finalizeCurrentPartition() {
+    if (!partitionCache.isEmpty()) {
+      Partition partition = new Partition(partitionCache.getSlices());
+      PartitionExecutor partitionExecutor =
+          new PartitionExecutor(
+              partition, inputDataTypes, outputChannels, windowFunctions, 
frameInfoList,
+              sortChannels);
+      cachedPartitionExecutors.addLast(partitionExecutor);
+      releasePartitionCacheMemory();
+      partitionCache.clear();
     }
-
-    return partitionExecutors;
   }
 
   private TsBlock transform(long startTime) {
     while (!cachedPartitionExecutors.isEmpty()) {
       PartitionExecutor partitionExecutor = 
cachedPartitionExecutors.getFirst();
-      // Reset window functions for new partition
       partitionExecutor.resetWindowFunctions();
 
       while (System.nanoTime() - startTime < maxRuntime
@@ -333,19 +233,9 @@ public class TableWindowOperator implements 
ProcessOperator {
       }
     }
 
-    // Reach partition end, but builder is not full yet
     return null;
   }
 
-  private List<Column> extractPartitionColumns(TsBlock tsBlock) {
-    List<Column> partitionColumns = new ArrayList<>(partitionChannels.size());
-    for (int channel : partitionChannels) {
-      Column partitionColumn = tsBlock.getColumn(channel);
-      partitionColumns.add(partitionColumn);
-    }
-    return partitionColumns;
-  }
-
   private TsBlock getTsBlockFromTsBlockBuilder() {
     TsBlock result =
         tsBlockBuilder.build(
@@ -358,13 +248,14 @@ public class TableWindowOperator implements 
ProcessOperator {
   public boolean hasNext() throws Exception {
     return !cachedPartitionExecutors.isEmpty()
         || inputOperator.hasNext()
-        || !cachedTsBlocks.isEmpty()
+        || !partitionCache.isEmpty()
         || !tsBlockBuilder.isEmpty();
   }
 
   @Override
   public void close() throws Exception {
     inputOperator.close();
+    partitionCache.close();
     if (totalMemorySize != 0) {
       memoryReservationManager.releaseMemoryCumulatively(totalMemorySize);
     }
@@ -375,19 +266,19 @@ public class TableWindowOperator implements 
ProcessOperator {
     return !this.hasNextWithTimer();
   }
 
-  private void reserveOneTsBlockMemory(TsBlock tsBlock) {
-    long reserved = tsBlock.getTotalInstanceSize();
+  private void addSliceToCache(Slice slice) {
+    long reserved = slice.getEstimatedSize();
     memoryReservationManager.reserveMemoryCumulatively(reserved);
     totalMemorySize += reserved;
     maxUsedMemory = Math.max(maxUsedMemory, totalMemorySize);
     operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY, 
Long.toString(maxUsedMemory));
+    partitionCache.addSlice(slice);
   }
 
-  private void releaseAllCachedTsBlockMemory() {
-    long released = 
cachedTsBlocks.stream().mapToInt(TsBlock::getTotalInstanceSize).sum();
+  private void releasePartitionCacheMemory() {
+    long released = partitionCache.getEstimatedSize();
     memoryReservationManager.releaseMemoryCumulatively(released);
     totalMemorySize -= released;
-    // No need to update maxUsedMemory
     operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY, 
Long.toString(maxUsedMemory));
   }
 
@@ -415,6 +306,7 @@ public class TableWindowOperator implements ProcessOperator 
{
     return INSTANCE_SIZE
         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
-        + tsBlockBuilder.getRetainedSizeInBytes();
+        + tsBlockBuilder.getRetainedSizeInBytes()
+        + partitionCache.getEstimatedSize();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
index b8acd7fae11..ac23bbc9fef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition;
 
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.ColumnList;
 
 import org.apache.tsfile.block.column.Column;
@@ -27,40 +28,45 @@ import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.utils.Binary;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 public class Partition {
-  private final List<TsBlock> tsBlocks;
+  private final List<Column[]> segments;
   private int cachedPositionCount = -1;
 
   public Partition(List<TsBlock> tsBlocks, int startIndexInFirstBlock, int 
endIndexInLastBlock) {
+    this.segments = new ArrayList<>(tsBlocks.size());
     if (tsBlocks.size() == 1) {
       int length = endIndexInLastBlock - startIndexInFirstBlock;
-      this.tsBlocks =
-          
Collections.singletonList(tsBlocks.get(0).getRegion(startIndexInFirstBlock, 
length));
-      return;
+      TsBlock region = tsBlocks.get(0).getRegion(startIndexInFirstBlock, 
length);
+      segments.add(region.getValueColumns());
+    } else {
+      TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock);
+      segments.add(firstBlock.getValueColumns());
+      for (int i = 1; i < tsBlocks.size() - 1; i++) {
+        segments.add(tsBlocks.get(i).getValueColumns());
+      }
+      TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0, 
endIndexInLastBlock);
+      segments.add(lastBlock.getValueColumns());
     }
+  }
 
-    this.tsBlocks = new ArrayList<>(tsBlocks.size());
-    // First TsBlock
-    TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock);
-    this.tsBlocks.add(firstBlock);
-    // Middle TsBlock
-    for (int i = 1; i < tsBlocks.size() - 1; i++) {
-      this.tsBlocks.add(tsBlocks.get(i));
+  public Partition(List<Slice> slices) {
+    this.segments = new ArrayList<>(slices.size());
+    for (Slice slice : slices) {
+      segments.add(slice.getRequiredColumns());
     }
-    // Last TsBlock
-    TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0, 
endIndexInLastBlock);
-    this.tsBlocks.add(lastBlock);
+  }
+
+  private Partition(List<Column[]> segments, boolean directSegments) {
+    this.segments = segments;
   }
 
   public int getPositionCount() {
     if (cachedPositionCount == -1) {
-      // Lazy initialized
       cachedPositionCount = 0;
-      for (TsBlock block : tsBlocks) {
-        cachedPositionCount += block.getPositionCount();
+      for (Column[] segment : segments) {
+        cachedPositionCount += segment[0].getPositionCount();
       }
     }
 
@@ -68,144 +74,149 @@ public class Partition {
   }
 
   public int getValueColumnCount() {
-    return tsBlocks.get(0).getValueColumnCount();
-  }
-
-  public TsBlock getTsBlock(int tsBlockIndex) {
-    return tsBlocks.get(tsBlockIndex);
+    return segments.get(0).length;
   }
 
   public List<Column[]> getAllColumns() {
-    List<Column[]> allColumns = new ArrayList<>();
-    for (TsBlock block : tsBlocks) {
-      allColumns.add(block.getAllColumns());
-    }
-
-    return allColumns;
+    return segments;
   }
 
   public boolean getBoolean(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getBoolean(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getBoolean(offset);
   }
 
   public int getInt(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getInt(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getInt(offset);
   }
 
   public long getLong(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getLong(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getLong(offset);
   }
 
   public float getFloat(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getFloat(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getFloat(offset);
   }
 
   public double getDouble(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getDouble(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getDouble(offset);
   }
 
   public Binary getBinary(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).getBinary(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].getBinary(offset);
   }
 
   public boolean isNull(int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
-    return tsBlock.getColumn(channel).isNull(offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    return segments.get(segmentIndex)[channel].isNull(offset);
   }
 
   public void writeTo(ColumnBuilder builder, int channel, int rowIndex) {
     PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
-    Column column = tsBlocks.get(tsBlockIndex).getColumn(channel);
-    builder.write(column, offsetInTsBlock);
+    int segmentIndex = partitionIndex.getSegmentIndex();
+    int offset = partitionIndex.getOffsetInSegment();
+    Column column = segments.get(segmentIndex)[channel];
+    builder.write(column, offset);
   }
 
   public static class PartitionIndex {
-    private final int tsBlockIndex;
-    private final int offsetInTsBlock;
+    private final int segmentIndex;
+    private final int offsetInSegment;
 
-    PartitionIndex(int tsBlockIndex, int offsetInTsBlock) {
-      this.tsBlockIndex = tsBlockIndex;
-      this.offsetInTsBlock = offsetInTsBlock;
+    PartitionIndex(int segmentIndex, int offsetInSegment) {
+      this.segmentIndex = segmentIndex;
+      this.offsetInSegment = offsetInSegment;
     }
 
-    public int getTsBlockIndex() {
-      return tsBlockIndex;
+    public int getSegmentIndex() {
+      return segmentIndex;
     }
 
-    public int getOffsetInTsBlock() {
-      return offsetInTsBlock;
+    public int getOffsetInSegment() {
+      return offsetInSegment;
     }
   }
 
   // start and end are indexes within partition
   // Both of them are inclusive, i.e. [start, end]
   public Partition getRegion(int start, int end) {
-    PartitionIndex startPartitionIndex = getPartitionIndex(start);
-    PartitionIndex endPartitionIndex = getPartitionIndex(end);
-
-    List<TsBlock> tsBlockList = new ArrayList<>();
-    int startTsBlockIndex = startPartitionIndex.getTsBlockIndex();
-    int endTsBlockIndex = endPartitionIndex.getTsBlockIndex();
-    for (int i = startTsBlockIndex; i <= endTsBlockIndex; i++) {
-      tsBlockList.add(tsBlocks.get(i));
+    PartitionIndex startPI = getPartitionIndex(start);
+    PartitionIndex endPI = getPartitionIndex(end);
+
+    int startSeg = startPI.getSegmentIndex();
+    int endSeg = endPI.getSegmentIndex();
+    int columnCount = segments.get(0).length;
+
+    List<Column[]> regionSegments = new ArrayList<>();
+
+    if (startSeg == endSeg) {
+      int offset = startPI.getOffsetInSegment();
+      int length = endPI.getOffsetInSegment() - offset + 1;
+      Column[] cols = segments.get(startSeg);
+      Column[] region = new Column[columnCount];
+      for (int c = 0; c < columnCount; c++) {
+        region[c] = cols[c].getRegion(offset, length);
+      }
+      regionSegments.add(region);
+    } else {
+      // First segment
+      Column[] firstCols = segments.get(startSeg);
+      int firstOffset = startPI.getOffsetInSegment();
+      int firstLen = firstCols[0].getPositionCount() - firstOffset;
+      Column[] firstRegion = new Column[columnCount];
+      for (int c = 0; c < columnCount; c++) {
+        firstRegion[c] = firstCols[c].getRegion(firstOffset, firstLen);
+      }
+      regionSegments.add(firstRegion);
+
+      // Middle segments
+      for (int i = startSeg + 1; i < endSeg; i++) {
+        regionSegments.add(segments.get(i));
+      }
+
+      // Last segment
+      Column[] lastCols = segments.get(endSeg);
+      int lastLen = endPI.getOffsetInSegment() + 1;
+      Column[] lastRegion = new Column[columnCount];
+      for (int c = 0; c < columnCount; c++) {
+        lastRegion[c] = lastCols[c].getRegion(0, lastLen);
+      }
+      regionSegments.add(lastRegion);
     }
 
-    int startIndexInFirstBlock = startPartitionIndex.getOffsetInTsBlock();
-    int endIndexInLastBlock = endPartitionIndex.getOffsetInTsBlock();
-    return new Partition(tsBlockList, startIndexInFirstBlock, 
endIndexInLastBlock + 1);
+    return new Partition(regionSegments, true);
   }
 
-  // rowIndex is index within partition
   public PartitionIndex getPartitionIndex(int rowIndex) {
-    int tsBlockIndex = 0;
-    while (tsBlockIndex < tsBlocks.size()
-        && rowIndex >= tsBlocks.get(tsBlockIndex).getPositionCount()) {
-      rowIndex -= tsBlocks.get(tsBlockIndex).getPositionCount();
-      // Enter next TsBlock
-      tsBlockIndex++;
+    int segmentIndex = 0;
+    while (segmentIndex < segments.size()
+        && rowIndex >= segments.get(segmentIndex)[0].getPositionCount()) {
+      rowIndex -= segments.get(segmentIndex)[0].getPositionCount();
+      segmentIndex++;
     }
 
-    if (tsBlockIndex != tsBlocks.size()) {
-      return new PartitionIndex(tsBlockIndex, rowIndex);
+    if (segmentIndex != segments.size()) {
+      return new PartitionIndex(segmentIndex, rowIndex);
     } else {
-      // Unlikely
       throw new IndexOutOfBoundsException("Index out of Partition's bounds!");
     }
   }
@@ -215,8 +226,8 @@ public class Partition {
 
     for (Integer sortedChannel : sortedChannels) {
       List<Column> columns = new ArrayList<>();
-      for (TsBlock tsBlock : tsBlocks) {
-        columns.add(tsBlock.getColumn(sortedChannel));
+      for (Column[] segment : segments) {
+        columns.add(segment[sortedChannel]);
       }
       columnLists.add(new ColumnList(columns));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
index 20167955d79..d5a68ea5796 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.R
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
@@ -69,17 +68,42 @@ public final class PartitionExecutor {
       List<WindowFunction> windowFunctions,
       List<FrameInfo> frameInfoList,
       List<Integer> sortChannels) {
-    // Partition
-    this.partition = new Partition(tsBlocks, startIndexInFirstBlock, 
endIndexInLastBlock);
-    this.partitionStart = startIndexInFirstBlock;
-    this.partitionEnd = startIndexInFirstBlock + 
this.partition.getPositionCount();
-    // Window functions and frames
+    this(
+        new Partition(tsBlocks, startIndexInFirstBlock, endIndexInLastBlock),
+        dataTypes,
+        startIndexInFirstBlock,
+        outputChannels,
+        windowFunctions,
+        frameInfoList,
+        sortChannels);
+  }
+
+  public PartitionExecutor(
+      Partition partition,
+      List<TSDataType> dataTypes,
+      List<Integer> outputChannels,
+      List<WindowFunction> windowFunctions,
+      List<FrameInfo> frameInfoList,
+      List<Integer> sortChannels) {
+    this(partition, dataTypes, 0, outputChannels, windowFunctions, 
frameInfoList, sortChannels);
+  }
+
+  private PartitionExecutor(
+      Partition partition,
+      List<TSDataType> dataTypes,
+      int partitionStart,
+      List<Integer> outputChannels,
+      List<WindowFunction> windowFunctions,
+      List<FrameInfo> frameInfoList,
+      List<Integer> sortChannels) {
+    this.partition = partition;
+    this.partitionStart = partitionStart;
+    this.partitionEnd = partitionStart + this.partition.getPositionCount();
     this.windowFunctions = ImmutableList.copyOf(windowFunctions);
     this.frames = new ArrayList<>();
 
     this.outputChannels = ImmutableList.copyOf(outputChannels);
 
-    // Prepare for peer group comparing
     List<TSDataType> sortDataTypes = new ArrayList<>();
     for (int channel : sortChannels) {
       TSDataType dataType = dataTypes.get(channel);
@@ -118,7 +142,6 @@ public final class PartitionExecutor {
                     peerGroupEnd - partitionStart - 1);
             break;
           default:
-            // Unreachable
             throw new UnsupportedOperationException("Unreachable!");
         }
       }
@@ -131,21 +154,15 @@ public final class PartitionExecutor {
   }
 
   public void processNextRow(TsBlockBuilder builder) {
-    // Copy origin data
     int index = currentPosition - partitionStart;
-    Partition.PartitionIndex partitionIndex = 
partition.getPartitionIndex(index);
-    int tsBlockIndex = partitionIndex.getTsBlockIndex();
-    int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-    TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
 
     int channel = 0;
     for (int i = 0; i < outputChannels.size(); i++) {
-      Column column = tsBlock.getColumn(outputChannels.get(i));
       ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
-      if (column.isNull(offsetInTsBlock)) {
+      if (partition.isNull(outputChannels.get(i), index)) {
         columnBuilder.appendNull();
       } else {
-        columnBuilder.write(column, offsetInTsBlock);
+        partition.writeTo(columnBuilder, outputChannels.get(i), index);
       }
       channel++;
     }


Reply via email to