This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/AggOpMemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a8e2f90072122a6de7d9c2f5077ecf9ff5e92535
Merge: f3983e0759 30ff39724d
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Aug 10 23:23:03 2022 +0800

    Merge branch 'MemoryControl' of github.com:apache/iotdb into MemoryControl

 .../iotdb/db/mpp/execution/operator/Operator.java  | 12 ++++
 .../execution/operator/process/FillOperator.java   |  8 ++-
 .../execution/operator/process/LimitOperator.java  |  5 ++
 .../operator/process/LinearFillOperator.java       |  8 ++-
 .../execution/operator/process/OffsetOperator.java |  5 ++
 .../execution/operator/process/SortOperator.java   |  5 ++
 .../process/join/RowBasedTimeJoinOperator.java     | 22 +++++-
 .../operator/process/join/TimeJoinOperator.java    | 22 +++++-
 .../process/last/LastQueryCollectOperator.java     | 10 +++
 .../process/last/LastQueryMergeOperator.java       | 31 ++++++--
 .../operator/process/last/LastQueryOperator.java   | 14 +++-
 .../process/last/LastQuerySortOperator.java        | 19 +++--
 .../process/last/UpdateLastCacheOperator.java      |  5 ++
 .../operator/source/AlignedSeriesScanOperator.java |  5 ++
 .../operator/source/ExchangeOperator.java          |  5 ++
 .../operator/source/LastCacheScanOperator.java     |  5 ++
 .../operator/source/SeriesScanOperator.java        |  5 ++
 .../mpp/execution/operator/OperatorMemoryTest.java | 84 +++++++++++++++++-----
 18 files changed, 229 insertions(+), 41 deletions(-)

diff --cc 
server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index abc5ab3a4b,349e98758b..67be7fd353
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@@ -423,67 -463,8 +468,68 @@@ public class OperatorMemoryTest 
          new LinearFillOperator(
              Mockito.mock(OperatorContext.class), new LinearFill[] {null, 
null}, child);
  
-     assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory());
+     assertEquals(2048 * 3 + 512L, 
linearFillOperator.calculateMaxPeekMemory());
      assertEquals(1024, linearFillOperator.calculateMaxReturnSize());
+     assertEquals(512, 
linearFillOperator.calculateRetainedSizeAfterCallingNext());
    }
 +
 +  @Test
 +  public void seriesAggregationScanOperatorTest() {
 +    ExecutorService instanceNotificationExecutor =
 +        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
 +    try {
 +      MeasurementPath measurementPath =
 +          new MeasurementPath(
 +              "root.SeriesAggregationScanOperatorTest.device0.sensor0", 
TSDataType.INT32);
 +      Set<String> allSensors = Sets.newHashSet("sensor0");
 +
 +      QueryId queryId = new QueryId("stub_query");
 +      FragmentInstanceId instanceId =
 +          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
 +      FragmentInstanceStateMachine stateMachine =
 +          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
 +      FragmentInstanceContext fragmentInstanceContext =
 +          createFragmentInstanceContext(instanceId, stateMachine);
 +      PlanNodeId planNodeId = new PlanNodeId("1");
 +      fragmentInstanceContext.addOperatorContext(
 +          1, planNodeId, 
SeriesAggregationScanOperatorTest.class.getSimpleName());
 +
 +      SeriesAggregationScanOperator seriesAggregationScanOperator =
 +          new SeriesAggregationScanOperator(
 +              planNodeId,
 +              measurementPath,
 +              allSensors,
 +              fragmentInstanceContext.getOperatorContexts().get(0),
 +              Arrays.asList(
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.COUNT, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MAX_VALUE, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE),
 +                  new Aggregator(
 +                      AccumulatorFactory.createAccumulator(
 +                          AggregationType.MIN_TIME, TSDataType.INT32, true),
 +                      AggregationStep.SINGLE)),
 +              null,
 +              true,
 +              null);
 +
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxPeekMemory());
 +      assertEquals(
 +          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
 +              + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
 +          seriesAggregationScanOperator.calculateMaxReturnSize());
 +    } catch (IllegalPathException e) {
 +      e.printStackTrace();
 +      fail();
 +    } finally {
 +      instanceNotificationExecutor.shutdown();
 +    }
 +  }
  }

Reply via email to