This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e41fd090b76 Fix incorrect use of Operator.next() in IdentitySinkOperatorTest
e41fd090b76 is described below
commit e41fd090b76e660beb9a584c2da2f14f262b272e
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Jul 13 13:59:35 2023 +0800
Fix incorrect use of Operator.next() in IdentitySinkOperatorTest
---
.../operator/sink/IdentitySinkOperatorTest.java | 70 ++++++++++++++++------
1 file changed, 51 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
index 7cadc6a41b0..4c554ea4f89 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/sink/IdentitySinkOperatorTest.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -69,7 +68,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class IdentitySinkOperatorTest {
- private static final String TIME_JOIN_OPERATOR_TEST_SG =
"root.identitySinkTest";
+ private static final String IDENTITY_SINK_TEST = "root.identitySinkTest";
private final List<String> deviceIds = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
@@ -79,7 +78,7 @@ public class IdentitySinkOperatorTest {
@Before
public void setUp() throws MetadataException, IOException,
WriteProcessException {
SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources,
TIME_JOIN_OPERATOR_TEST_SG);
+ measurementSchemas, deviceIds, seqResources, unSeqResources,
IDENTITY_SINK_TEST);
}
@After
@@ -93,7 +92,7 @@ public class IdentitySinkOperatorTest {
IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
try {
MeasurementPath measurementPath1 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
+ new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0",
TSDataType.INT32);
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
allSensors.add("sensor1");
@@ -111,6 +110,10 @@ public class IdentitySinkOperatorTest {
driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
driverContext.addOperatorContext(
3, new PlanNodeId("3"), IdentitySinkOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ driverContext.addOperatorContext(4, planNodeId4,
SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ driverContext.addOperatorContext(5, planNodeId5,
SeriesScanOperator.class.getSimpleName());
SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(allSensors);
@@ -124,10 +127,10 @@ public class IdentitySinkOperatorTest {
seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator1
.getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
MeasurementPath measurementPath2 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1",
TSDataType.INT32);
+ new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1",
TSDataType.INT32);
SeriesScanOperator seriesScanOperator2 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(1),
@@ -138,26 +141,35 @@ public class IdentitySinkOperatorTest {
seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator2
.getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
MeasurementPath measurementPath3 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
+ new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor0",
TSDataType.INT32);
SeriesScanOperator seriesScanOperator3 =
new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
+ driverContext.getOperatorContexts().get(3),
+ planNodeId4,
measurementPath3,
Ordering.ASC,
scanOptionsBuilder.build());
seriesScanOperator3.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
seriesScanOperator3
.getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
- TsBlock identityTsBlock = null;
- while (seriesScanOperator3.hasNext()) {
- identityTsBlock = seriesScanOperator3.next();
- }
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(IDENTITY_SINK_TEST + ".device0.sensor1",
TSDataType.INT32);
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(4),
+ planNodeId5,
+ measurementPath4,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ seriesScanOperator4.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator4
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(1000, TimeUnit.MILLISECONDS));
ISinkHandle sinkHandle = Mockito.mock(ShuffleSinkHandle.class);
Mockito.when(sinkHandle.isChannelClosed(0)).thenReturn(false);
@@ -168,6 +180,9 @@ public class IdentitySinkOperatorTest {
Arrays.asList(seriesScanOperator1, seriesScanOperator2),
new DownStreamChannelIndex(0),
sinkHandle);
+ identitySinkOperator
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(1200, TimeUnit.MILLISECONDS));
Assert.assertEquals(
seriesScanOperator3.calculateMaxPeekMemory(),
@@ -176,7 +191,27 @@ public class IdentitySinkOperatorTest {
seriesScanOperator3.calculateMaxReturnSize(),
identitySinkOperator.calculateMaxReturnSize());
- while (identitySinkOperator.isBlocked().isDone() &&
identitySinkOperator.hasNext()) {
+ while (seriesScanOperator3.hasNext()
+ && identitySinkOperator.isBlocked().isDone()
+ && identitySinkOperator.hasNext()) {
+ TsBlock identityTsBlock = seriesScanOperator3.next();
+ TsBlock tsBlock = identitySinkOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
+ assertEquals(1, tsBlock.getValueColumnCount());
+ assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+ assertNotNull(identityTsBlock);
+ assertEquals(identityTsBlock.getPositionCount(),
tsBlock.getPositionCount());
+ for (int i = 0; i < identityTsBlock.getPositionCount(); i++) {
+ assertEquals(identityTsBlock.getColumn(0).getInt(i),
tsBlock.getColumn(0).getInt(i));
+ }
+ }
+
+ while (seriesScanOperator4.hasNext()
+ && identitySinkOperator.isBlocked().isDone()
+ && identitySinkOperator.hasNext()) {
+ TsBlock identityTsBlock = seriesScanOperator4.next();
TsBlock tsBlock = identitySinkOperator.next();
if (tsBlock == null) {
continue;
@@ -189,9 +224,6 @@ public class IdentitySinkOperatorTest {
assertEquals(identityTsBlock.getColumn(0).getInt(i),
tsBlock.getColumn(0).getInt(i));
}
}
- List<Operator> children = identitySinkOperator.getChildren();
- Assert.assertNull(children.get(0));
- Assert.assertNull(children.get(1));
} catch (IllegalPathException e) {
e.printStackTrace();