This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b81444db3e Improve DeviceViewIntoOperator's return style to pipeline 
(#16994)
8b81444db3e is described below

commit 8b81444db3e6871fe0b2eaf4d87d0b31a573241f
Author: shizy <[email protected]>
AuthorDate: Mon Jan 12 14:04:24 2026 +0800

    Improve DeviceViewIntoOperator's return style to pipeline (#16994)
---
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  |  55 +--
 .../relational/it/query/recent/IoTDBCteIT.java     |   4 +-
 .../operator/process/AbstractIntoOperator.java     |   6 +-
 .../operator/process/AbstractTreeIntoOperator.java |   8 +-
 .../operator/process/DeviceViewIntoOperator.java   | 110 ++++-
 .../process/InsertTabletStatementGenerator.java    |   4 +
 .../operator/process/TableIntoOperator.java        |   5 +
 .../operator/process/TreeIntoOperator.java         |  37 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  21 +-
 .../queryengine/plan/analyze/SelectIntoUtils.java  |  42 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    |   5 +-
 .../planner/plan/parameter/IntoPathDescriptor.java |   5 +-
 .../operator/DeviceViewIntoOperatorTest.java       | 487 +++++++++++++++++++++
 .../execution/operator/TreeIntoOperatorTest.java   | 303 +++++++++++++
 14 files changed, 962 insertions(+), 130 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index a728ac49e69..5a6b3be3db1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -644,89 +644,74 @@ public class IoTDBSelectIntoIT {
     // test INT32
     assertTestFail(
         "select s_int32 into root.sg_type.d_1(s_boolean) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_int32[INT32]).");
-    assertTestFail(
-        "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_int32[INT32]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is 
not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, 
value 0]");
 
     // test INT64
     assertTestFail(
         "select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_int64[INT64]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not 
consistent, registered type INT32, inserting type INT64, timestamp 0, value 
0]");
     assertTestFail(
         "select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_int64[INT64]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not 
consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 
0]");
     assertTestFail(
         "select s_int64 into root.sg_type.d_1(s_boolean) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_int64[INT64]).");
-    assertTestFail(
-        "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_int64[INT64]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is 
not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, 
value 0]");
 
     // test FLOAT
     assertTestFail(
         "select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_float[FLOAT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not 
consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 
0.0]");
     assertTestFail(
         "select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_float[FLOAT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not 
consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 
0.0]");
     assertTestFail(
         "select s_float into root.sg_type.d_1(s_boolean) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_float[FLOAT]).");
-    assertTestFail(
-        "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_float[FLOAT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is 
not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, 
value 0.0]");
 
     // test DOUBLE
     assertTestFail(
         "select s_double into root.sg_type.d_1(s_int32) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_double[DOUBLE]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not 
consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 
0.0]");
     assertTestFail(
         "select s_double into root.sg_type.d_1(s_int64) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_double[DOUBLE]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not 
consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 
0.0]");
     assertTestFail(
         "select s_double into root.sg_type.d_1(s_float) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_double[DOUBLE]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not 
consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 
0.0]");
     assertTestFail(
         "select s_double into root.sg_type.d_1(s_boolean) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_double[DOUBLE]).");
-    assertTestFail(
-        "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_double[DOUBLE]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is 
not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, 
value 0.0]");
 
     // test BOOLEAN
     assertTestFail(
         "select s_boolean into root.sg_type.d_1(s_int32) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not 
consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value 
true]");
     assertTestFail(
         "select s_boolean into root.sg_type.d_1(s_int64) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not 
consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value 
true]");
     assertTestFail(
         "select s_boolean into root.sg_type.d_1(s_float) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not 
consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value 
true]");
     assertTestFail(
         "select s_boolean into root.sg_type.d_1(s_double) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
-    assertTestFail(
-        "select s_boolean into root.sg_type.d_1(s_text) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is 
not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, 
value true]");
 
     // test TEXT
     assertTestFail(
         "select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_text[TEXT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not 
consistent, registered type INT32, inserting type TEXT, timestamp 0, value 
text0]");
     assertTestFail(
         "select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_text[TEXT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not 
consistent, registered type INT64, inserting type TEXT, timestamp 0, value 
text0]");
     assertTestFail(
         "select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not 
compatible with the data type of source column 
(root.sg_type.d_0.s_text[TEXT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not 
consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value 
text0]");
     assertTestFail(
         "select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_text[TEXT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is 
not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value 
text0]");
     assertTestFail(
         "select s_text into root.sg_type.d_1(s_boolean) from 
root.sg_type.d_0;",
-        "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is 
not compatible with the data type of source column 
(root.sg_type.d_0.s_text[TEXT]).");
+        "Error occurred while inserting tablets in SELECT INTO: Fail to insert 
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is 
not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, 
value text0]");
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
index b79a5b7b7fa..7f29e3f0197 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
@@ -470,7 +470,7 @@ public class IoTDBCteIT {
 
   @Test
   public void testConcurrentCteQueries() throws Exception {
-    final int threadCount = 10;
+    final int threadCount = 3;
     final int queriesPerThread = 20;
     final AtomicInteger successCount = new AtomicInteger(0);
     final AtomicInteger failureCount = new AtomicInteger(0);
@@ -582,7 +582,7 @@ public class IoTDBCteIT {
     int totalQueries = threadCount * queriesPerThread;
     assertEquals("All queries should succeed", totalQueries, 
successCount.get());
     assertEquals("No queries should fail", 0, failureCount.get());
-    assertEquals("Total query count should match", 340, totalCount.get());
+    assertEquals("Total query count should match", 102, totalCount.get());
   }
 
   private static void prepareData() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 444c5001975..425449469b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -102,7 +102,7 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
     checkLastWriteOperation();
 
     if (!processTsBlock(cachedTsBlock)) {
-      return null;
+      return tryToReturnPartialResult();
     }
     cachedTsBlock = null;
     if (child.hasNextWithTimer()) {
@@ -110,7 +110,7 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
       processTsBlock(inputTsBlock);
 
       // call child.next only once
-      return null;
+      return tryToReturnPartialResult();
     } else {
       return tryToReturnResultTsBlock();
     }
@@ -204,6 +204,8 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
 
   protected abstract TsBlock tryToReturnResultTsBlock();
 
+  protected abstract TsBlock tryToReturnPartialResult();
+
   protected abstract void resetInsertTabletStatementGenerators();
 
   private void setMaxRowNumberInStatement(long statementSizePerLine) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
index a72907b977b..0d15c72cfa1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import com.google.common.util.concurrent.Futures;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,15 +40,18 @@ import java.util.concurrent.ExecutorService;
 
 public abstract class AbstractTreeIntoOperator extends AbstractIntoOperator {
   protected List<InsertTabletStatementGenerator> 
insertTabletStatementGenerators;
+  protected final TsBlockBuilder resultTsBlockBuilder;
 
   protected AbstractTreeIntoOperator(
       OperatorContext operatorContext,
       Operator child,
       List<TSDataType> inputColumnTypes,
       ExecutorService intoOperationExecutor,
-      long statementSizePerLine) {
+      long statementSizePerLine,
+      List<TSDataType> outputDataTypes) {
     super(operatorContext, child, inputColumnTypes, intoOperationExecutor, 
statementSizePerLine);
     this.maxReturnSize = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
   protected static List<InsertTabletStatementGenerator> 
constructInsertTabletStatementGenerators(
@@ -134,7 +138,7 @@ public abstract class AbstractTreeIntoOperator extends 
AbstractIntoOperator {
             () -> client.insertTablets(insertMultiTabletsStatement), 
writeOperationExecutor);
   }
 
-  private boolean existFullStatement(
+  protected boolean existFullStatement(
       List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
     for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
       if (generator.isFull()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
index 6321a52914b..727ef26c1c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
@@ -22,22 +22,26 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.process;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.column.ColumnHeader;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.IntoProcessException;
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 
 import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -48,6 +52,7 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
 
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class);
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
       deviceToTargetPathSourceInputLocationMap;
@@ -59,7 +64,7 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
   private final int deviceColumnIndex;
   private String currentDevice;
 
-  private final TsBlockBuilder resultTsBlockBuilder;
+  private int batchedRowCount = 0;
 
   @SuppressWarnings("squid:S107")
   public DeviceViewIntoOperator(
@@ -74,18 +79,19 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
       Map<String, InputLocation> sourceColumnToInputLocationMap,
       ExecutorService intoOperationExecutor,
       long statementSizePerLine) {
-    super(operatorContext, child, inputColumnTypes, intoOperationExecutor, 
statementSizePerLine);
+    super(
+        operatorContext,
+        child,
+        inputColumnTypes,
+        intoOperationExecutor,
+        statementSizePerLine,
+        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList()));
     this.deviceToTargetPathSourceInputLocationMap = 
deviceToTargetPathSourceInputLocationMap;
     this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
     this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
     this.deviceToSourceTargetPathPairListMap = 
deviceToSourceTargetPathPairListMap;
-
-    List<TSDataType> outputDataTypes =
-        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
-            .map(ColumnHeader::getColumnType)
-            .collect(Collectors.toList());
-    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-
     this.deviceColumnIndex =
         
sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex();
   }
@@ -102,7 +108,12 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
           constructInsertMultiTabletsStatement(false);
       updateResultTsBlock();
 
-      insertTabletStatementGenerators = 
constructInsertTabletStatementGeneratorsByDevice(device);
+      if (insertMultiTabletsStatement != null || 
insertTabletStatementGenerators == null) {
+        insertTabletStatementGenerators = 
constructInsertTabletStatementGeneratorsByDevice(device);
+      } else {
+        insertTabletStatementGenerators.addAll(
+            constructInsertTabletStatementGeneratorsByDevice(device));
+      }
       currentDevice = device;
 
       if (insertMultiTabletsStatement != null) {
@@ -115,8 +126,14 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
     int readIndex = 0;
     while (readIndex < inputTsBlock.getPositionCount()) {
       int lastReadIndex = readIndex;
-      for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
-        lastReadIndex = Math.max(lastReadIndex, 
generator.processTsBlock(inputTsBlock, readIndex));
+      if (!insertTabletStatementGenerators.isEmpty()) {
+        InsertTabletStatementGenerator generatorOfCurrentDevice =
+            
insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1);
+        int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount();
+        lastReadIndex =
+            Math.max(
+                lastReadIndex, 
generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex));
+        batchedRowCount += generatorOfCurrentDevice.getRowCount() - 
rowCountBeforeProcess;
       }
       readIndex = lastReadIndex;
       if (insertMultiTabletsInternally(true)) {
@@ -143,6 +160,16 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
     return resultTsBlockBuilder.build();
   }
 
+  @Override
+  protected TsBlock tryToReturnPartialResult() {
+    if (resultTsBlockBuilder.isFull()) {
+      TsBlock res = resultTsBlockBuilder.build();
+      resultTsBlockBuilder.reset();
+      return res;
+    }
+    return null;
+  }
+
   private List<InsertTabletStatementGenerator> 
constructInsertTabletStatementGeneratorsByDevice(
       String currentDevice) {
     Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap =
@@ -192,4 +219,61 @@ public class DeviceViewIntoOperator extends 
AbstractTreeIntoOperator {
                 .mapToLong(InsertTabletStatementGenerator::ramBytesUsed)
                 .sum());
   }
+
+  @Override
+  protected InsertMultiTabletsStatement 
constructInsertMultiTabletsStatement(boolean needCheck) {
+    if (insertTabletStatementGenerators == null) {
+      return null;
+    }
+    boolean hasFullStatement = 
existFullStatement(insertTabletStatementGenerators);
+    if (needCheck) {
+      // When needCheck is true, we only proceed if there already exists a 
full statement.
+      if (!hasFullStatement) {
+        return null;
+      }
+    } else {
+      // When needCheck is false, we may delay flushing to accumulate more rows
+      // if the batch is not yet at the configured row limit and the child has 
more data.
+      try {
+        if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()
+            && child.hasNextWithTimer()) {
+          return null;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IntoProcessException(e.getMessage());
+      } catch (Exception e) {
+        throw new IntoProcessException(e.getMessage());
+      }
+    }
+    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
+    for (InsertTabletStatementGenerator generator : 
insertTabletStatementGenerators) {
+      if (!generator.isEmpty()) {
+        
insertTabletStatementList.add(generator.constructInsertTabletStatement());
+      }
+    }
+    if (insertTabletStatementList.isEmpty()) {
+      return null;
+    }
+
+    InsertMultiTabletsStatement insertMultiTabletsStatement = new 
InsertMultiTabletsStatement();
+    
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    batchedRowCount = 0;
+    return insertMultiTabletsStatement;
+  }
+
+  @Override
+  protected long findWritten(String device, String measurement) {
+    for (int i = insertTabletStatementGenerators.size() - 1; i >= 0; i--) {
+      InsertTabletStatementGenerator generator = 
insertTabletStatementGenerators.get(i);
+      if (!Objects.equals(generator.getDevice(), device)) {
+        continue;
+      }
+      long writtenCountInCurrentGenerator = 
generator.getWrittenCount(measurement);
+      if (writtenCountInCurrentGenerator >= 0) {
+        return writtenCountInCurrentGenerator;
+      }
+    }
+    return 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
index b2fae4c25b1..47ff31573e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
@@ -169,6 +169,10 @@ public abstract class InsertTabletStatementGenerator 
implements Accountable {
     return rowCount == 0;
   }
 
+  public int getRowCount() {
+    return rowCount;
+  }
+
   public String getDevice() {
     return devicePath.toString();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
index d9af3229ea2..40b1781b69d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
@@ -114,6 +114,11 @@ public class TableIntoOperator extends 
AbstractIntoOperator {
     return constructResultTsBlock();
   }
 
+  @Override
+  protected TsBlock tryToReturnPartialResult() {
+    return null;
+  }
+
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
index 32838483cef..bed98c42086 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
@@ -31,7 +31,6 @@ import org.apache.tsfile.block.column.ColumnBuilder;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
@@ -49,6 +48,8 @@ public class TreeIntoOperator extends 
AbstractTreeIntoOperator {
 
   private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
 
+  private int outputIndex = 0;
+
   @SuppressWarnings("squid:S107")
   public TreeIntoOperator(
       OperatorContext operatorContext,
@@ -60,7 +61,15 @@ public class TreeIntoOperator extends 
AbstractTreeIntoOperator {
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       ExecutorService intoOperationExecutor,
       long statementSizePerLine) {
-    super(operatorContext, child, inputColumnTypes, intoOperationExecutor, 
statementSizePerLine);
+    super(
+        operatorContext,
+        child,
+        inputColumnTypes,
+        intoOperationExecutor,
+        statementSizePerLine,
+        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList()));
     this.sourceTargetPathPairList = sourceTargetPathPairList;
     insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
@@ -98,19 +107,23 @@ public class TreeIntoOperator extends 
AbstractTreeIntoOperator {
       return null;
     }
 
-    finished = true;
-    return constructResultTsBlock();
+    TsBlock res = constructResultTsBlock();
+    finished = (outputIndex == sourceTargetPathPairList.size());
+    return res;
+  }
+
+  @Override
+  protected TsBlock tryToReturnPartialResult() {
+    return null;
   }
 
   private TsBlock constructResultTsBlock() {
-    List<TSDataType> outputDataTypes =
-        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
-            .map(ColumnHeader::getColumnType)
-            .collect(Collectors.toList());
-    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
     TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
     ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
-    for (Pair<String, PartialPath> sourceTargetPathPair : 
sourceTargetPathPairList) {
+    for (int size = sourceTargetPathPairList.size();
+        outputIndex < size && !resultTsBlockBuilder.isFull();
+        outputIndex++) {
+      Pair<String, PartialPath> sourceTargetPathPair = 
sourceTargetPathPairList.get(outputIndex);
       timeColumnBuilder.writeLong(0);
       columnBuilders[0].writeBinary(
           new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET));
@@ -122,7 +135,9 @@ public class TreeIntoOperator extends 
AbstractTreeIntoOperator {
               sourceTargetPathPair.right.getMeasurement()));
       resultTsBlockBuilder.declarePosition();
     }
-    return resultTsBlockBuilder.build();
+    TsBlock res = resultTsBlockBuilder.build();
+    resultTsBlockBuilder.reset();
+    return res;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 4cae2c0f288..32bc088ff6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2276,7 +2276,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     intoComponent.validate(sourceDevices, sourceColumns);
 
     DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new 
DeviceViewIntoPathDescriptor();
-    PathPatternTree targetPathTree = new PathPatternTree();
     IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
         intoComponent.getIntoDeviceMeasurementIterator();
     for (PartialPath sourceDevice : sourceDevices) {
@@ -2301,7 +2300,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement(
             sourceDevice, targetDevice, sourceColumn.getExpressionString(), 
targetMeasurement);
 
-        targetPathTree.appendFullPath(targetDevice, targetMeasurement);
         deviceViewIntoPathDescriptor.recordSourceColumnDataType(
             sourceColumn.getExpressionString(), 
analysis.getType(sourceColumn));
 
@@ -2311,13 +2309,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       intoDeviceMeasurementIterator.nextDevice();
     }
     deviceViewIntoPathDescriptor.validate();
-
-    // fetch schema of target paths
-    long startTime = System.nanoTime();
-    ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, 
true, context, false);
-    QueryPlanCostMetricSet.getInstance()
-        .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
-    deviceViewIntoPathDescriptor.bindType(targetSchemaTree);
+    deviceViewIntoPathDescriptor.bindType();
 
     analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor);
   }
@@ -2341,7 +2333,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     intoComponent.validate(sourceColumns);
 
     IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
-    PathPatternTree targetPathTree = new PathPatternTree();
     IntoComponent.IntoPathIterator intoPathIterator = 
intoComponent.getIntoPathIterator();
     for (Pair<Expression, String> pair : outputExpressions) {
       Expression sourceExpression = pair.left;
@@ -2381,21 +2372,13 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       intoPathDescriptor.specifyDeviceAlignment(
           targetPath.getDevicePath().toString(), isAlignedDevice);
 
-      targetPathTree.appendFullPath(targetPath);
       intoPathDescriptor.recordSourceColumnDataType(
           sourceColumn, analysis.getType(sourceExpression));
 
       intoPathIterator.next();
     }
     intoPathDescriptor.validate();
-
-    // fetch schema of target paths
-    long startTime = System.nanoTime();
-    ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, 
true, context, false);
-    updateSchemaTreeByViews(analysis, targetSchemaTree, context, false);
-    QueryPlanCostMetricSet.getInstance()
-        .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
-    intoPathDescriptor.bindType(targetSchemaTree);
+    intoPathDescriptor.bindType();
 
     analysis.setIntoPathDescriptor(intoPathDescriptor);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
index 9245d2eb634..7e23b0b4590 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
@@ -21,12 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Pair;
@@ -37,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
 import static 
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
@@ -134,48 +130,14 @@ public class SelectIntoUtils {
 
   public static List<Pair<String, PartialPath>> 
bindTypeForSourceTargetPathPairList(
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
-      Map<String, TSDataType> sourceToDataTypeMap,
-      ISchemaTree targetSchemaTree) {
+      Map<String, TSDataType> sourceToDataTypeMap) {
     List<Pair<String, PartialPath>> sourceTypeBoundTargetPathPairList = new 
ArrayList<>();
     for (Pair<String, PartialPath> sourceTargetPathPair : 
sourceTargetPathPairList) {
       String sourceColumn = sourceTargetPathPair.left;
       TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn);
-
       MeasurementPath targetPathWithSchema;
       PartialPath targetPath = sourceTargetPathPair.right;
-      List<MeasurementPath> actualTargetPaths =
-          targetSchemaTree.searchMeasurementPaths(targetPath).left;
-      if (actualTargetPaths.isEmpty()) {
-        targetPathWithSchema = new MeasurementPath(targetPath, 
sourceColumnType);
-      } else {
-        checkState(actualTargetPaths.size() == 1);
-        MeasurementPath actualTargetPath = actualTargetPaths.get(0);
-        if (actualTargetPath.getMeasurementSchema().isLogicalView()) {
-          LogicalViewSchema viewSchema =
-              (LogicalViewSchema) actualTargetPath.getMeasurementSchema();
-          if (viewSchema.isWritable()) {
-            MeasurementPath viewSourceSeriesPath =
-                targetSchemaTree
-                    
.searchMeasurementPaths(viewSchema.getSourcePathIfWritable())
-                    .left
-                    .get(0);
-            actualTargetPath =
-                new MeasurementPath(targetPath, 
viewSourceSeriesPath.getSeriesType());
-            
actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity());
-          } else {
-            throw new SemanticException(
-                String.format("View %s doesn't support data insertion.", 
targetPath));
-          }
-        }
-        if (!TypeInferenceUtils.canAutoCast(sourceColumnType, 
actualTargetPath.getSeriesType())) {
-          throw new SemanticException(
-              String.format(
-                  "The data type of target path (%s[%s]) is not compatible 
with the data type of source column (%s[%s]).",
-                  targetPath, actualTargetPath.getSeriesType(), sourceColumn, 
sourceColumnType));
-        }
-        // no need to check alignment, because the interface is common
-        targetPathWithSchema = actualTargetPath;
-      }
+      targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType);
       sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn, 
targetPathWithSchema));
     }
     return sourceTypeBoundTargetPathPairList;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
index eed727f8135..a34bf2e1b79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -100,7 +99,7 @@ public class DeviceViewIntoPathDescriptor {
     }
   }
 
-  public void bindType(ISchemaTree targetSchemaTree) {
+  public void bindType() {
     Map<String, List<Pair<String, PartialPath>>> 
deviceToSourceTypeBoundTargetPathPairListMap =
         new HashMap<>();
     for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
@@ -108,7 +107,7 @@ public class DeviceViewIntoPathDescriptor {
       deviceToSourceTypeBoundTargetPathPairListMap.put(
           sourceTargetEntry.getKey(),
           SelectIntoUtils.bindTypeForSourceTargetPathPairList(
-              sourceTargetEntry.getValue(), sourceToDataTypeMap, 
targetSchemaTree));
+              sourceTargetEntry.getValue(), sourceToDataTypeMap));
     }
     this.deviceToSourceTargetPathPairListMap = 
deviceToSourceTypeBoundTargetPathPairListMap;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
index bfb03b4ea5a..5dfc1e50428 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -101,10 +100,10 @@ public class IntoPathDescriptor {
     }
   }
 
-  public void bindType(ISchemaTree targetSchemaTree) {
+  public void bindType() {
     this.sourceTargetPathPairList =
         SelectIntoUtils.bindTypeForSourceTargetPathPairList(
-            sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree);
+            sourceTargetPathPairList, sourceToDataTypeMap);
   }
 
   public List<Pair<String, PartialPath>> getSourceTargetPathPairList() {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
new file mode 100644
index 00000000000..f7fe5fcf367
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class DeviceViewIntoOperatorTest {
+
+  private static final String TEST_SG = "root.test";
+
+  private Map<String, Map<PartialPath, Map<String, InputLocation>>>
+      deviceToTargetPathSourceInputLocationMap;
+  private Map<String, Map<PartialPath, Map<String, TSDataType>>> 
deviceToTargetPathDataTypeMap;
+  private Map<String, Boolean> targetDeviceToAlignedMap;
+  private Map<String, List<Pair<String, PartialPath>>> 
deviceToSourceTargetPathPairListMap;
+  private Map<String, InputLocation> sourceColumnToInputLocationMap;
+  private List<TSDataType> inputColumnTypes;
+  private List<Integer> deviceColumnIndex;
+
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>();
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private DeviceViewIntoOperator operator;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+    
IoTDBDescriptor.getInstance().getConfig().setSelectIntoInsertTabletPlanRowLimit(4);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512);
+
+    deviceToTargetPathSourceInputLocationMap = new HashMap<>();
+    deviceToTargetPathDataTypeMap = new HashMap<>();
+    targetDeviceToAlignedMap = new HashMap<>();
+    deviceToSourceTargetPathPairListMap = new HashMap<>();
+    sourceColumnToInputLocationMap = new HashMap<>();
+    inputColumnTypes = new ArrayList<>();
+    deviceColumnIndex = new ArrayList<>();
+
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  private DeviceViewIntoOperator createAndInitOperatorForSingleDevices(int 
sensorNum) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+
+    // Create SeriesScanOperator for each sensor
+    List<Operator> scanOperators = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<ColumnMerger> columnMergers = new ArrayList<>();
+
+    for (int i = 0; i < sensorNum; i++) {
+      scanOperators.add(
+          createSeriesScanOperator(driverContext, i, "device0", i, 
measurementSchemas.get(i)));
+      dataTypes.add(TSDataType.INT32);
+      columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new 
AscTimeComparator()));
+    }
+
+    FullOuterTimeJoinOperator timeJoinOperator =
+        createTimeJoinOperator(driverContext, sensorNum, scanOperators, 
dataTypes, columnMergers);
+
+    // Prepare data types for SingleDeviceViewOperator (device column + sensor 
columns)
+    inputColumnTypes.add(TSDataType.TEXT); // Device column
+    for (int i = 0; i < sensorNum; i++) {
+      deviceColumnIndex.add(i + 1);
+      inputColumnTypes.add(TSDataType.INT32); // Sensor columns
+    }
+
+    SingleDeviceViewOperator singleDeviceViewOperator =
+        new SingleDeviceViewOperator(
+            addOperatorContext(driverContext, sensorNum + 1, 
SingleDeviceViewOperator.class),
+            TEST_SG + ".device0",
+            timeJoinOperator,
+            deviceColumnIndex,
+            inputColumnTypes);
+
+    return createTestDeviceViewIntoOperator(
+        driverContext,
+        sensorNum + 2,
+        singleDeviceViewOperator,
+        inputColumnTypes,
+        instanceNotificationExecutor);
+  }
+
+  private DeviceViewIntoOperator createAndInitOperatorForMultipleDevices(
+      int deviceNum, int sensorNum) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+
+    List<IDeviceID> devices = new ArrayList<>();
+    List<Operator> deviceOperators = new ArrayList<>();
+    List<List<Integer>> deviceColumnIndexes = new ArrayList<>();
+
+    List<Integer> singleDeviceColumnIndex = new ArrayList<>();
+    for (int i = 0; i < sensorNum; i++) {
+      singleDeviceColumnIndex.add(i + 1);
+    }
+
+    int operatorIndex = 0;
+    for (int deviceIdx = 0; deviceIdx < deviceNum; deviceIdx++) {
+      String device = "device" + deviceIdx;
+
+      List<Operator> scanOperators = new ArrayList<>();
+      List<TSDataType> scanDataTypes = new ArrayList<>();
+      List<ColumnMerger> columnMergers = new ArrayList<>();
+
+      for (int sensorIdx = 0; sensorIdx < sensorNum; sensorIdx++) {
+        scanOperators.add(
+            createSeriesScanOperator(
+                driverContext,
+                operatorIndex,
+                device,
+                sensorIdx,
+                measurementSchemas.get(sensorIdx)));
+        scanDataTypes.add(TSDataType.INT32);
+        columnMergers.add(
+            new SingleColumnMerger(new InputLocation(sensorIdx, 0), new 
AscTimeComparator()));
+        operatorIndex++;
+      }
+
+      FullOuterTimeJoinOperator timeJoinOperator =
+          createTimeJoinOperator(
+              driverContext, operatorIndex, scanOperators, scanDataTypes, 
columnMergers);
+      operatorIndex++;
+
+      devices.add(IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." + 
device));
+      deviceOperators.add(timeJoinOperator);
+      deviceColumnIndexes.add(singleDeviceColumnIndex);
+    }
+
+    List<TSDataType> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.TEXT); // Device column
+    for (int i = 0; i < sensorNum; i++) {
+      dataTypes.add(TSDataType.INT32); // Sensor columns
+    }
+
+    DeviceViewOperator deviceViewOperator =
+        new DeviceViewOperator(
+            addOperatorContext(driverContext, operatorIndex, 
DeviceViewOperator.class),
+            devices,
+            deviceOperators,
+            deviceColumnIndexes,
+            dataTypes);
+
+    inputColumnTypes.add(TSDataType.TEXT); // Device column
+    for (int i = 0; i < sensorNum; i++) {
+      deviceColumnIndex.add(i + 1);
+      inputColumnTypes.add(TSDataType.INT32); // Sensor columns
+    }
+
+    return createTestDeviceViewIntoOperator(
+        driverContext,
+        operatorIndex + 1,
+        deviceViewOperator,
+        inputColumnTypes,
+        instanceNotificationExecutor);
+  }
+
+  /** Test scenario 1: single device with small amount of data, should return 
in single TsBlock */
+  @Test
+  public void testSingleDeviceSmallData() throws Exception {
+    prepareDeviceData("device0", 2);
+    operator = createAndInitOperatorForSingleDevices(2);
+
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      result = operator.next();
+    }
+    assertNotNull(result);
+    assertEquals(2, result.getPositionCount());
+
+    // Verify 4 value columns (device, source, target, count)
+    assertEquals(4, result.getValueColumnCount());
+    assertTrue(operator.isFinished());
+  }
+
+  /** Test scenario 2: Single device with result set exceeds maxTsBlockSize */
+  @Test
+  public void testSingleDeviceExceedsMaxTsBlockSize() throws Exception {
+    prepareDeviceData("device0", 10);
+    operator = createAndInitOperatorForSingleDevices(10);
+
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      result = operator.next();
+    }
+    assertNotNull(result);
+    assertEquals(10, result.getPositionCount());
+
+    // Verify 4 value columns (device, source, target, count)
+    assertEquals(4, result.getValueColumnCount());
+    assertTrue(operator.isFinished());
+  }
+
+  /** Test scenario 3: Multiple device with small amount of data */
+  @Test
+  public void testMultipleDeviceSmallData() throws Exception {
+    prepareDeviceData("device0", 1);
+    prepareDeviceData("device1", 1);
+    operator = createAndInitOperatorForMultipleDevices(2, 1);
+
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      result = operator.next();
+    }
+    assertNotNull(result);
+    assertEquals(2, result.getPositionCount());
+
+    // Verify 4 value columns (device, source, target, count)
+    assertEquals(4, result.getValueColumnCount());
+    assertTrue(operator.isFinished());
+  }
+
+  /** Test scenario 4: Multiple devices, total size exceeds maxTsBlockSize */
+  @Test
+  public void testMultipleDevicesExceedsTsBlockSize() throws Exception {
+    prepareDeviceData("device0", 2);
+    prepareDeviceData("device1", 2);
+    prepareDeviceData("device2", 2);
+    operator = createAndInitOperatorForMultipleDevices(3, 2);
+
+    int totalRows = 0;
+    int totalBatches = 0;
+    // Loop through all batches
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      TsBlock result = operator.next();
+      if (result != null && !result.isEmpty()) {
+        totalRows += result.getPositionCount();
+        totalBatches += 1;
+      }
+    }
+
+    assertEquals(6, totalRows);
+    assertEquals(2, totalBatches);
+    assertTrue(operator.isFinished());
+  }
+
+  /**
+   * Helper method: Prepare test data for specified device
+   *
+   * @param device Device name
+   * @param sensorNum Number of path pairs for this device
+   */
+  private void prepareDeviceData(String device, int sensorNum) throws 
Exception {
+    String sourceDevicePath = TEST_SG + "." + device;
+    String targetDevicePath = TEST_SG + ".new_" + device;
+
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputMap =
+        deviceToTargetPathSourceInputLocationMap.computeIfAbsent(
+            sourceDevicePath, k -> new HashMap<>());
+    Map<PartialPath, Map<String, TSDataType>> targetDataTypeMap =
+        deviceToTargetPathDataTypeMap.computeIfAbsent(sourceDevicePath, k -> 
new HashMap<>());
+    List<Pair<String, PartialPath>> pairList =
+        deviceToSourceTargetPathPairListMap.computeIfAbsent(
+            sourceDevicePath, k -> new ArrayList<>());
+
+    Map<String, InputLocation> columnToInputLocationMap = new HashMap<>();
+    Map<String, TSDataType> dataTypeMap = new HashMap<>();
+
+    columnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new 
InputLocation(0, 0));
+    dataTypeMap.put(ColumnHeaderConstant.DEVICE, TSDataType.TEXT);
+    sourceColumnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new 
InputLocation(0, 0));
+
+    for (int i = 0; i < sensorNum; i++) {
+      String targetMeasurement = "sensor" + i;
+      String targetPath = targetDevicePath + "." + targetMeasurement;
+      PartialPath targetPartialPath = new PartialPath(targetPath);
+
+      pairList.add(new Pair<>(targetMeasurement, targetPartialPath));
+      columnToInputLocationMap.put(targetMeasurement, new InputLocation(0, i + 
1));
+      dataTypeMap.put(targetMeasurement, TSDataType.INT32);
+
+      sourceColumnToInputLocationMap.put(targetMeasurement, new 
InputLocation(0, i + 1));
+    }
+    PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath);
+    targetPathToSourceInputMap.put(targetDevicePartialPath, 
columnToInputLocationMap);
+    targetDataTypeMap.put(targetDevicePartialPath, dataTypeMap);
+
+    targetDeviceToAlignedMap.put(targetDevicePath, false);
+  }
+
+  private SeriesScanOperator createSeriesScanOperator(
+      DriverContext driverContext,
+      int index,
+      String device,
+      int sensorIdx,
+      IMeasurementSchema measurementSchema) {
+    PlanNodeId planNodeId = new PlanNodeId(String.valueOf(index));
+    driverContext.addOperatorContext(index, planNodeId, 
SeriesScanOperator.class.getSimpleName());
+
+    Set<String> allSensors = new HashSet<>();
+    allSensors.add("sensor" + sensorIdx);
+    SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+    scanOptionsBuilder.withAllSensors(allSensors);
+
+    IFullPath measurementPath =
+        new NonAlignedFullPath(
+            IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." + device), 
measurementSchema);
+
+    SeriesScanOperator seriesScanOperator =
+        new SeriesScanOperator(
+            driverContext.getOperatorContexts().get(index),
+            planNodeId,
+            measurementPath,
+            Ordering.ASC,
+            scanOptionsBuilder.build());
+    seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, 
unSeqResources));
+    return seriesScanOperator;
+  }
+
+  private FullOuterTimeJoinOperator createTimeJoinOperator(
+      DriverContext driverContext,
+      int index,
+      List<Operator> scanOperators,
+      List<TSDataType> dataTypes,
+      List<ColumnMerger> columnMergers) {
+    addOperatorContext(driverContext, index, FullOuterTimeJoinOperator.class);
+    return new FullOuterTimeJoinOperator(
+        driverContext.getOperatorContexts().get(index),
+        scanOperators,
+        Ordering.ASC,
+        dataTypes,
+        columnMergers,
+        new AscTimeComparator());
+  }
+
+  private OperatorContext addOperatorContext(
+      DriverContext driverContext, int index, Class<?> operatorClass) {
+    driverContext.addOperatorContext(
+        index, new PlanNodeId(String.valueOf(index)), 
operatorClass.getSimpleName());
+    return driverContext.getOperatorContexts().get(index);
+  }
+
+  private DeviceViewIntoOperator createTestDeviceViewIntoOperator(
+      DriverContext driverContext,
+      int index,
+      Operator child,
+      List<TSDataType> types,
+      ExecutorService executor) {
+    addOperatorContext(driverContext, index, TestDeviceViewIntoOperator.class);
+    return new TestDeviceViewIntoOperator(
+        driverContext.getOperatorContexts().get(index),
+        child,
+        types,
+        deviceToTargetPathSourceInputLocationMap,
+        deviceToTargetPathDataTypeMap,
+        targetDeviceToAlignedMap,
+        deviceToSourceTargetPathPairListMap,
+        sourceColumnToInputLocationMap,
+        executor,
+        100);
+  }
+
+  /**
+   * Test version of DeviceViewIntoOperator that mocks the write operation. 
Instead of actually
+   * executing insertTablets, it returns an immediately completed Future.
+   */
+  private static class TestDeviceViewIntoOperator extends 
DeviceViewIntoOperator {
+
+    public TestDeviceViewIntoOperator(
+        OperatorContext operatorContext,
+        Operator child,
+        List<TSDataType> inputColumnTypes,
+        Map<String, Map<PartialPath, Map<String, InputLocation>>>
+            deviceToTargetPathSourceInputLocationMap,
+        Map<String, Map<PartialPath, Map<String, TSDataType>>> 
deviceToTargetPathDataTypeMap,
+        Map<String, Boolean> targetDeviceToAlignedMap,
+        Map<String, List<Pair<String, PartialPath>>> 
deviceToSourceTargetPathPairListMap,
+        Map<String, InputLocation> sourceColumnToInputLocationMap,
+        ExecutorService intoOperationExecutor,
+        long statementSizePerLine) {
+      super(
+          operatorContext,
+          child,
+          inputColumnTypes,
+          deviceToTargetPathSourceInputLocationMap,
+          deviceToTargetPathDataTypeMap,
+          targetDeviceToAlignedMap,
+          deviceToSourceTargetPathPairListMap,
+          sourceColumnToInputLocationMap,
+          intoOperationExecutor,
+          statementSizePerLine);
+    }
+
+    @Override
+    protected void executeInsertMultiTabletsStatement(
+        InsertMultiTabletsStatement insertMultiTabletsStatement) {
+      // Mock the write operation by setting an immediately completed Future
+      writeOperationFuture = immediateFuture(SUCCESS_STATUS);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
new file mode 100644
index 00000000000..4b4d08f558e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.TreeIntoOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TreeIntoOperatorTest {
+
+  private static final String TEST_SG = "root.test";
+
+  private List<Pair<String, PartialPath>> sourceTargetPathPairList;
+  private Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap;
+  private Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap;
+  private Map<String, Boolean> targetDeviceToAlignedMap;
+  private List<TSDataType> inputColumnTypes;
+
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>();
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private TreeIntoOperator operator;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512);
+
+    sourceTargetPathPairList = new ArrayList<>();
+    targetPathToSourceInputLocationMap = new HashMap<>();
+    targetPathToDataTypeMap = new HashMap<>();
+    targetDeviceToAlignedMap = new HashMap<>();
+    inputColumnTypes = new ArrayList<>();
+
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Common setup method to create and initialize TreeIntoOperator with 
SeriesScanOperator as child.
+   */
+  private TreeIntoOperator createAndInitOperator(int sensorNum) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+
+    // Create SeriesScanOperator for each sensor
+    List<Operator> scanOperators = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<ColumnMerger> columnMergers = new ArrayList<>();
+
+    for (int i = 0; i < sensorNum; i++) {
+      PlanNodeId planNodeId = new PlanNodeId(String.valueOf(i));
+      driverContext.addOperatorContext(i, planNodeId, 
SeriesScanOperator.class.getSimpleName());
+
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor" + i);
+      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+      scanOptionsBuilder.withAllSensors(allSensors);
+
+      IFullPath measurementPath =
+          new NonAlignedFullPath(
+              IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + ".device0"),
+              measurementSchemas.get(i));
+
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(i),
+              planNodeId,
+              measurementPath,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, 
unSeqResources));
+
+      scanOperators.add(seriesScanOperator);
+      dataTypes.add(TSDataType.INT32);
+      columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new 
AscTimeComparator()));
+    }
+
+    // Add context for FullOuterTimeJoinOperator
+    driverContext.addOperatorContext(
+        sensorNum,
+        new PlanNodeId(String.valueOf(sensorNum)),
+        FullOuterTimeJoinOperator.class.getSimpleName());
+    // Add context for TreeIntoOperator
+    driverContext.addOperatorContext(
+        sensorNum + 1,
+        new PlanNodeId(String.valueOf(sensorNum + 1)),
+        TestTreeIntoOperator.class.getSimpleName());
+
+    // Join all sensor scans with FullOuterTimeJoinOperator
+    FullOuterTimeJoinOperator timeJoinOperator =
+        new FullOuterTimeJoinOperator(
+            driverContext.getOperatorContexts().get(sensorNum),
+            scanOperators,
+            Ordering.ASC,
+            dataTypes,
+            columnMergers,
+            new AscTimeComparator());
+
+    return new TestTreeIntoOperator(
+        driverContext.getOperatorContexts().get(sensorNum + 1),
+        timeJoinOperator,
+        inputColumnTypes,
+        targetPathToSourceInputLocationMap,
+        targetPathToDataTypeMap,
+        targetDeviceToAlignedMap,
+        sourceTargetPathPairList,
+        instanceNotificationExecutor,
+        100);
+  }
+
+  /** Test scenario 1: small amount of data, should return in single TsBlock */
+  @Test
+  public void testAllResultsInSingleTsBlock() throws Exception {
+    prepareSourceTargetPairs(2);
+    operator = createAndInitOperator(2);
+
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      result = operator.next();
+    }
+    assertNotNull(result);
+    assertEquals(2, result.getPositionCount());
+
+    // Verify 3 value columns (source, target, count)
+    assertEquals(3, result.getValueColumnCount());
+    assertTrue(operator.isFinished());
+  }
+
+  /**
+   * Test scenario 2: Result set exceeds maxTsBlockSize, should return in 
batches Create a large
+   * number of path pairs to trigger size limit
+   */
+  @Test
+  public void testResultsExceedMaxTsBlockSize() throws Exception {
+    prepareSourceTargetPairs(10);
+    operator = createAndInitOperator(10);
+
+    int totalRows = 0;
+    // Loop through all batches
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      TsBlock result = operator.next();
+      if (result != null && !result.isEmpty()) {
+        int rowCount = result.getPositionCount();
+        assertTrue(rowCount == 4 || rowCount == 2);
+        totalRows += rowCount;
+      }
+    }
+
+    // Verify all data is returned
+    assertEquals(10, totalRows);
+    assertTrue(operator.isFinished());
+  }
+
+  private void prepareSourceTargetPairs(int sensorNum) throws Exception {
+    String sourceDevicePath = TEST_SG + ".device0";
+    String targetDevicePath = TEST_SG + ".new_device0";
+    targetDeviceToAlignedMap.put(targetDevicePath, false);
+    PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath);
+
+    for (int i = 0; i < sensorNum; i++) {
+      String targetMeasurement = "sensor" + i;
+      String sourcePath = sourceDevicePath + "." + targetMeasurement;
+      String targetPath = targetDevicePath + "." + targetMeasurement;
+
+      sourceTargetPathPairList.add(new Pair<>(sourcePath, new 
PartialPath(targetPath)));
+
+      Map<String, InputLocation> inputLocationMap =
+          targetPathToSourceInputLocationMap.computeIfAbsent(
+              targetDevicePartialPath, k -> new HashMap<>());
+      // Each sensor comes from a different input location (different 
SeriesScanOperator)
+      inputLocationMap.put(targetMeasurement, new InputLocation(0, i));
+
+      // Prepare targetPathToDataTypeMap
+      Map<String, TSDataType> dataTypeMap =
+          targetPathToDataTypeMap.computeIfAbsent(targetDevicePartialPath, k 
-> new HashMap<>());
+      dataTypeMap.put(targetMeasurement, TSDataType.INT32);
+    }
+
+    // Prepare inputColumnTypes (one column for each sensor)
+    for (int i = 0; i < sensorNum; i++) {
+      inputColumnTypes.add(TSDataType.INT32);
+    }
+  }
+
+  /**
+   * Test version of TreeIntoOperator that mocks the write operation. Instead 
of actually executing
+   * insertTablets, it returns an immediately completed Future.
+   */
+  private static class TestTreeIntoOperator extends TreeIntoOperator {
+
+    public TestTreeIntoOperator(
+        OperatorContext operatorContext,
+        Operator child,
+        List<TSDataType> inputColumnTypes,
+        Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap,
+        Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+        Map<String, Boolean> targetDeviceToAlignedMap,
+        List<Pair<String, PartialPath>> sourceTargetPathPairList,
+        ExecutorService intoOperationExecutor,
+        long statementSizePerLine) {
+      super(
+          operatorContext,
+          child,
+          inputColumnTypes,
+          targetPathToSourceInputLocationMap,
+          targetPathToDataTypeMap,
+          targetDeviceToAlignedMap,
+          sourceTargetPathPairList,
+          intoOperationExecutor,
+          statementSizePerLine);
+    }
+
+    @Override
+    protected void executeInsertMultiTabletsStatement(
+        InsertMultiTabletsStatement insertMultiTabletsStatement) {
+      // Mock the write operation by setting an immediately completed Future
+      writeOperationFuture = immediateFuture(SUCCESS_STATUS);
+    }
+  }
+}

Reply via email to