This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 993a7839133 Fix process logic of empty TsBlock in InputLayer of
TransformOperator (#16678)
993a7839133 is described below
commit 993a783913383a9bdb9bc20d563014f93d7db799
Author: Weihao Li <[email protected]>
AuthorDate: Fri Oct 31 10:22:47 2025 +0800
Fix process logic of empty TsBlock in InputLayer of TransformOperator
(#16678)
---
.../dag/input/QueryDataSetInputLayer.java | 10 +-
.../execution/operator/TransformOperatorTest.java | 142 +++++++++++++++++++++
2 files changed, 149 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java
index d13dfb3ced4..c42f3c8480b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java
@@ -99,9 +99,13 @@ public class QueryDataSetInputLayer {
YieldableState yieldableState = queryDataSet.yield();
if (YieldableState.YIELDABLE.equals(yieldableState)) {
Column[] columns = queryDataSet.currentBlock();
- rowList.put(columns);
- iterator.next();
- cachedColumns = iterator.currentBlock();
+ if (columns[0].getPositionCount() == 0) {
+ cachedColumns = columns;
+ } else {
+ rowList.put(columns);
+ iterator.next();
+ cachedColumns = iterator.currentBlock();
+ }
// No need to call `.consume()` like method in queryDataSet
}
return yieldableState;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
new file mode 100644
index 00000000000..fb9daccde03
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
+import
org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
+import
org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+
+public class TransformOperatorTest {
+
+ @Test
+ public void testInputLayerEmptyBlockProcess() throws Exception {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(
+ instanceId, IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification"));
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
TransformOperator.class.getSimpleName());
+
+ // Construct child Operator
+ TsBlock[] data = new TsBlock[3];
+ TimeColumn timeColumn1 = new TimeColumn(1, new long[] {1});
+ data[0] = new TsBlock(timeColumn1, new LongColumn(1, Optional.empty(), new
long[] {1}));
+ TimeColumn timeColumn2 = new TimeColumn(0, new long[] {});
+ data[1] = new TsBlock(timeColumn2, new LongColumn(0, Optional.empty(), new
long[] {}));
+ TimeColumn timeColumn3 = new TimeColumn(1, new long[] {2});
+ data[2] = new TsBlock(timeColumn3, new LongColumn(1, Optional.empty(), new
long[] {1}));
+ Operator childOperator =
+ new Operator() {
+ boolean finished = false;
+ int count = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return driverContext.getOperatorContexts().get(0);
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock tsBlock = data[count];
+ count++;
+ if (count == 3) {
+ finished = true;
+ }
+ return tsBlock;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !finished;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ };
+
+ // Construct LayerReader for TransformOperator
+ QueryDataSetInputLayer inputLayer =
+ new QueryDataSetInputLayer(
+ queryId.toString(),
+ 1,
+ new TsBlockInputDataSet(childOperator,
ImmutableList.of(TSDataType.INT64)));
+ LayerReader reader = inputLayer.constructValueReader(0);
+ reader.yield();
+ reader.consumedAll();
+ // process empty TsBlock
+ reader.yield();
+ reader.consumedAll();
+ reader.yield();
+ reader.consumedAll();
+ }
+}