This is an automated email from the ASF dual-hosted git repository. zhihao pushed a commit to branch perf/szh/optimize_window_partition in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 441f29d92a335e6787ab413e73d6b6c20008d29e Author: Sh-Zh-7 <[email protected]> AuthorDate: Mon Mar 9 19:02:55 2026 +0800 Reuse table function's component. --- .../process/function/partition/PartitionCache.java | 9 + .../rowpattern/PatternPartitionExecutor.java | 36 +-- .../process/window/TableWindowOperator.java | 248 ++++++--------------- .../process/window/partition/Partition.java | 213 +++++++++--------- .../window/partition/PartitionExecutor.java | 49 ++-- 5 files changed, 228 insertions(+), 327 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java index 0591de961e2..7da3d4821a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.function.part import org.apache.tsfile.block.column.Column; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** Used to manage the slices of the partition. It is all in memory now. */ @@ -97,6 +98,14 @@ public class PartitionCache { && passThroughIndex < getSliceOffset(sliceIndex) + slices.get(sliceIndex).getSize(); } + public List<Slice> getSlices() { + return Collections.unmodifiableList(slices); + } + + public boolean isEmpty() { + return slices.isEmpty(); + } + public long getEstimatedSize() { return estimatedSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java index d76fe1232c5..402cc7e8312 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowsPerMatch import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SkipToPosition; import com.google.common.collect.ImmutableList; -import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; @@ -205,18 +204,12 @@ public final class PatternPartitionExecutor { // for ALL ROWS PER MATCH WITH UNMATCHED ROWS // the output for unmatched row refers to no pattern match. private void outputUnmatchedRow(TsBlockBuilder builder) { - // Copy origin data int index = currentPosition - partitionStart; - Partition.PartitionIndex partitionIndex = partition.getPartitionIndex(index); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - TsBlock tsBlock = partition.getTsBlock(tsBlockIndex); int channel = 0; for (int i = 0; i < outputChannels.size(); i++) { - Column column = tsBlock.getColumn(outputChannels.get(i)); ColumnBuilder columnBuilder = builder.getColumnBuilder(i); - columnBuilder.write(column, offsetInTsBlock); + partition.writeTo(columnBuilder, outputChannels.get(i), index); channel++; } @@ -231,18 +224,12 @@ public final class PatternPartitionExecutor { // the output for empty match refers to empty pattern match. private void outputEmptyMatch(TsBlockBuilder builder) { - // Copy origin data int index = currentPosition - partitionStart; - Partition.PartitionIndex partitionIndex = partition.getPartitionIndex(index); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - TsBlock tsBlock = partition.getTsBlock(tsBlockIndex); int channel = 0; for (int i = 0; i < outputChannels.size(); i++) { - Column column = tsBlock.getColumn(outputChannels.get(i)); ColumnBuilder columnBuilder = builder.getColumnBuilder(i); - columnBuilder.write(column, offsetInTsBlock); + partition.writeTo(columnBuilder, outputChannels.get(i), index); channel++; } @@ -268,20 +255,13 @@ public final class PatternPartitionExecutor { int patternStart, int searchStart, int searchEnd) { - // Copy origin data int index = currentPosition - partitionStart; - Partition.PartitionIndex partitionIndex = partition.getPartitionIndex(index); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - TsBlock tsBlock = partition.getTsBlock(tsBlockIndex); int channel = 0; // PARTITION BY for (int i = 0; i < outputChannels.size(); i++) { - Column column = tsBlock.getColumn(outputChannels.get(i)); ColumnBuilder columnBuilder = builder.getColumnBuilder(i); - columnBuilder.write(column, offsetInTsBlock); - + partition.writeTo(columnBuilder, outputChannels.get(i), index); channel++; } @@ -348,18 +328,10 @@ public final class PatternPartitionExecutor { // Called by method outputAllRowsPerMatch private void outputRow( TsBlockBuilder builder, ArrayView labels, int position, int searchStart, int searchEnd) { - // Copy origin data - Partition.PartitionIndex partitionIndex = partition.getPartitionIndex(position); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - TsBlock tsBlock = partition.getTsBlock(tsBlockIndex); - - // map the column data of the current row from the input table to the output table int channel = 0; for (int i = 0; i < outputChannels.size(); i++) { - Column column = tsBlock.getColumn(outputChannels.get(i)); ColumnBuilder columnBuilder = builder.getColumnBuilder(i); - columnBuilder.write(column, offsetInTsBlock); + partition.writeTo(columnBuilder, outputChannels.get(i), position); channel++; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java index b6fb47d601f..9309ee10479 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java @@ -23,15 +23,18 @@ import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.PartitionRecognizer; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionCache; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor; import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.frame.FrameInfo; -import org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator; import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.tsfile.block.column.Column; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; @@ -65,10 +68,8 @@ public class TableWindowOperator implements ProcessOperator { private final List<FrameInfo> frameInfoList; // Partition - private final List<Integer> partitionChannels; - private final RowComparator partitionComparator; - private final List<TsBlock> cachedTsBlocks; - private int startIndexInFirstBlock; + private final PartitionRecognizer partitionRecognizer; + private final PartitionCache partitionCache; // Sort private final List<Integer> sortChannels; @@ -80,6 +81,7 @@ public class TableWindowOperator implements ProcessOperator { private long totalMemorySize; private long maxUsedMemory; private final long maxRuntime; + private boolean noMoreDataSignaled; public TableWindowOperator( OperatorContext operatorContext, @@ -91,38 +93,32 @@ public class TableWindowOperator implements ProcessOperator { List<FrameInfo> frameInfoList, List<Integer> partitionChannels, List<Integer> sortChannels) { - // Common part(among all other operators) this.operatorContext = operatorContext; this.inputOperator = inputOperator; this.inputDataTypes = ImmutableList.copyOf(inputDataTypes); this.outputChannels = ImmutableList.copyOf(outputChannels); this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); - // Basic information part this.windowFunctions = ImmutableList.copyOf(windowFunctions); this.frameInfoList = ImmutableList.copyOf(frameInfoList); - // Partition Part - this.partitionChannels = ImmutableList.copyOf(partitionChannels); - // Acquire partition channels' data types - List<TSDataType> partitionDataTypes = new ArrayList<>(); - for (Integer channel : partitionChannels) { - partitionDataTypes.add(inputDataTypes.get(channel)); + List<Integer> requiredChannels = new ArrayList<>(inputDataTypes.size()); + for (int i = 0; i < inputDataTypes.size(); i++) { + requiredChannels.add(i); } - this.partitionComparator = new RowComparator(partitionDataTypes); + this.partitionRecognizer = + new PartitionRecognizer( + partitionChannels, requiredChannels, Collections.emptyList(), inputDataTypes); + this.partitionCache = new PartitionCache(); - // Ordering part this.sortChannels = ImmutableList.copyOf(sortChannels); - // Transformation part this.cachedPartitionExecutors = new LinkedList<>(); - // Misc - this.cachedTsBlocks = new ArrayList<>(); - this.startIndexInFirstBlock = -1; this.maxRuntime = this.operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); this.totalMemorySize = 0; this.maxUsedMemory = 0; + this.noMoreDataSignaled = false; this.memoryReservationManager = operatorContext .getDriverContext() @@ -144,178 +140,82 @@ public class TableWindowOperator implements ProcessOperator { public TsBlock next() throws Exception { long startTime = System.nanoTime(); - // Transform is not finished if (!cachedPartitionExecutors.isEmpty()) { TsBlock tsBlock = transform(startTime); if (tsBlock != null) { return tsBlock; } - // Receive more data when result TsBlock builder is not full - // In this case, all partition executors are done } if (inputOperator.hasNextWithTimer()) { - // This TsBlock is pre-sorted with PARTITION BY and ORDER BY channels TsBlock preSortedBlock = inputOperator.nextWithTimer(); - // StreamSort Operator sometimes returns null if (preSortedBlock == null || preSortedBlock.isEmpty()) { return null; } - cachedPartitionExecutors = partition(preSortedBlock); - if (cachedPartitionExecutors.isEmpty()) { - // No partition found - // i.e., partition crosses multiple TsBlocks - return null; - } - - // May return null if builder is not full - return transform(startTime); - } else if (!cachedTsBlocks.isEmpty()) { - // Form last partition - TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1); - int endIndexOfLastTsBlock = lastTsBlock.getPositionCount(); - PartitionExecutor partitionExecutor = - new PartitionExecutor( - cachedTsBlocks, - inputDataTypes, - startIndexInFirstBlock, - endIndexOfLastTsBlock, - outputChannels, - windowFunctions, - frameInfoList, - sortChannels); - cachedPartitionExecutors.addLast(partitionExecutor); - cachedTsBlocks.clear(); - releaseAllCachedTsBlockMemory(); + partitionRecognizer.addTsBlock(preSortedBlock); + processRecognizerStates(); - TsBlock tsBlock = transform(startTime); - if (tsBlock == null) { - // TsBlockBuilder is not full - // Force build since this is the last partition - tsBlock = - tsBlockBuilder.build( - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); - tsBlockBuilder.reset(); + if (!cachedPartitionExecutors.isEmpty()) { + return transform(startTime); + } + return null; + } else if (!noMoreDataSignaled) { + partitionRecognizer.noMoreData(); + noMoreDataSignaled = true; + processRecognizerStates(); + + if (!cachedPartitionExecutors.isEmpty()) { + TsBlock tsBlock = transform(startTime); + if (tsBlock == null && !tsBlockBuilder.isEmpty()) { + tsBlock = getTsBlockFromTsBlockBuilder(); + } + return tsBlock; } - - return tsBlock; } else if (!tsBlockBuilder.isEmpty()) { - // Return remaining data in result TsBlockBuilder - // This happens when last partition is too large - // And TsBlockBuilder is not full at the end of transform return getTsBlockFromTsBlockBuilder(); } return null; } - private LinkedList<PartitionExecutor> partition(TsBlock tsBlock) { - LinkedList<PartitionExecutor> partitionExecutors = new LinkedList<>(); - - int partitionStartInCurrentBlock = 0; - int partitionEndInCurrentBlock = partitionStartInCurrentBlock + 1; - - // In this stage, we only consider partition channels - List<Column> partitionColumns = extractPartitionColumns(tsBlock); - - // Previous TsBlocks forms a partition - if (!cachedTsBlocks.isEmpty()) { - TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1); - int endIndexOfLastTsBlock = lastTsBlock.getPositionCount(); - - // Whether the first row of current TsBlock is not equal to - // last row of previous cached TsBlocks - List<Column> lastPartitionColumns = extractPartitionColumns(lastTsBlock); - if (!partitionComparator.equal( - partitionColumns, 0, lastPartitionColumns, endIndexOfLastTsBlock - 1)) { - PartitionExecutor partitionExecutor = - new PartitionExecutor( - cachedTsBlocks, - inputDataTypes, - startIndexInFirstBlock, - endIndexOfLastTsBlock, - outputChannels, - windowFunctions, - frameInfoList, - sortChannels); - - partitionExecutors.addLast(partitionExecutor); - cachedTsBlocks.clear(); - releaseAllCachedTsBlockMemory(); - startIndexInFirstBlock = -1; + private void processRecognizerStates() { + while (true) { + PartitionState state = partitionRecognizer.nextState(); + switch (state.getStateType()) { + case INIT: + case NEED_MORE_DATA: + return; + case FINISHED: + finalizeCurrentPartition(); + return; + case NEW_PARTITION: + finalizeCurrentPartition(); + addSliceToCache(state.getSlice()); + break; + case ITERATING: + addSliceToCache(state.getSlice()); + break; } } + } - // Try to find all partitions - int count = tsBlock.getPositionCount(); - while (count == 1 || partitionEndInCurrentBlock < count) { - // Try to find one partition - while (partitionEndInCurrentBlock < count - && partitionComparator.equalColumns( - partitionColumns, partitionStartInCurrentBlock, partitionEndInCurrentBlock)) { - partitionEndInCurrentBlock++; - } - - if (partitionEndInCurrentBlock != count) { - // Find partition - PartitionExecutor partitionExecutor; - if (partitionStartInCurrentBlock != 0 || startIndexInFirstBlock == -1) { - // Small partition within this TsBlock - partitionExecutor = - new PartitionExecutor( - Collections.singletonList(tsBlock), - inputDataTypes, - partitionStartInCurrentBlock, - partitionEndInCurrentBlock, - outputChannels, - windowFunctions, - frameInfoList, - sortChannels); - } else { - // Large partition crosses multiple TsBlocks - reserveOneTsBlockMemory(tsBlock); - cachedTsBlocks.add(tsBlock); - partitionExecutor = - new PartitionExecutor( - cachedTsBlocks, - inputDataTypes, - startIndexInFirstBlock, - partitionEndInCurrentBlock, - outputChannels, - windowFunctions, - frameInfoList, - sortChannels); - // Clear TsBlock of last partition - cachedTsBlocks.clear(); - releaseAllCachedTsBlockMemory(); - } - partitionExecutors.addLast(partitionExecutor); - - partitionStartInCurrentBlock = partitionEndInCurrentBlock; - // Reset cross-TsBlock tracking after partition completion - startIndexInFirstBlock = -1; - } else { - // Last partition of TsBlock - // The beginning of next TsBlock may have rows in this partition - if (startIndexInFirstBlock == -1) { - startIndexInFirstBlock = partitionStartInCurrentBlock; - } - reserveOneTsBlockMemory(tsBlock); - cachedTsBlocks.add(tsBlock); - // For count == 1 - break; - } + private void finalizeCurrentPartition() { + if (!partitionCache.isEmpty()) { + Partition partition = new Partition(partitionCache.getSlices()); + PartitionExecutor partitionExecutor = + new PartitionExecutor( + partition, inputDataTypes, outputChannels, windowFunctions, frameInfoList, + sortChannels); + cachedPartitionExecutors.addLast(partitionExecutor); + releasePartitionCacheMemory(); + partitionCache.clear(); } - - return partitionExecutors; } private TsBlock transform(long startTime) { while (!cachedPartitionExecutors.isEmpty()) { PartitionExecutor partitionExecutor = cachedPartitionExecutors.getFirst(); - // Reset window functions for new partition partitionExecutor.resetWindowFunctions(); while (System.nanoTime() - startTime < maxRuntime @@ -333,19 +233,9 @@ public class TableWindowOperator implements ProcessOperator { } } - // Reach partition end, but builder is not full yet return null; } - private List<Column> extractPartitionColumns(TsBlock tsBlock) { - List<Column> partitionColumns = new ArrayList<>(partitionChannels.size()); - for (int channel : partitionChannels) { - Column partitionColumn = tsBlock.getColumn(channel); - partitionColumns.add(partitionColumn); - } - return partitionColumns; - } - private TsBlock getTsBlockFromTsBlockBuilder() { TsBlock result = tsBlockBuilder.build( @@ -358,13 +248,14 @@ public class TableWindowOperator implements ProcessOperator { public boolean hasNext() throws Exception { return !cachedPartitionExecutors.isEmpty() || inputOperator.hasNext() - || !cachedTsBlocks.isEmpty() + || !partitionCache.isEmpty() || !tsBlockBuilder.isEmpty(); } @Override public void close() throws Exception { inputOperator.close(); + partitionCache.close(); if (totalMemorySize != 0) { memoryReservationManager.releaseMemoryCumulatively(totalMemorySize); } @@ -375,19 +266,19 @@ public class TableWindowOperator implements ProcessOperator { return !this.hasNextWithTimer(); } - private void reserveOneTsBlockMemory(TsBlock tsBlock) { - long reserved = tsBlock.getTotalInstanceSize(); + private void addSliceToCache(Slice slice) { + long reserved = slice.getEstimatedSize(); memoryReservationManager.reserveMemoryCumulatively(reserved); totalMemorySize += reserved; maxUsedMemory = Math.max(maxUsedMemory, totalMemorySize); operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY, Long.toString(maxUsedMemory)); + partitionCache.addSlice(slice); } - private void releaseAllCachedTsBlockMemory() { - long released = cachedTsBlocks.stream().mapToInt(TsBlock::getTotalInstanceSize).sum(); + private void releasePartitionCacheMemory() { + long released = partitionCache.getEstimatedSize(); memoryReservationManager.releaseMemoryCumulatively(released); totalMemorySize -= released; - // No need to update maxUsedMemory operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY, Long.toString(maxUsedMemory)); } @@ -415,6 +306,7 @@ public class TableWindowOperator implements ProcessOperator { return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + tsBlockBuilder.getRetainedSizeInBytes(); + + tsBlockBuilder.getRetainedSizeInBytes() + + partitionCache.getEstimatedSize(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java index b8acd7fae11..ac23bbc9fef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.window.partition; +import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.ColumnList; import org.apache.tsfile.block.column.Column; @@ -27,40 +28,45 @@ import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.utils.Binary; import java.util.ArrayList; -import java.util.Collections; import java.util.List; public class Partition { - private final List<TsBlock> tsBlocks; + private final List<Column[]> segments; private int cachedPositionCount = -1; public Partition(List<TsBlock> tsBlocks, int startIndexInFirstBlock, int endIndexInLastBlock) { + this.segments = new ArrayList<>(tsBlocks.size()); if (tsBlocks.size() == 1) { int length = endIndexInLastBlock - startIndexInFirstBlock; - this.tsBlocks = - Collections.singletonList(tsBlocks.get(0).getRegion(startIndexInFirstBlock, length)); - return; + TsBlock region = tsBlocks.get(0).getRegion(startIndexInFirstBlock, length); + segments.add(region.getValueColumns()); + } else { + TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock); + segments.add(firstBlock.getValueColumns()); + for (int i = 1; i < tsBlocks.size() - 1; i++) { + segments.add(tsBlocks.get(i).getValueColumns()); + } + TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0, endIndexInLastBlock); + segments.add(lastBlock.getValueColumns()); } + } - this.tsBlocks = new ArrayList<>(tsBlocks.size()); - // First TsBlock - TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock); - this.tsBlocks.add(firstBlock); - // Middle TsBlock - for (int i = 1; i < tsBlocks.size() - 1; i++) { - this.tsBlocks.add(tsBlocks.get(i)); + public Partition(List<Slice> slices) { + this.segments = new ArrayList<>(slices.size()); + for (Slice slice : slices) { + segments.add(slice.getRequiredColumns()); } - // Last TsBlock - TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0, endIndexInLastBlock); - this.tsBlocks.add(lastBlock); + } + + private Partition(List<Column[]> segments, boolean directSegments) { + this.segments = segments; } public int getPositionCount() { if (cachedPositionCount == -1) { - // Lazy initialized cachedPositionCount = 0; - for (TsBlock block : tsBlocks) { - cachedPositionCount += block.getPositionCount(); + for (Column[] segment : segments) { + cachedPositionCount += segment[0].getPositionCount(); } } @@ -68,144 +74,149 @@ public class Partition { } public int getValueColumnCount() { - return tsBlocks.get(0).getValueColumnCount(); - } - - public TsBlock getTsBlock(int tsBlockIndex) { - return tsBlocks.get(tsBlockIndex); + return segments.get(0).length; } public List<Column[]> getAllColumns() { - List<Column[]> allColumns = new ArrayList<>(); - for (TsBlock block : tsBlocks) { - allColumns.add(block.getAllColumns()); - } - - return allColumns; + return segments; } public boolean getBoolean(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getBoolean(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getBoolean(offset); } public int getInt(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getInt(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getInt(offset); } public long getLong(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getLong(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getLong(offset); } public float getFloat(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getFloat(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getFloat(offset); } public double getDouble(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getDouble(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getDouble(offset); } public Binary getBinary(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).getBinary(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].getBinary(offset); } public boolean isNull(int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - TsBlock tsBlock = tsBlocks.get(tsBlockIndex); - return tsBlock.getColumn(channel).isNull(offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + return segments.get(segmentIndex)[channel].isNull(offset); } public void writeTo(ColumnBuilder builder, int channel, int rowIndex) { PartitionIndex partitionIndex = getPartitionIndex(rowIndex); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - - Column column = tsBlocks.get(tsBlockIndex).getColumn(channel); - builder.write(column, offsetInTsBlock); + int segmentIndex = partitionIndex.getSegmentIndex(); + int offset = partitionIndex.getOffsetInSegment(); + Column column = segments.get(segmentIndex)[channel]; + builder.write(column, offset); } public static class PartitionIndex { - private final int tsBlockIndex; - private final int offsetInTsBlock; + private final int segmentIndex; + private final int offsetInSegment; - PartitionIndex(int tsBlockIndex, int offsetInTsBlock) { - this.tsBlockIndex = tsBlockIndex; - this.offsetInTsBlock = offsetInTsBlock; + PartitionIndex(int segmentIndex, int offsetInSegment) { + this.segmentIndex = segmentIndex; + this.offsetInSegment = offsetInSegment; } - public int getTsBlockIndex() { - return tsBlockIndex; + public int getSegmentIndex() { + return segmentIndex; } - public int getOffsetInTsBlock() { - return offsetInTsBlock; + public int getOffsetInSegment() { + return offsetInSegment; } } // start and end are indexes within partition // Both of them are inclusive, i.e. [start, end] public Partition getRegion(int start, int end) { - PartitionIndex startPartitionIndex = getPartitionIndex(start); - PartitionIndex endPartitionIndex = getPartitionIndex(end); - - List<TsBlock> tsBlockList = new ArrayList<>(); - int startTsBlockIndex = startPartitionIndex.getTsBlockIndex(); - int endTsBlockIndex = endPartitionIndex.getTsBlockIndex(); - for (int i = startTsBlockIndex; i <= endTsBlockIndex; i++) { - tsBlockList.add(tsBlocks.get(i)); + PartitionIndex startPI = getPartitionIndex(start); + PartitionIndex endPI = getPartitionIndex(end); + + int startSeg = startPI.getSegmentIndex(); + int endSeg = endPI.getSegmentIndex(); + int columnCount = segments.get(0).length; + + List<Column[]> regionSegments = new ArrayList<>(); + + if (startSeg == endSeg) { + int offset = startPI.getOffsetInSegment(); + int length = endPI.getOffsetInSegment() - offset + 1; + Column[] cols = segments.get(startSeg); + Column[] region = new Column[columnCount]; + for (int c = 0; c < columnCount; c++) { + region[c] = cols[c].getRegion(offset, length); + } + regionSegments.add(region); + } else { + // First segment + Column[] firstCols = segments.get(startSeg); + int firstOffset = startPI.getOffsetInSegment(); + int firstLen = firstCols[0].getPositionCount() - firstOffset; + Column[] firstRegion = new Column[columnCount]; + for (int c = 0; c < columnCount; c++) { + firstRegion[c] = firstCols[c].getRegion(firstOffset, firstLen); + } + regionSegments.add(firstRegion); + + // Middle segments + for (int i = startSeg + 1; i < endSeg; i++) { + regionSegments.add(segments.get(i)); + } + + // Last segment + Column[] lastCols = segments.get(endSeg); + int lastLen = endPI.getOffsetInSegment() + 1; + Column[] lastRegion = new Column[columnCount]; + for (int c = 0; c < columnCount; c++) { + lastRegion[c] = lastCols[c].getRegion(0, lastLen); + } + regionSegments.add(lastRegion); } - int startIndexInFirstBlock = startPartitionIndex.getOffsetInTsBlock(); - int endIndexInLastBlock = endPartitionIndex.getOffsetInTsBlock(); - return new Partition(tsBlockList, startIndexInFirstBlock, endIndexInLastBlock + 1); + return new Partition(regionSegments, true); } - // rowIndex is index within partition public PartitionIndex getPartitionIndex(int rowIndex) { - int tsBlockIndex = 0; - while (tsBlockIndex < tsBlocks.size() - && rowIndex >= tsBlocks.get(tsBlockIndex).getPositionCount()) { - rowIndex -= tsBlocks.get(tsBlockIndex).getPositionCount(); - // Enter next TsBlock - tsBlockIndex++; + int segmentIndex = 0; + while (segmentIndex < segments.size() + && rowIndex >= segments.get(segmentIndex)[0].getPositionCount()) { + rowIndex -= segments.get(segmentIndex)[0].getPositionCount(); + segmentIndex++; } - if (tsBlockIndex != tsBlocks.size()) { - return new PartitionIndex(tsBlockIndex, rowIndex); + if (segmentIndex != segments.size()) { + return new PartitionIndex(segmentIndex, rowIndex); } else { - // Unlikely throw new IndexOutOfBoundsException("Index out of Partition's bounds!"); } } @@ -215,8 +226,8 @@ public class Partition { for (Integer sortedChannel : sortedChannels) { List<Column> columns = new ArrayList<>(); - for (TsBlock tsBlock : tsBlocks) { - columns.add(tsBlock.getColumn(sortedChannel)); + for (Column[] segment : segments) { + columns.add(segment[sortedChannel]); } columnLists.add(new ColumnList(columns)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java index 20167955d79..d5a68ea5796 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.R import org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator; import com.google.common.collect.ImmutableList; -import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; @@ -69,17 +68,42 @@ public final class PartitionExecutor { List<WindowFunction> windowFunctions, List<FrameInfo> frameInfoList, List<Integer> sortChannels) { - // Partition - this.partition = new Partition(tsBlocks, startIndexInFirstBlock, endIndexInLastBlock); - this.partitionStart = startIndexInFirstBlock; - this.partitionEnd = startIndexInFirstBlock + this.partition.getPositionCount(); - // Window functions and frames + this( + new Partition(tsBlocks, startIndexInFirstBlock, endIndexInLastBlock), + dataTypes, + startIndexInFirstBlock, + outputChannels, + windowFunctions, + frameInfoList, + sortChannels); + } + + public PartitionExecutor( + Partition partition, + List<TSDataType> dataTypes, + List<Integer> outputChannels, + List<WindowFunction> windowFunctions, + List<FrameInfo> frameInfoList, + List<Integer> sortChannels) { + this(partition, dataTypes, 0, outputChannels, windowFunctions, frameInfoList, sortChannels); + } + + private PartitionExecutor( + Partition partition, + List<TSDataType> dataTypes, + int partitionStart, + List<Integer> outputChannels, + List<WindowFunction> windowFunctions, + List<FrameInfo> frameInfoList, + List<Integer> sortChannels) { + this.partition = partition; + this.partitionStart = partitionStart; + this.partitionEnd = partitionStart + this.partition.getPositionCount(); this.windowFunctions = ImmutableList.copyOf(windowFunctions); this.frames = new ArrayList<>(); this.outputChannels = ImmutableList.copyOf(outputChannels); - // Prepare for peer group comparing List<TSDataType> sortDataTypes = new ArrayList<>(); for (int channel : sortChannels) { TSDataType dataType = dataTypes.get(channel); @@ -118,7 +142,6 @@ public final class PartitionExecutor { peerGroupEnd - partitionStart - 1); break; default: - // Unreachable throw new UnsupportedOperationException("Unreachable!"); } } @@ -131,21 +154,15 @@ public final class PartitionExecutor { } public void processNextRow(TsBlockBuilder builder) { - // Copy origin data int index = currentPosition - partitionStart; - Partition.PartitionIndex partitionIndex = partition.getPartitionIndex(index); - int tsBlockIndex = partitionIndex.getTsBlockIndex(); - int offsetInTsBlock = partitionIndex.getOffsetInTsBlock(); - TsBlock tsBlock = partition.getTsBlock(tsBlockIndex); int channel = 0; for (int i = 0; i < outputChannels.size(); i++) { - Column column = tsBlock.getColumn(outputChannels.get(i)); ColumnBuilder columnBuilder = builder.getColumnBuilder(i); - if (column.isNull(offsetInTsBlock)) { + if (partition.isNull(outputChannels.get(i), index)) { columnBuilder.appendNull(); } else { - columnBuilder.write(column, offsetInTsBlock); + partition.writeTo(columnBuilder, outputChannels.get(i), index); } channel++; }
