This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a8e2f90072122a6de7d9c2f5077ecf9ff5e92535 Merge: f3983e0759 30ff39724d Author: Minghui Liu <[email protected]> AuthorDate: Wed Aug 10 23:23:03 2022 +0800 Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl .../iotdb/db/mpp/execution/operator/Operator.java | 12 ++++ .../execution/operator/process/FillOperator.java | 8 ++- .../execution/operator/process/LimitOperator.java | 5 ++ .../operator/process/LinearFillOperator.java | 8 ++- .../execution/operator/process/OffsetOperator.java | 5 ++ .../execution/operator/process/SortOperator.java | 5 ++ .../process/join/RowBasedTimeJoinOperator.java | 22 +++++- .../operator/process/join/TimeJoinOperator.java | 22 +++++- .../process/last/LastQueryCollectOperator.java | 10 +++ .../process/last/LastQueryMergeOperator.java | 31 ++++++-- .../operator/process/last/LastQueryOperator.java | 14 +++- .../process/last/LastQuerySortOperator.java | 19 +++-- .../process/last/UpdateLastCacheOperator.java | 5 ++ .../operator/source/AlignedSeriesScanOperator.java | 5 ++ .../operator/source/ExchangeOperator.java | 5 ++ .../operator/source/LastCacheScanOperator.java | 5 ++ .../operator/source/SeriesScanOperator.java | 5 ++ .../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++----- 18 files changed, 229 insertions(+), 41 deletions(-) diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index abc5ab3a4b,349e98758b..67be7fd353 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@@ -423,67 -463,8 +468,68 @@@ public class OperatorMemoryTest new LinearFillOperator( Mockito.mock(OperatorContext.class), new LinearFill[] {null, null}, child); - assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory()); + assertEquals(2048 * 3 + 512L, linearFillOperator.calculateMaxPeekMemory()); assertEquals(1024, linearFillOperator.calculateMaxReturnSize()); + assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext()); } + + @Test + public void seriesAggregationScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath = + new MeasurementPath( + "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32); + Set<String> allSensors = Sets.newHashSet("sensor0"); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName()); + + SeriesAggregationScanOperator seriesAggregationScanOperator = + new SeriesAggregationScanOperator( + planNodeId, + measurementPath, + allSensors, + fragmentInstanceContext.getOperatorContexts().get(0), + Arrays.asList( + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.COUNT, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MAX_VALUE, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MIN_TIME, TSDataType.INT32, true), + AggregationStep.SINGLE)), + null, + true, + null); + + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxPeekMemory()); + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxReturnSize()); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } }
