This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch new_object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 367bb505ffd36623f8d0b89197d29359b380bd19 Author: Weihao Li <[email protected]> AuthorDate: Fri Oct 31 10:22:47 2025 +0800 Fix process logic of empty TsBlock in InputLayer of TransformOperator (#16678) (cherry picked from commit 993a783913383a9bdb9bc20d563014f93d7db799) --- .../dag/input/QueryDataSetInputLayer.java | 10 +- .../execution/operator/TransformOperatorTest.java | 142 +++++++++++++++++++++ 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java index d13dfb3ced4..c42f3c8480b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/input/QueryDataSetInputLayer.java @@ -99,9 +99,13 @@ public class QueryDataSetInputLayer { YieldableState yieldableState = queryDataSet.yield(); if (YieldableState.YIELDABLE.equals(yieldableState)) { Column[] columns = queryDataSet.currentBlock(); - rowList.put(columns); - iterator.next(); - cachedColumns = iterator.currentBlock(); + if (columns[0].getPositionCount() == 0) { + cachedColumns = columns; + } else { + rowList.put(columns); + iterator.next(); + cachedColumns = iterator.currentBlock(); + } // No need to call `.consume()` like method in queryDataSet } return yieldableState; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java new file mode 100644 index 00000000000..fb9daccde03 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.transformation.api.LayerReader; +import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer; +import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.LongColumn; +import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.junit.Test; + +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; + +public class TransformOperatorTest { + + @Test + public void testInputLayerEmptyBlockProcess() throws Exception { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine( + instanceId, IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification")); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, TransformOperator.class.getSimpleName()); + + // Construct child Operator + TsBlock[] data = new TsBlock[3]; + TimeColumn timeColumn1 = new TimeColumn(1, new long[] {1}); + data[0] = new TsBlock(timeColumn1, new LongColumn(1, Optional.empty(), new long[] {1})); + TimeColumn timeColumn2 = new TimeColumn(0, new long[] {}); + data[1] = new TsBlock(timeColumn2, new LongColumn(0, Optional.empty(), new long[] {})); + TimeColumn timeColumn3 = new TimeColumn(1, new long[] {2}); + data[2] = new TsBlock(timeColumn3, new LongColumn(1, Optional.empty(), new long[] {1})); + Operator childOperator = + new Operator() { + boolean finished = false; + int count = 0; + + @Override + public OperatorContext getOperatorContext() { + return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + TsBlock tsBlock = data[count]; + count++; + if (count == 3) { + finished = true; + } + return tsBlock; + } + + @Override + public boolean hasNext() { + return !finished; + } + + @Override + public void close() {} + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + }; + + // Construct LayerReader for TransformOperator + QueryDataSetInputLayer inputLayer = + new QueryDataSetInputLayer( + queryId.toString(), + 1, + new TsBlockInputDataSet(childOperator, ImmutableList.of(TSDataType.INT64))); + LayerReader reader = inputLayer.constructValueReader(0); + reader.yield(); + reader.consumedAll(); + // process empty TsBlock + reader.yield(); + reader.consumedAll(); + reader.yield(); + reader.consumedAll(); + } +}
