This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch compute-resource-balance-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4ea3c003d9fcfb4f55d5f34247d3905da9263c51 Author: Liao Lanyu <[email protected]> AuthorDate: Thu Jul 13 13:59:35 2023 +0800 Fix incorrect use of Operator.next() in IdentitySinkOperatorTest --- .../operator/sink/IdentitySinkOperatorTest.java | 70 ++++++++++++++++------ 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java index 7cadc6a41b0..4c554ea4f89 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; -import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; @@ -69,7 +68,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class IdentitySinkOperatorTest { - private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.identitySinkTest"; + private static final String IDENTITY_SINK_TEST = "root.identitySinkTest"; private final List<String> deviceIds = new ArrayList<>(); private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); @@ -79,7 +78,7 @@ public class 
IdentitySinkOperatorTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { SeriesReaderTestUtil.setUp( - measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG); + measurementSchemas, deviceIds, seqResources, unSeqResources, IDENTITY_SINK_TEST); } @After @@ -93,7 +92,7 @@ public class IdentitySinkOperatorTest { IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { MeasurementPath measurementPath1 = - new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0", TSDataType.INT32); Set<String> allSensors = new HashSet<>(); allSensors.add("sensor0"); allSensors.add("sensor1"); @@ -111,6 +110,10 @@ public class IdentitySinkOperatorTest { driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName()); driverContext.addOperatorContext( 3, new PlanNodeId("3"), IdentitySinkOperator.class.getSimpleName()); + PlanNodeId planNodeId4 = new PlanNodeId("4"); + driverContext.addOperatorContext(4, planNodeId4, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId5 = new PlanNodeId("5"); + driverContext.addOperatorContext(5, planNodeId5, SeriesScanOperator.class.getSimpleName()); SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(allSensors); @@ -124,10 +127,10 @@ public class IdentitySinkOperatorTest { seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath2 = - new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); + new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1", TSDataType.INT32); SeriesScanOperator 
seriesScanOperator2 = new SeriesScanOperator( driverContext.getOperatorContexts().get(1), @@ -138,26 +141,35 @@ public class IdentitySinkOperatorTest { seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS)); MeasurementPath measurementPath3 = - new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0", TSDataType.INT32); SeriesScanOperator seriesScanOperator3 = new SeriesScanOperator( - driverContext.getOperatorContexts().get(1), - planNodeId2, + driverContext.getOperatorContexts().get(3), + planNodeId4, measurementPath3, Ordering.ASC, scanOptionsBuilder.build()); seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator3 .getOperatorContext() - .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS)); - TsBlock identityTsBlock = null; - while (seriesScanOperator3.hasNext()) { - identityTsBlock = seriesScanOperator3.next(); - } + MeasurementPath measurementPath4 = + new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator4 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(4), + planNodeId5, + measurementPath4, + Ordering.ASC, + scanOptionsBuilder.build()); + seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + seriesScanOperator4 + .getOperatorContext() + .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS)); ISinkHandle sinkHandle = Mockito.mock(ShuffleSinkHandle.class); Mockito.when(sinkHandle.isChannelClosed(0)).thenReturn(false); @@ -168,6 +180,9 @@ public class IdentitySinkOperatorTest { Arrays.asList(seriesScanOperator1, 
seriesScanOperator2), new DownStreamChannelIndex(0), sinkHandle); + identitySinkOperator + .getOperatorContext() + .setMaxRunTime(new Duration(1200, TimeUnit.MILLISECONDS)); Assert.assertEquals( seriesScanOperator3.calculateMaxPeekMemory(), @@ -176,7 +191,27 @@ public class IdentitySinkOperatorTest { seriesScanOperator3.calculateMaxReturnSize(), identitySinkOperator.calculateMaxReturnSize()); - while (identitySinkOperator.isBlocked().isDone() && identitySinkOperator.hasNext()) { + while (seriesScanOperator3.hasNext() + && identitySinkOperator.isBlocked().isDone() + && identitySinkOperator.hasNext()) { + TsBlock identityTsBlock = seriesScanOperator3.next(); + TsBlock tsBlock = identitySinkOperator.next(); + if (tsBlock == null) { + continue; + } + assertEquals(1, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertNotNull(identityTsBlock); + assertEquals(identityTsBlock.getPositionCount(), tsBlock.getPositionCount()); + for (int i = 0; i < identityTsBlock.getPositionCount(); i++) { + assertEquals(identityTsBlock.getColumn(0).getInt(i), tsBlock.getColumn(0).getInt(i)); + } + } + + while (seriesScanOperator4.hasNext() + && identitySinkOperator.isBlocked().isDone() + && identitySinkOperator.hasNext()) { + TsBlock identityTsBlock = seriesScanOperator4.next(); TsBlock tsBlock = identitySinkOperator.next(); if (tsBlock == null) { continue; @@ -189,9 +224,6 @@ public class IdentitySinkOperatorTest { assertEquals(identityTsBlock.getColumn(0).getInt(i), tsBlock.getColumn(0).getInt(i)); } } - List<Operator> children = identitySinkOperator.getChildren(); - Assert.assertNull(children.get(0)); - Assert.assertNull(children.get(1)); } catch (IllegalPathException e) { e.printStackTrace();
