This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch CorrectBinaryMemory in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 473a75033d5278fb8539dc405bbbd84876b09b51 Author: JackieTien97 <[email protected]> AuthorDate: Fri Feb 7 09:02:34 2025 +0800 use getSizeInBytes instead of getRetainedSizeInBytes in some places --- .../db/queryengine/execution/exchange/SharedTsBlockQueue.java | 8 ++++---- .../db/queryengine/execution/exchange/sink/SinkChannel.java | 11 +++++------ .../execution/exchange/source/LocalSourceHandle.java | 4 +--- .../db/queryengine/execution/operator/AbstractOperator.java | 2 +- .../execution/operator/process/AbstractSortOperator.java | 2 +- .../queryengine/execution/operator/process/SortOperator.java | 2 +- .../execution/operator/process/TableStreamSortOperator.java | 2 +- .../aggregation/grouped/HashAggregationOperator.java | 4 +--- .../aggregation/grouped/StreamingAggregationOperator.java | 9 ++++----- .../aggregation/grouped/StreamingHashAggregationOperator.java | 4 +--- .../apache/iotdb/db/queryengine/execution/exchange/Utils.java | 3 +++ .../db/queryengine/execution/operator/OperatorMemoryTest.java | 2 ++ pom.xml | 2 +- 13 files changed, 26 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index b3b94f66282..f3810426398 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -196,8 +196,8 @@ public class SharedTsBlockQueue { localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, - tsBlock.getRetainedSizeInBytes()); - bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes(); + tsBlock.getSizeInBytes()); + bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to // corresponding LocalSinkChannel. if (sinkChannel != null) { @@ -236,10 +236,10 @@ public class SharedTsBlockQueue { localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, - tsBlock.getRetainedSizeInBytes(), + tsBlock.getSizeInBytes(), maxBytesCanReserve); blockedOnMemory = pair.left; - bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes(); + bufferRetainedSizeInBytes += tsBlock.getSizeInBytes(); // reserve memory failed, we should wait until there is enough memory if (!Boolean.TRUE.equals(pair.right)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index 9f29a8a753f..8915938eb03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -201,7 +201,7 @@ public class SinkChannel implements ISinkChannel { if (noMoreTsBlocks) { return; } - long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes(); + long sizeInBytes = tsBlock.getSizeInBytes(); int startSequenceId; startSequenceId = nextSequenceId; blocked = @@ -211,17 +211,16 @@ public class SinkChannel implements ISinkChannel { localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, - retainedSizeInBytes, + sizeInBytes, maxBytesCanReserve) .left; - bufferRetainedSizeInBytes += retainedSizeInBytes; + bufferRetainedSizeInBytes += sizeInBytes; sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize)); nextSequenceId += 1; - currentTsBlockSize = retainedSizeInBytes; + currentTsBlockSize = sizeInBytes; - // TODO: consider merge multiple NewDataBlockEvent for less network traffic. - submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes)); + submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes)); } finally { DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost( SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java index 4da89f72d07..63feb571cb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java @@ -124,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle { if (tsBlock != null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "[GetTsBlockFromQueue] TsBlock:{} size:{}", - currSequenceId, - tsBlock.getRetainedSizeInBytes()); + "[GetTsBlockFromQueue] TsBlock:{} size:{}", currSequenceId, tsBlock.getSizeInBytes()); } currSequenceId++; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java index 1278dc15d8d..5c6d731f9a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java @@ -44,7 +44,7 @@ public abstract class AbstractOperator implements Operator { long oneTupleSize = Math.max( 1, - (tsBlock.getRetainedSizeInBytes() - tsBlock.getTotalInstanceSize()) + (tsBlock.getSizeInBytes() - tsBlock.getTotalInstanceSize()) / tsBlock.getPositionCount()); if (oneTupleSize > maxReturnSize) { // make sure at least one-tuple-at-a-time diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java index c017cded6b7..0bfa1d3e767 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java @@ -162,7 +162,7 @@ public abstract class AbstractSortOperator implements ProcessOperator { } protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException { - long bytesSize = tsBlock.getRetainedSizeInBytes(); + long bytesSize = tsBlock.getSizeInBytes(); if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) { cachedBytes += bytesSize; for (int i = 0; i < tsBlock.getPositionCount(); i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java index eb556e1cfac..31029a30da6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java @@ -62,7 +62,7 @@ public abstract class SortOperator extends AbstractSortOperator { if (tsBlock == null) { return null; } - dataSize += tsBlock.getRetainedSizeInBytes(); + dataSize += tsBlock.getSizeInBytes(); cacheTsBlock(tsBlock); } catch (IoTDBException e) { clear(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java index 71e938f4f30..90565a85875 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java @@ -119,7 +119,7 @@ public class TableStreamSortOperator extends AbstractSortOperator { return null; } // record total sorted data size - dataSize += currentTsBlock.getRetainedSizeInBytes(); + dataSize += currentTsBlock.getSizeInBytes(); // if currentTsBlock line count + remainingCount is still less than minLinesToOutput, just // cache it diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java index 31ddcbe293b..ce7e91f8448 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java @@ -45,8 +45,6 @@ public class HashAggregationOperator extends AbstractOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(HashAggregationOperator.class); - private final OperatorContext operatorContext; - private final Operator child; private final List<Type> groupByTypes; @@ -81,7 +79,7 @@ public class HashAggregationOperator extends AbstractOperator { long maxPartialMemory, boolean spillEnabled, long unspillMemoryLimit) { - this.operatorContext = operatorContext; + super.operatorContext = operatorContext; this.child = child; this.groupByTypes = ImmutableList.copyOf(groupByTypes); this.groupByChannels = ImmutableList.copyOf(groupByChannels); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java index 7b0d37c365f..3ed2bbb1bde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.utils.datastructure.SortKey; @@ -52,8 +52,6 @@ public class StreamingAggregationOperator extends AbstractOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(StreamingAggregationOperator.class); - private final OperatorContext operatorContext; - private final Operator child; private final List<TableAggregator> aggregators; @@ -85,7 +83,7 @@ public class StreamingAggregationOperator extends AbstractOperator { long maxPartialMemory, boolean spillEnabled, long unSpillMemoryLimit) { - this.operatorContext = operatorContext; + super.operatorContext = operatorContext; this.child = child; this.groupByChannels = Ints.toArray(groupByChannels); this.groupKeyComparator = groupKeyComparator; @@ -208,7 +206,8 @@ public class StreamingAggregationOperator extends AbstractOperator { outputs.add( resultBuilder.build( new RunLengthEncodedColumn( - TableScanOperator.TIME_COLUMN_TEMPLATE, resultBuilder.getPositionCount()))); + AbstractTableScanOperator.TIME_COLUMN_TEMPLATE, + resultBuilder.getPositionCount()))); resultBuilder.reset(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java index 374a6964de2..8862a813718 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java @@ -53,8 +53,6 @@ public class StreamingHashAggregationOperator extends AbstractOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(StreamingHashAggregationOperator.class); - private final OperatorContext operatorContext; - private final Operator child; private final int[] preGroupedChannels; @@ -103,7 +101,7 @@ public class StreamingHashAggregationOperator extends AbstractOperator { long maxPartialMemory, boolean spillEnabled, long unSpillMemoryLimit) { - this.operatorContext = operatorContext; + super.operatorContext = operatorContext; this.child = child; this.preGroupedChannels = Ints.toArray(preGroupedChannels); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java index 0c6eac0dbed..327d4a34c39 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java @@ -41,6 +41,7 @@ public class Utils { for (int i = 0; i < numOfTsBlocks; i++) { TsBlock mockTsBlock = Mockito.mock(TsBlock.class); Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize); + Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize); mockTsBlocks.add(mockTsBlock); } @@ -50,6 +51,7 @@ public class Utils { public static TsBlock createMockTsBlock(long mockTsBlockSize) { TsBlock mockTsBlock = Mockito.mock(TsBlock.class); Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize); + Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize); return mockTsBlock; } @@ -144,6 +146,7 @@ public class Utils { TsBlockSerde mockTsBlockSerde = Mockito.mock(TsBlockSerde.class); TsBlock mockTsBlock = Mockito.mock(TsBlock.class); Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize); + Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize); Mockito.when(mockTsBlockSerde.deserialize(Mockito.any(ByteBuffer.class))) .thenReturn(mockTsBlock); return mockTsBlockSerde; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index 1bba28545bf..e7f662ab573 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -253,6 +253,7 @@ public class OperatorMemoryTest { public void lastCacheScanOperatorTest() { TsBlock tsBlock = Mockito.mock(TsBlock.class); Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L); + Mockito.when(tsBlock.getSizeInBytes()).thenReturn(1024L); LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(null, null, tsBlock); assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory()); @@ -383,6 +384,7 @@ public class OperatorMemoryTest { public void lastQuerySortOperatorTest() { TsBlock tsBlock = Mockito.mock(TsBlock.class); Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L); + Mockito.when(tsBlock.getSizeInBytes()).thenReturn(16 * 1024L); Mockito.when(tsBlock.getPositionCount()).thenReturn(16); List<Operator> children = new ArrayList<>(4); diff --git a/pom.xml b/pom.xml index 1df4651c4d7..2e6bdf4efae 100644 --- a/pom.xml +++ b/pom.xml @@ -167,7 +167,7 @@ <thrift.version>0.14.1</thrift.version> <xz.version>1.9</xz.version> <zstd-jni.version>1.5.6-3</zstd-jni.version> - <tsfile.version>2.1.0-250206-SNAPSHOT</tsfile.version> + <tsfile.version>2.1.0-SNAPSHOT</tsfile.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim
