This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch MemoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d0a2ba4e967ee9b33ff89cbe2ace68c433cc1bcf
Author: zyk990424 <[email protected]>
AuthorDate: Fri Aug 12 12:46:16 2022 +0800

    add schema operator memory UT
---
 .../schema/NodeManageMemoryMergeOperator.java      |   4 +-
 .../operator/schema/NodePathsCountOperator.java    |   4 +-
 .../mpp/execution/operator/OperatorMemoryTest.java | 454 +++++++++++++++++++++
 3 files changed, 458 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index b7688b023a..315872a27f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -136,7 +136,7 @@ public class NodeManageMemoryMergeOperator implements 
ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this 
is shadowed by
     // schemaQueryMergeNode
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, 
child.calculateMaxPeekMemory());
   }
 
   @Override
@@ -146,6 +146,6 @@ public class NodeManageMemoryMergeOperator implements 
ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 
child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 37b05c4e58..1417f302e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -104,7 +104,7 @@ public class NodePathsCountOperator implements 
ProcessOperator {
   public long calculateMaxPeekMemory() {
     // todo calculate the result based on all the scan node; currently, this 
is shadowed by
     // schemaQueryMergeNode
-    return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, 
child.calculateMaxPeekMemory());
   }
 
   @Override
@@ -114,6 +114,6 @@ public class NodePathsCountOperator implements 
ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + 
child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index c718942108..d0598e6494 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -45,6 +45,21 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOpe
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.PathsUsingTemplateScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
@@ -72,6 +87,7 @@ import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
@@ -619,4 +635,442 @@ public class OperatorMemoryTest {
         operator.calculateMaxReturnSize());
     assertEquals(512, operator.calculateRetainedSizeAfterCallingNext());
   }
+
+  /**
+   * Checks the memory estimates reported by a {@link TimeSeriesSchemaScanOperator}: max peek
+   * memory and max return size are both expected to equal
+   * {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and the retained size after calling next is
+   * expected to be 0.
+   */
+  @Test
+  public void TimeSeriesSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesSchemaScanOperator.class.getSimpleName());
+
+      TimeSeriesSchemaScanOperator operator =
+          new TimeSeriesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              null,
+              null,
+              false,
+              false,
+              false,
+              null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link DevicesSchemaScanOperator}: max peek memory
+   * and max return size are both expected to equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next is expected to be 0.
+   */
+  @Test
+  public void DeviceSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesSchemaScanOperator.class.getSimpleName());
+
+      DevicesSchemaScanOperator operator =
+          new DevicesSchemaScanOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              0,
+              0,
+              null,
+              false,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link PathsUsingTemplateScanOperator}: max peek
+   * memory and max return size are both expected to equal
+   * {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and the retained size after calling next is
+   * expected to be 0.
+   */
+  @Test
+  public void PathsUsingTemplateScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, PathsUsingTemplateScanOperator.class.getSimpleName());
+
+      PathsUsingTemplateScanOperator operator =
+          new PathsUsingTemplateScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), 0);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link TimeSeriesCountOperator}: max peek memory and
+   * max return size are both expected to be 4, and the retained size after calling next is
+   * expected to be 0.
+   */
+  @Test
+  public void TimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, TimeSeriesCountOperator.class.getSimpleName());
+
+      TimeSeriesCountOperator operator =
+          new TimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              null,
+              null,
+              false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link LevelTimeSeriesCountOperator}: max peek
+   * memory and max return size are both expected to equal
+   * {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and the retained size after calling next is
+   * expected to be 0.
+   */
+  @Test
+  public void LevelTimeSeriesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, LevelTimeSeriesCountOperator.class.getSimpleName());
+
+      LevelTimeSeriesCountOperator operator =
+          new LevelTimeSeriesCountOperator(
+              planNodeId,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              false,
+              4,
+              null,
+              null,
+              false);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link DevicesCountOperator}: max peek memory and
+   * max return size are both expected to be 4, and the retained size after calling next is
+   * expected to be 0.
+   */
+  @Test
+  public void DevicesCountOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, DevicesCountOperator.class.getSimpleName());
+
+      DevicesCountOperator operator =
+          new DevicesCountOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, false);
+
+      assertEquals(4L, operator.calculateMaxPeekMemory());
+      assertEquals(4L, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a {@link SchemaQueryMergeOperator} over four mocked children and checks that its max
+   * peek memory and max return size equal the maximum of the children's values, and that its
+   * retained size equals the sum of the children's retained sizes.
+   */
+  @Test
+  public void SchemaQueryMergeOperatorTest() {
+    final int childCount = 4;
+    List<Operator> mockedChildren = new ArrayList<>(childCount);
+
+    // First stub every child, then derive the expectations in a separate pass.
+    for (int index = 0; index < childCount; index++) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockedChildren.add(mockedChild);
+    }
+
+    long peekExpectation = 0;
+    long returnSizeExpectation = 0;
+    long retainedExpectation = 0;
+    for (Operator mockedChild : mockedChildren) {
+      peekExpectation = Math.max(peekExpectation, mockedChild.calculateMaxPeekMemory());
+      returnSizeExpectation =
+          Math.max(returnSizeExpectation, mockedChild.calculateMaxReturnSize());
+      retainedExpectation += mockedChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    QueryId queryId = new QueryId("stub_query");
+    SchemaQueryMergeOperator operator =
+        new SchemaQueryMergeOperator(
+            queryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockedChildren);
+
+    assertEquals(peekExpectation, operator.calculateMaxPeekMemory());
+    assertEquals(returnSizeExpectation, operator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Builds a {@link CountMergeOperator} over four mocked children and checks that its max peek
+   * memory and max return size equal the maximum of the children's values, and that its retained
+   * size equals the sum of the children's retained sizes.
+   */
+  @Test
+  public void CountMergeOperatorTest() {
+    final int childCount = 4;
+    List<Operator> mockedChildren = new ArrayList<>(childCount);
+
+    // First stub every child, then derive the expectations in a separate pass.
+    for (int index = 0; index < childCount; index++) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockedChildren.add(mockedChild);
+    }
+
+    long peekExpectation = 0;
+    long returnSizeExpectation = 0;
+    long retainedExpectation = 0;
+    for (Operator mockedChild : mockedChildren) {
+      peekExpectation = Math.max(peekExpectation, mockedChild.calculateMaxPeekMemory());
+      returnSizeExpectation =
+          Math.max(returnSizeExpectation, mockedChild.calculateMaxReturnSize());
+      retainedExpectation += mockedChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    QueryId queryId = new QueryId("stub_query");
+    CountMergeOperator operator =
+        new CountMergeOperator(
+            queryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), mockedChildren);
+
+    assertEquals(peekExpectation, operator.calculateMaxPeekMemory());
+    assertEquals(returnSizeExpectation, operator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link SchemaFetchScanOperator}: max peek memory and
+   * max return size are both expected to equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and the
+   * retained size after calling next is expected to be 0.
+   */
+  @Test
+  public void SchemaFetchScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, SchemaFetchScanOperator.class.getSimpleName());
+
+      SchemaFetchScanOperator operator =
+          new SchemaFetchScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, null, null);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Builds a {@link SchemaFetchMergeOperator} over four mocked children and checks that its max
+   * peek memory and max return size equal the maximum of the children's values, and that its
+   * retained size equals the sum of the children's retained sizes.
+   */
+  @Test
+  public void SchemaFetchMergeOperatorTest() {
+    final int childCount = 4;
+    List<Operator> mockedChildren = new ArrayList<>(childCount);
+
+    // First stub every child, then derive the expectations in a separate pass.
+    for (int index = 0; index < childCount; index++) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockedChildren.add(mockedChild);
+    }
+
+    long peekExpectation = 0;
+    long returnSizeExpectation = 0;
+    long retainedExpectation = 0;
+    for (Operator mockedChild : mockedChildren) {
+      peekExpectation = Math.max(peekExpectation, mockedChild.calculateMaxPeekMemory());
+      returnSizeExpectation =
+          Math.max(returnSizeExpectation, mockedChild.calculateMaxReturnSize());
+      retainedExpectation += mockedChild.calculateRetainedSizeAfterCallingNext();
+    }
+
+    SchemaFetchMergeOperator operator =
+        new SchemaFetchMergeOperator(Mockito.mock(OperatorContext.class), mockedChildren, null);
+
+    assertEquals(peekExpectation, operator.calculateMaxPeekMemory());
+    assertEquals(returnSizeExpectation, operator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Builds a {@link SchemaQueryOrderByHeatOperator} over four mocked children and checks its
+   * memory estimates: max peek memory and max return size are each expected to equal the sum of
+   * the children's max return sizes, and the retained size is expected to equal the sum of each
+   * child's retained size plus its max return size.
+   */
+  @Test
+  public void SchemaQueryOrderByHeatOperatorTest() {
+    final int childCount = 4;
+    List<Operator> mockedChildren = new ArrayList<>(childCount);
+
+    // First stub every child, then derive the expectations in a separate pass.
+    for (int index = 0; index < childCount; index++) {
+      Operator mockedChild = Mockito.mock(Operator.class);
+      Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+      Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+      Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+      mockedChildren.add(mockedChild);
+    }
+
+    long peekExpectation = 0;
+    long returnSizeExpectation = 0;
+    long retainedExpectation = 0;
+    for (Operator mockedChild : mockedChildren) {
+      peekExpectation += mockedChild.calculateMaxReturnSize();
+      returnSizeExpectation += mockedChild.calculateMaxReturnSize();
+      retainedExpectation +=
+          mockedChild.calculateRetainedSizeAfterCallingNext()
+              + mockedChild.calculateMaxReturnSize();
+    }
+
+    SchemaQueryOrderByHeatOperator operator =
+        new SchemaQueryOrderByHeatOperator(Mockito.mock(OperatorContext.class), mockedChildren);
+
+    assertEquals(peekExpectation, operator.calculateMaxPeekMemory());
+    assertEquals(returnSizeExpectation, operator.calculateMaxReturnSize());
+    assertEquals(retainedExpectation, operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Checks the memory estimates reported by a {@link NodePathsSchemaScanOperator}: max peek memory
+   * and max return size are both expected to equal {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES}, and
+   * the retained size after calling next is expected to be 0.
+   */
+  @Test
+  public void NodePathsSchemaScanOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      // The operator type label names the operator under test.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId, NodePathsSchemaScanOperator.class.getSimpleName());
+
+      NodePathsSchemaScanOperator operator =
+          new NodePathsSchemaScanOperator(
+              planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, 4);
+
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory());
+      assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize());
+      assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
+
+    } finally {
+      // Always release the single-thread pool created for the state machine.
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Wraps a single mocked child in a {@link NodePathsConvertOperator} and checks its memory
+   * estimates: max peek memory is expected to be the child's max peek memory plus the child's max
+   * return size, while max return size and retained size are expected to match the child's.
+   */
+  @Test
+  public void NodePathsConvertOperatorTest() {
+    Operator mockedChild = Mockito.mock(Operator.class);
+    Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    NodePathsConvertOperator operator =
+        new NodePathsConvertOperator(Mockito.mock(OperatorContext.class), mockedChild);
+
+    // Expectations are read straight from the stubbed child.
+    assertEquals(
+        mockedChild.calculateMaxPeekMemory() + mockedChild.calculateMaxReturnSize(),
+        operator.calculateMaxPeekMemory());
+    assertEquals(mockedChild.calculateMaxReturnSize(), operator.calculateMaxReturnSize());
+    assertEquals(
+        mockedChild.calculateRetainedSizeAfterCallingNext(),
+        operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Wraps a single mocked child in a {@link NodePathsCountOperator} and checks its memory
+   * estimates: max peek memory is expected to be the larger of twice
+   * {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} and the child's max peek memory, max return size the
+   * larger of {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} and the child's max return size, and
+   * retained size {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} plus the child's retained size.
+   */
+  @Test
+  public void NodePathsCountOperatorTest() {
+    Operator mockedChild = Mockito.mock(Operator.class);
+    Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    NodePathsCountOperator operator =
+        new NodePathsCountOperator(Mockito.mock(OperatorContext.class), mockedChild);
+
+    // Expectations are read straight from the stubbed child.
+    assertEquals(
+        Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, mockedChild.calculateMaxPeekMemory()),
+        operator.calculateMaxPeekMemory());
+    assertEquals(
+        Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, mockedChild.calculateMaxReturnSize()),
+        operator.calculateMaxReturnSize());
+    assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + mockedChild.calculateRetainedSizeAfterCallingNext(),
+        operator.calculateRetainedSizeAfterCallingNext());
+  }
+
+  /**
+   * Wraps a single mocked child in a {@link NodeManageMemoryMergeOperator} (constructed with an
+   * empty set) and checks its memory estimates: max peek memory is expected to be the larger of
+   * twice {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} and the child's max peek memory, max return
+   * size the larger of {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} and the child's max return size,
+   * and retained size {@code DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} plus the child's retained size.
+   */
+  @Test
+  public void NodeManageMemoryMergeOperatorTest() {
+    Operator mockedChild = Mockito.mock(Operator.class);
+    Mockito.when(mockedChild.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
+    Mockito.when(mockedChild.calculateMaxReturnSize()).thenReturn(64 * 1024L);
+    Mockito.when(mockedChild.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+    NodeManageMemoryMergeOperator operator =
+        new NodeManageMemoryMergeOperator(
+            Mockito.mock(OperatorContext.class), Collections.emptySet(), mockedChild);
+
+    // Expectations are read straight from the stubbed child.
+    assertEquals(
+        Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, mockedChild.calculateMaxPeekMemory()),
+        operator.calculateMaxPeekMemory());
+    assertEquals(
+        Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, mockedChild.calculateMaxReturnSize()),
+        operator.calculateMaxReturnSize());
+    assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + mockedChild.calculateRetainedSizeAfterCallingNext(),
+        operator.calculateRetainedSizeAfterCallingNext());
+  }
 }

Reply via email to