This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch perfectTransform
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8f2087bc7c9907262985831635be4fef13d88af5
Author: Weihao Li <[email protected]>
AuthorDate: Mon Mar 30 18:05:06 2026 +0800

    add
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../operator/process/TransformOperator.java        |  18 ++-
 .../process/join/FullOuterTimeJoinOperator.java    |   8 +-
 .../execution/operator/TransformOperatorTest.java  | 136 ++++++++++++++++++
 .../join/FullOuterTimeJoinOperatorTest.java        | 155 +++++++++++++++++++++
 4 files changed, 309 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
index 28a80462ea7..ad5192f9807 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class TransformOperator implements ProcessOperator {
 
@@ -225,6 +226,7 @@ public class TransformOperator implements ProcessOperator {
   @Override
   public TsBlock next() throws Exception {
 
+    long start = System.nanoTime();
     try {
       YieldableState yieldableState = iterateAllColumnsToNextValid();
       if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -236,9 +238,10 @@ public class TransformOperator implements ProcessOperator {
       final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
       final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
       final int columnCount = columnBuilders.length;
-
-      int rowCount = 0;
-      while (!timeHeap.isEmpty()) {
+      long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      while (!timeHeap.isEmpty()
+          && !tsBlockBuilder.isFull()
+          && System.nanoTime() - start < maxRuntime) {
         final long currentTime = timeHeap.pollFirst();
 
         // time
@@ -253,25 +256,26 @@ public class TransformOperator implements ProcessOperator {
             }
             timeHeap.add(currentTime);
 
-            tsBlockBuilder.declarePositions(rowCount);
             return tsBlockBuilder.build();
           }
         }
 
         prepareEachColumn(columnCount);
 
-        ++rowCount;
+        tsBlockBuilder.declarePosition();
 
         yieldableState = iterateAllColumnsToNextValid();
         if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-          tsBlockBuilder.declarePositions(rowCount);
           return tsBlockBuilder.build();
         }
 
         inputLayer.updateRowRecordListEvictionUpperBound();
       }
 
-      tsBlockBuilder.declarePositions(rowCount);
+      if (tsBlockBuilder.isEmpty()) {
+        return null;
+      }
+
       return tsBlockBuilder.build();
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java
index 819a1562225..eb9f940a015 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java
@@ -39,6 +39,7 @@ import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -161,6 +162,8 @@ public class FullOuterTimeJoinOperator extends AbstractConsumeAllOperator {
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    long startTime = System.nanoTime();
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long currentTime;
     do {
       currentTime = timeSelector.pollFirst();
@@ -171,7 +174,10 @@ public class FullOuterTimeJoinOperator extends AbstractConsumeAllOperator {
 
       prepareForTimeHeap();
 
-    } while (comparator.lessThan(currentTime, currentEndTime) && !timeSelector.isEmpty());
+    } while (comparator.lessThan(currentTime, currentEndTime)
+        && !timeSelector.isEmpty()
+        && !tsBlockBuilder.isFull()
+        && System.nanoTime() - startTime < maxRuntime);
 
     resultTsBlock = tsBlockBuilder.build();
     return checkTsBlockSizeAndGetResult();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
index fb9daccde03..044f99b8b88 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.queryengine.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.NodeRef;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
@@ -28,18 +31,28 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
 import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -139,4 +152,127 @@ public class TransformOperatorTest {
     reader.yield();
     reader.consumedAll();
   }
+
+  @Test
+  public void testTransformResultLimit() throws Exception {
+    UDFClassLoaderManager.setupAndGetInstance();
+    int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    try {
+      int rowCount = 2001;
+      int maxLine = 200;
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(200);
+      QueryId queryId = new QueryId("stub_query_chunk");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(
+              instanceId,
+              IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"));
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+      PlanNodeId scanNodeId = new PlanNodeId("scan");
+      driverContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId transformNodeId = new PlanNodeId("transform");
+      driverContext.addOperatorContext(2, transformNodeId, TransformOperator.class.getSimpleName());
+
+      long[] times = new long[rowCount];
+      long[] values = new long[rowCount];
+      for (int i = 0; i < rowCount; i++) {
+        times[i] = i;
+        values[i] = i * 10L;
+      }
+      TsBlock oneBatch =
+          new TsBlock(
+              new TimeColumn(rowCount, times), new LongColumn(rowCount, Optional.empty(), values));
+
+      Operator childOperator =
+          new Operator() {
+            boolean consumed = false;
+
+            @Override
+            public OperatorContext getOperatorContext() {
+              return driverContext.getOperatorContexts().get(0);
+            }
+
+            @Override
+            public TsBlock next() {
+              if (!consumed) {
+                consumed = true;
+                return oneBatch;
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return !consumed;
+            }
+
+            @Override
+            public void close() {}
+
+            @Override
+            public boolean isFinished() {
+              return consumed;
+            }
+
+            @Override
+            public long calculateMaxPeekMemory() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateMaxReturnSize() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateRetainedSizeAfterCallingNext() {
+              return 0;
+            }
+
+            @Override
+            public long ramBytesUsed() {
+              return 0;
+            }
+          };
+
+      TimeSeriesOperand s1 =
+          new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), TSDataType.INT64);
+      Map<String, List<InputLocation>> inputLocations =
+          ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0)));
+      Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
+      expressionTypes.put(NodeRef.of(s1), TSDataType.INT64);
+
+      TransformOperator transform =
+          new TransformOperator(
+              driverContext.getOperatorContexts().get(1),
+              childOperator,
+              ImmutableList.of(TSDataType.INT64),
+              inputLocations,
+              new Expression[] {s1},
+              true,
+              ZoneId.systemDefault(),
+              expressionTypes,
+              true);
+
+      int totalOutRows = 0;
+      int nonNullNextCount = 0;
+      while (transform.hasNext()) {
+        TsBlock out = transform.next();
+        if (out != null) {
+          nonNullNextCount++;
+          Assert.assertTrue(
+              "Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine);
+          totalOutRows += out.getPositionCount();
+        }
+      }
+      Assert.assertEquals(rowCount, totalOutRows);
+      System.out.println(nonNullNextCount);
+      Assert.assertTrue(nonNullNextCount >= 11);
+    } finally {
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java
new file mode 100644
index 00000000000..d6693351049
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.units.Duration;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FullOuterTimeJoinOperatorTest {
+
+  @Test
+  public void testResultSizeLimit() throws Exception {
+    final int rowCount = 2000;
+    final int maxLine = 128;
+
+    int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    Duration savedRunTimeSlice = OperatorContext.getMaxRunTime();
+    try {
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxLine);
+      OperatorContext.setMaxRunTime(new Duration(1, TimeUnit.MINUTES));
+
+      OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+      Operator child1 = createSingleBatchOperator(operatorContext, rowCount, 0);
+      Operator child2 = createSingleBatchOperator(operatorContext, rowCount, 10000);
+
+      List<TSDataType> dataTypes = Arrays.asList(TSDataType.INT32, TSDataType.INT32);
+      FullOuterTimeJoinOperator fullOuterTimeJoinOperator =
+          new FullOuterTimeJoinOperator(
+              operatorContext,
+              Arrays.asList(child1, child2),
+              Ordering.ASC,
+              dataTypes,
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+
+      int totalRows = 0;
+      int nonNullTsBlockCount = 0;
+      ListenableFuture<?> listenableFuture = fullOuterTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!fullOuterTimeJoinOperator.isFinished() && fullOuterTimeJoinOperator.hasNext()) {
+        TsBlock tsBlock = fullOuterTimeJoinOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          nonNullTsBlockCount++;
+          Assert.assertTrue(tsBlock.getPositionCount() <= maxLine);
+          totalRows += tsBlock.getPositionCount();
+        }
+        listenableFuture = fullOuterTimeJoinOperator.isBlocked();
+        listenableFuture.get();
+      }
+
+      Assert.assertEquals(rowCount, totalRows);
+      Assert.assertTrue(nonNullTsBlockCount >= 16);
+    } finally {
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
+      OperatorContext.setMaxRunTime(savedRunTimeSlice);
+    }
+  }
+
+  private Operator createSingleBatchOperator(
+      OperatorContext operatorContext, int rowCount, int valueOffset) {
+    return new Operator() {
+      private boolean consumed = false;
+
+      @Override
+      public OperatorContext getOperatorContext() {
+        return operatorContext;
+      }
+
+      @Override
+      public TsBlock next() {
+        if (consumed) {
+          return null;
+        }
+        consumed = true;
+
+        TsBlockBuilder builder = new TsBlockBuilder(rowCount, Arrays.asList(TSDataType.INT32));
+        for (int i = 0; i < rowCount; i++) {
+          builder.getTimeColumnBuilder().writeLong(i);
+          builder.getColumnBuilder(0).writeInt(valueOffset + i);
+        }
+        builder.declarePositions(rowCount);
+        return builder.build();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return !consumed;
+      }
+
+      @Override
+      public void close() {}
+
+      @Override
+      public boolean isFinished() {
+        return consumed;
+      }
+
+      @Override
+      public long calculateMaxPeekMemory() {
+        return 0;
+      }
+
+      @Override
+      public long calculateMaxReturnSize() {
+        return 0;
+      }
+
+      @Override
+      public long calculateRetainedSizeAfterCallingNext() {
+        return 0;
+      }
+
+      @Override
+      public long ramBytesUsed() {
+        return 0;
+      }
+    };
+  }
+}

Reply via email to