This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e41fd090b76 Fix incorrect use of Operator.next() in IdentitySinkOperatorTest
e41fd090b76 is described below

commit e41fd090b76e660beb9a584c2da2f14f262b272e
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Jul 13 13:59:35 2023 +0800

    Fix incorrect use of Operator.next() in IdentitySinkOperatorTest
---
 .../operator/sink/IdentitySinkOperatorTest.java    | 70 ++++++++++++++++------
 1 file changed, 51 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
index 7cadc6a41b0..4c554ea4f89 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -69,7 +68,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class IdentitySinkOperatorTest {
-  private static final String TIME_JOIN_OPERATOR_TEST_SG = 
"root.identitySinkTest";
+  private static final String IDENTITY_SINK_TEST = "root.identitySinkTest";
   private final List<String> deviceIds = new ArrayList<>();
   private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
 
@@ -79,7 +78,7 @@ public class IdentitySinkOperatorTest {
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
     SeriesReaderTestUtil.setUp(
-        measurementSchemas, deviceIds, seqResources, unSeqResources, 
TIME_JOIN_OPERATOR_TEST_SG);
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
IDENTITY_SINK_TEST);
   }
 
   @After
@@ -93,7 +92,7 @@ public class IdentitySinkOperatorTest {
         IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
     try {
       MeasurementPath measurementPath1 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+          new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0", 
TSDataType.INT32);
       Set<String> allSensors = new HashSet<>();
       allSensors.add("sensor0");
       allSensors.add("sensor1");
@@ -111,6 +110,10 @@ public class IdentitySinkOperatorTest {
       driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
       driverContext.addOperatorContext(
           3, new PlanNodeId("3"), IdentitySinkOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      driverContext.addOperatorContext(4, planNodeId4, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      driverContext.addOperatorContext(5, planNodeId5, 
SeriesScanOperator.class.getSimpleName());
 
       SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
       scanOptionsBuilder.withAllSensors(allSensors);
@@ -124,10 +127,10 @@ public class IdentitySinkOperatorTest {
       seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
       seriesScanOperator1
           .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+          .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath2 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
+          new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1", 
TSDataType.INT32);
       SeriesScanOperator seriesScanOperator2 =
           new SeriesScanOperator(
               driverContext.getOperatorContexts().get(1),
@@ -138,26 +141,35 @@ public class IdentitySinkOperatorTest {
       seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
       seriesScanOperator2
           .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+          .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath3 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+          new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0", 
TSDataType.INT32);
       SeriesScanOperator seriesScanOperator3 =
           new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
+              driverContext.getOperatorContexts().get(3),
+              planNodeId4,
               measurementPath3,
               Ordering.ASC,
               scanOptionsBuilder.build());
       seriesScanOperator3.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
       seriesScanOperator3
           .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+          .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
 
-      TsBlock identityTsBlock = null;
-      while (seriesScanOperator3.hasNext()) {
-        identityTsBlock = seriesScanOperator3.next();
-      }
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(4),
+              planNodeId5,
+              measurementPath4,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      seriesScanOperator4.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator4
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
 
       ISinkHandle sinkHandle = Mockito.mock(ShuffleSinkHandle.class);
       Mockito.when(sinkHandle.isChannelClosed(0)).thenReturn(false);
@@ -168,6 +180,9 @@ public class IdentitySinkOperatorTest {
               Arrays.asList(seriesScanOperator1, seriesScanOperator2),
               new DownStreamChannelIndex(0),
               sinkHandle);
+      identitySinkOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(1200, TimeUnit.MILLISECONDS));
 
       Assert.assertEquals(
           seriesScanOperator3.calculateMaxPeekMemory(),
@@ -176,7 +191,27 @@ public class IdentitySinkOperatorTest {
           seriesScanOperator3.calculateMaxReturnSize(),
           identitySinkOperator.calculateMaxReturnSize());
 
-      while (identitySinkOperator.isBlocked().isDone() && 
identitySinkOperator.hasNext()) {
+      while (seriesScanOperator3.hasNext()
+          && identitySinkOperator.isBlocked().isDone()
+          && identitySinkOperator.hasNext()) {
+        TsBlock identityTsBlock = seriesScanOperator3.next();
+        TsBlock tsBlock = identitySinkOperator.next();
+        if (tsBlock == null) {
+          continue;
+        }
+        assertEquals(1, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertNotNull(identityTsBlock);
+        assertEquals(identityTsBlock.getPositionCount(), 
tsBlock.getPositionCount());
+        for (int i = 0; i < identityTsBlock.getPositionCount(); i++) {
+          assertEquals(identityTsBlock.getColumn(0).getInt(i), 
tsBlock.getColumn(0).getInt(i));
+        }
+      }
+
+      while (seriesScanOperator4.hasNext()
+          && identitySinkOperator.isBlocked().isDone()
+          && identitySinkOperator.hasNext()) {
+        TsBlock identityTsBlock = seriesScanOperator4.next();
         TsBlock tsBlock = identitySinkOperator.next();
         if (tsBlock == null) {
           continue;
@@ -189,9 +224,6 @@ public class IdentitySinkOperatorTest {
           assertEquals(identityTsBlock.getColumn(0).getInt(i), 
tsBlock.getColumn(0).getInt(i));
         }
       }
-      List<Operator> children = identitySinkOperator.getChildren();
-      Assert.assertNull(children.get(0));
-      Assert.assertNull(children.get(1));
 
     } catch (IllegalPathException e) {
       e.printStackTrace();

Reply via email to