This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b81444db3e Improve DeviceViewIntoOperator's return style to pipeline (#16994)
8b81444db3e is described below
commit 8b81444db3e6871fe0b2eaf4d87d0b31a573241f
Author: shizy <[email protected]>
AuthorDate: Mon Jan 12 14:04:24 2026 +0800
Improve DeviceViewIntoOperator's return style to pipeline (#16994)
---
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 55 +--
.../relational/it/query/recent/IoTDBCteIT.java | 4 +-
.../operator/process/AbstractIntoOperator.java | 6 +-
.../operator/process/AbstractTreeIntoOperator.java | 8 +-
.../operator/process/DeviceViewIntoOperator.java | 110 ++++-
.../process/InsertTabletStatementGenerator.java | 4 +
.../operator/process/TableIntoOperator.java | 5 +
.../operator/process/TreeIntoOperator.java | 37 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 21 +-
.../queryengine/plan/analyze/SelectIntoUtils.java | 42 +-
.../parameter/DeviceViewIntoPathDescriptor.java | 5 +-
.../planner/plan/parameter/IntoPathDescriptor.java | 5 +-
.../operator/DeviceViewIntoOperatorTest.java | 487 +++++++++++++++++++++
.../execution/operator/TreeIntoOperatorTest.java | 303 +++++++++++++
14 files changed, 962 insertions(+), 130 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index a728ac49e69..5a6b3be3db1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -644,89 +644,74 @@ public class IoTDBSelectIntoIT {
// test INT32
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_int32[INT32]).");
- assertTestFail(
- "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int32[INT32]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0,
value 0]");
// test INT64
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type INT64, timestamp 0, value
0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type INT64, timestamp 0, value
0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
- assertTestFail(
- "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0,
value 0]");
// test FLOAT
assertTestFail(
"select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type FLOAT, timestamp 0, value
0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type FLOAT, timestamp 0, value
0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
- assertTestFail(
- "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0,
value 0.0]");
// test DOUBLE
assertTestFail(
"select s_double into root.sg_type.d_1(s_int32) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_int64) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_float) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
- assertTestFail(
- "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0,
value 0.0]");
// test BOOLEAN
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int32) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value
true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int64) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value
true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_float) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value
true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_double) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
- assertTestFail(
- "select s_boolean into root.sg_type.d_1(s_text) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is
not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0,
value true]");
// test TEXT
assertTestFail(
"select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is
not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0,
value text0]");
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
index b79a5b7b7fa..7f29e3f0197 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java
@@ -470,7 +470,7 @@ public class IoTDBCteIT {
@Test
public void testConcurrentCteQueries() throws Exception {
- final int threadCount = 10;
+ final int threadCount = 3;
final int queriesPerThread = 20;
final AtomicInteger successCount = new AtomicInteger(0);
final AtomicInteger failureCount = new AtomicInteger(0);
@@ -582,7 +582,7 @@ public class IoTDBCteIT {
int totalQueries = threadCount * queriesPerThread;
assertEquals("All queries should succeed", totalQueries,
successCount.get());
assertEquals("No queries should fail", 0, failureCount.get());
- assertEquals("Total query count should match", 340, totalCount.get());
+ assertEquals("Total query count should match", 102, totalCount.get());
}
private static void prepareData() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 444c5001975..425449469b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -102,7 +102,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
checkLastWriteOperation();
if (!processTsBlock(cachedTsBlock)) {
- return null;
+ return tryToReturnPartialResult();
}
cachedTsBlock = null;
if (child.hasNextWithTimer()) {
@@ -110,7 +110,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
processTsBlock(inputTsBlock);
// call child.next only once
- return null;
+ return tryToReturnPartialResult();
} else {
return tryToReturnResultTsBlock();
}
@@ -204,6 +204,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
protected abstract TsBlock tryToReturnResultTsBlock();
+ protected abstract TsBlock tryToReturnPartialResult();
+
protected abstract void resetInsertTabletStatementGenerators();
private void setMaxRowNumberInStatement(long statementSizePerLine) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
index a72907b977b..0d15c72cfa1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import com.google.common.util.concurrent.Futures;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
import java.util.ArrayList;
import java.util.List;
@@ -39,15 +40,18 @@ import java.util.concurrent.ExecutorService;
public abstract class AbstractTreeIntoOperator extends AbstractIntoOperator {
protected List<InsertTabletStatementGenerator>
insertTabletStatementGenerators;
+ protected final TsBlockBuilder resultTsBlockBuilder;
protected AbstractTreeIntoOperator(
OperatorContext operatorContext,
Operator child,
List<TSDataType> inputColumnTypes,
ExecutorService intoOperationExecutor,
- long statementSizePerLine) {
+ long statementSizePerLine,
+ List<TSDataType> outputDataTypes) {
super(operatorContext, child, inputColumnTypes, intoOperationExecutor,
statementSizePerLine);
this.maxReturnSize =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
protected static List<InsertTabletStatementGenerator>
constructInsertTabletStatementGenerators(
@@ -134,7 +138,7 @@ public abstract class AbstractTreeIntoOperator extends
AbstractIntoOperator {
() -> client.insertTablets(insertMultiTabletsStatement),
writeOperationExecutor);
}
- private boolean existFullStatement(
+ protected boolean existFullStatement(
List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
if (generator.isFull()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
index 6321a52914b..727ef26c1c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
@@ -22,22 +22,26 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.IntoProcessException;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -48,6 +52,7 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
deviceToTargetPathSourceInputLocationMap;
@@ -59,7 +64,7 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
private final int deviceColumnIndex;
private String currentDevice;
- private final TsBlockBuilder resultTsBlockBuilder;
+ private int batchedRowCount = 0;
@SuppressWarnings("squid:S107")
public DeviceViewIntoOperator(
@@ -74,18 +79,19 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
Map<String, InputLocation> sourceColumnToInputLocationMap,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
- super(operatorContext, child, inputColumnTypes, intoOperationExecutor,
statementSizePerLine);
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ intoOperationExecutor,
+ statementSizePerLine,
+ ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList()));
this.deviceToTargetPathSourceInputLocationMap =
deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
this.deviceToSourceTargetPathPairListMap =
deviceToSourceTargetPathPairListMap;
-
- List<TSDataType> outputDataTypes =
- ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
- .map(ColumnHeader::getColumnType)
- .collect(Collectors.toList());
- this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-
this.deviceColumnIndex =
sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex();
}
@@ -102,7 +108,12 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
constructInsertMultiTabletsStatement(false);
updateResultTsBlock();
- insertTabletStatementGenerators =
constructInsertTabletStatementGeneratorsByDevice(device);
+ if (insertMultiTabletsStatement != null ||
insertTabletStatementGenerators == null) {
+ insertTabletStatementGenerators =
constructInsertTabletStatementGeneratorsByDevice(device);
+ } else {
+ insertTabletStatementGenerators.addAll(
+ constructInsertTabletStatementGeneratorsByDevice(device));
+ }
currentDevice = device;
if (insertMultiTabletsStatement != null) {
@@ -115,8 +126,14 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
int readIndex = 0;
while (readIndex < inputTsBlock.getPositionCount()) {
int lastReadIndex = readIndex;
- for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
- lastReadIndex = Math.max(lastReadIndex,
generator.processTsBlock(inputTsBlock, readIndex));
+ if (!insertTabletStatementGenerators.isEmpty()) {
+ InsertTabletStatementGenerator generatorOfCurrentDevice =
+
insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1);
+ int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount();
+ lastReadIndex =
+ Math.max(
+ lastReadIndex,
generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex));
+ batchedRowCount += generatorOfCurrentDevice.getRowCount() -
rowCountBeforeProcess;
}
readIndex = lastReadIndex;
if (insertMultiTabletsInternally(true)) {
@@ -143,6 +160,16 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
return resultTsBlockBuilder.build();
}
+ @Override
+ protected TsBlock tryToReturnPartialResult() {
+ if (resultTsBlockBuilder.isFull()) {
+ TsBlock res = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return res;
+ }
+ return null;
+ }
+
private List<InsertTabletStatementGenerator>
constructInsertTabletStatementGeneratorsByDevice(
String currentDevice) {
Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap =
@@ -192,4 +219,61 @@ public class DeviceViewIntoOperator extends
AbstractTreeIntoOperator {
.mapToLong(InsertTabletStatementGenerator::ramBytesUsed)
.sum());
}
+
+ @Override
+ protected InsertMultiTabletsStatement
constructInsertMultiTabletsStatement(boolean needCheck) {
+ if (insertTabletStatementGenerators == null) {
+ return null;
+ }
+ boolean hasFullStatement =
existFullStatement(insertTabletStatementGenerators);
+ if (needCheck) {
+ // When needCheck is true, we only proceed if there already exists a full statement.
+ if (!hasFullStatement) {
+ return null;
+ }
+ } else {
+ // When needCheck is false, we may delay flushing to accumulate more rows
+ // if the batch is not yet at the configured row limit and the child has more data.
+ try {
+ if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()
+ && child.hasNextWithTimer()) {
+ return null;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IntoProcessException(e.getMessage());
+ } catch (Exception e) {
+ throw new IntoProcessException(e.getMessage());
+ }
+ }
+ List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
+ for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
+ if (!generator.isEmpty()) {
+
insertTabletStatementList.add(generator.constructInsertTabletStatement());
+ }
+ }
+ if (insertTabletStatementList.isEmpty()) {
+ return null;
+ }
+
+ InsertMultiTabletsStatement insertMultiTabletsStatement = new
InsertMultiTabletsStatement();
+
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+ batchedRowCount = 0;
+ return insertMultiTabletsStatement;
+ }
+
+ @Override
+ protected long findWritten(String device, String measurement) {
+ for (int i = insertTabletStatementGenerators.size() - 1; i >= 0; i--) {
+ InsertTabletStatementGenerator generator =
insertTabletStatementGenerators.get(i);
+ if (!Objects.equals(generator.getDevice(), device)) {
+ continue;
+ }
+ long writtenCountInCurrentGenerator =
generator.getWrittenCount(measurement);
+ if (writtenCountInCurrentGenerator >= 0) {
+ return writtenCountInCurrentGenerator;
+ }
+ }
+ return 0;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
index b2fae4c25b1..47ff31573e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java
@@ -169,6 +169,10 @@ public abstract class InsertTabletStatementGenerator
implements Accountable {
return rowCount == 0;
}
+ public int getRowCount() {
+ return rowCount;
+ }
+
public String getDevice() {
return devicePath.toString();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
index d9af3229ea2..40b1781b69d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java
@@ -114,6 +114,11 @@ public class TableIntoOperator extends
AbstractIntoOperator {
return constructResultTsBlock();
}
+ @Override
+ protected TsBlock tryToReturnPartialResult() {
+ return null;
+ }
+
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
index 32838483cef..bed98c42086 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java
@@ -31,7 +31,6 @@ import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -49,6 +48,8 @@ public class TreeIntoOperator extends
AbstractTreeIntoOperator {
private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
+ private int outputIndex = 0;
+
@SuppressWarnings("squid:S107")
public TreeIntoOperator(
OperatorContext operatorContext,
@@ -60,7 +61,15 @@ public class TreeIntoOperator extends
AbstractTreeIntoOperator {
List<Pair<String, PartialPath>> sourceTargetPathPairList,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
- super(operatorContext, child, inputColumnTypes, intoOperationExecutor,
statementSizePerLine);
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ intoOperationExecutor,
+ statementSizePerLine,
+ ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList()));
this.sourceTargetPathPairList = sourceTargetPathPairList;
insertTabletStatementGenerators =
constructInsertTabletStatementGenerators(
@@ -98,19 +107,23 @@ public class TreeIntoOperator extends
AbstractTreeIntoOperator {
return null;
}
- finished = true;
- return constructResultTsBlock();
+ TsBlock res = constructResultTsBlock();
+ finished = (outputIndex == sourceTargetPathPairList.size());
+ return res;
+ }
+
+ @Override
+ protected TsBlock tryToReturnPartialResult() {
+ return null;
}
private TsBlock constructResultTsBlock() {
- List<TSDataType> outputDataTypes =
- ColumnHeaderConstant.selectIntoColumnHeaders.stream()
- .map(ColumnHeader::getColumnType)
- .collect(Collectors.toList());
- TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
- for (Pair<String, PartialPath> sourceTargetPathPair :
sourceTargetPathPairList) {
+ for (int size = sourceTargetPathPairList.size();
+ outputIndex < size && !resultTsBlockBuilder.isFull();
+ outputIndex++) {
+ Pair<String, PartialPath> sourceTargetPathPair =
sourceTargetPathPairList.get(outputIndex);
timeColumnBuilder.writeLong(0);
columnBuilders[0].writeBinary(
new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET));
@@ -122,7 +135,9 @@ public class TreeIntoOperator extends
AbstractTreeIntoOperator {
sourceTargetPathPair.right.getMeasurement()));
resultTsBlockBuilder.declarePosition();
}
- return resultTsBlockBuilder.build();
+ TsBlock res = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return res;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 4cae2c0f288..32bc088ff6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2276,7 +2276,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoComponent.validate(sourceDevices, sourceColumns);
DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new
DeviceViewIntoPathDescriptor();
- PathPatternTree targetPathTree = new PathPatternTree();
IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
intoComponent.getIntoDeviceMeasurementIterator();
for (PartialPath sourceDevice : sourceDevices) {
@@ -2301,7 +2300,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement(
sourceDevice, targetDevice, sourceColumn.getExpressionString(),
targetMeasurement);
- targetPathTree.appendFullPath(targetDevice, targetMeasurement);
deviceViewIntoPathDescriptor.recordSourceColumnDataType(
sourceColumn.getExpressionString(),
analysis.getType(sourceColumn));
@@ -2311,13 +2309,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoDeviceMeasurementIterator.nextDevice();
}
deviceViewIntoPathDescriptor.validate();
-
- // fetch schema of target paths
- long startTime = System.nanoTime();
- ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree,
true, context, false);
- QueryPlanCostMetricSet.getInstance()
- .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
- deviceViewIntoPathDescriptor.bindType(targetSchemaTree);
+ deviceViewIntoPathDescriptor.bindType();
analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor);
}
@@ -2341,7 +2333,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoComponent.validate(sourceColumns);
IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
- PathPatternTree targetPathTree = new PathPatternTree();
IntoComponent.IntoPathIterator intoPathIterator =
intoComponent.getIntoPathIterator();
for (Pair<Expression, String> pair : outputExpressions) {
Expression sourceExpression = pair.left;
@@ -2381,21 +2372,13 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoPathDescriptor.specifyDeviceAlignment(
targetPath.getDevicePath().toString(), isAlignedDevice);
- targetPathTree.appendFullPath(targetPath);
intoPathDescriptor.recordSourceColumnDataType(
sourceColumn, analysis.getType(sourceExpression));
intoPathIterator.next();
}
intoPathDescriptor.validate();
-
- // fetch schema of target paths
- long startTime = System.nanoTime();
- ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree,
true, context, false);
- updateSchemaTreeByViews(analysis, targetSchemaTree, context, false);
- QueryPlanCostMetricSet.getInstance()
- .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
- intoPathDescriptor.bindType(targetSchemaTree);
+ intoPathDescriptor.bindType();
analysis.setIntoPathDescriptor(intoPathDescriptor);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
index 9245d2eb634..7e23b0b4590 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
@@ -21,12 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
@@ -37,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
import static
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
@@ -134,48 +130,14 @@ public class SelectIntoUtils {
public static List<Pair<String, PartialPath>>
bindTypeForSourceTargetPathPairList(
List<Pair<String, PartialPath>> sourceTargetPathPairList,
- Map<String, TSDataType> sourceToDataTypeMap,
- ISchemaTree targetSchemaTree) {
+ Map<String, TSDataType> sourceToDataTypeMap) {
List<Pair<String, PartialPath>> sourceTypeBoundTargetPathPairList = new
ArrayList<>();
for (Pair<String, PartialPath> sourceTargetPathPair :
sourceTargetPathPairList) {
String sourceColumn = sourceTargetPathPair.left;
TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn);
-
MeasurementPath targetPathWithSchema;
PartialPath targetPath = sourceTargetPathPair.right;
- List<MeasurementPath> actualTargetPaths =
- targetSchemaTree.searchMeasurementPaths(targetPath).left;
- if (actualTargetPaths.isEmpty()) {
- targetPathWithSchema = new MeasurementPath(targetPath,
sourceColumnType);
- } else {
- checkState(actualTargetPaths.size() == 1);
- MeasurementPath actualTargetPath = actualTargetPaths.get(0);
- if (actualTargetPath.getMeasurementSchema().isLogicalView()) {
- LogicalViewSchema viewSchema =
- (LogicalViewSchema) actualTargetPath.getMeasurementSchema();
- if (viewSchema.isWritable()) {
- MeasurementPath viewSourceSeriesPath =
- targetSchemaTree
-
.searchMeasurementPaths(viewSchema.getSourcePathIfWritable())
- .left
- .get(0);
- actualTargetPath =
- new MeasurementPath(targetPath,
viewSourceSeriesPath.getSeriesType());
-
actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity());
- } else {
- throw new SemanticException(
- String.format("View %s doesn't support data insertion.",
targetPath));
- }
- }
- if (!TypeInferenceUtils.canAutoCast(sourceColumnType,
actualTargetPath.getSeriesType())) {
- throw new SemanticException(
- String.format(
- "The data type of target path (%s[%s]) is not compatible
with the data type of source column (%s[%s]).",
- targetPath, actualTargetPath.getSeriesType(), sourceColumn,
sourceColumnType));
- }
- // no need to check alignment, because the interface is common
- targetPathWithSchema = actualTargetPath;
- }
+ targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType);
sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn,
targetPathWithSchema));
}
return sourceTypeBoundTargetPathPairList;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
index eed727f8135..a34bf2e1b79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -100,7 +99,7 @@ public class DeviceViewIntoPathDescriptor {
}
}
- public void bindType(ISchemaTree targetSchemaTree) {
+ public void bindType() {
Map<String, List<Pair<String, PartialPath>>>
deviceToSourceTypeBoundTargetPathPairListMap =
new HashMap<>();
for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
@@ -108,7 +107,7 @@ public class DeviceViewIntoPathDescriptor {
deviceToSourceTypeBoundTargetPathPairListMap.put(
sourceTargetEntry.getKey(),
SelectIntoUtils.bindTypeForSourceTargetPathPairList(
- sourceTargetEntry.getValue(), sourceToDataTypeMap,
targetSchemaTree));
+ sourceTargetEntry.getValue(), sourceToDataTypeMap));
}
this.deviceToSourceTargetPathPairListMap =
deviceToSourceTypeBoundTargetPathPairListMap;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
index bfb03b4ea5a..5dfc1e50428 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -101,10 +100,10 @@ public class IntoPathDescriptor {
}
}
- public void bindType(ISchemaTree targetSchemaTree) {
+ public void bindType() {
this.sourceTargetPathPairList =
SelectIntoUtils.bindTypeForSourceTargetPathPairList(
- sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree);
+ sourceTargetPathPairList, sourceToDataTypeMap);
}
public List<Pair<String, PartialPath>> getSourceTargetPathPairList() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
new file mode 100644
index 00000000000..f7fe5fcf367
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class DeviceViewIntoOperatorTest {
+
+ private static final String TEST_SG = "root.test";
+
+ private Map<String, Map<PartialPath, Map<String, InputLocation>>>
+ deviceToTargetPathSourceInputLocationMap;
+ private Map<String, Map<PartialPath, Map<String, TSDataType>>>
deviceToTargetPathDataTypeMap;
+ private Map<String, Boolean> targetDeviceToAlignedMap;
+ private Map<String, List<Pair<String, PartialPath>>>
deviceToSourceTargetPathPairListMap;
+ private Map<String, InputLocation> sourceColumnToInputLocationMap;
+ private List<TSDataType> inputColumnTypes;
+ private List<Integer> deviceColumnIndex;
+
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<IMeasurementSchema> measurementSchemas = new
ArrayList<>();
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ private DeviceViewIntoOperator operator;
+
+ @Before
+ public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+
IoTDBDescriptor.getInstance().getConfig().setSelectIntoInsertTabletPlanRowLimit(4);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512);
+
+ deviceToTargetPathSourceInputLocationMap = new HashMap<>();
+ deviceToTargetPathDataTypeMap = new HashMap<>();
+ targetDeviceToAlignedMap = new HashMap<>();
+ deviceToSourceTargetPathPairListMap = new HashMap<>();
+ sourceColumnToInputLocationMap = new HashMap<>();
+ inputColumnTypes = new ArrayList<>();
+ deviceColumnIndex = new ArrayList<>();
+
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ private DeviceViewIntoOperator createAndInitOperatorForSingleDevices(int
sensorNum) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+
+ // Create SeriesScanOperator for each sensor
+ List<Operator> scanOperators = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<ColumnMerger> columnMergers = new ArrayList<>();
+
+ for (int i = 0; i < sensorNum; i++) {
+ scanOperators.add(
+ createSeriesScanOperator(driverContext, i, "device0", i,
measurementSchemas.get(i)));
+ dataTypes.add(TSDataType.INT32);
+ columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new
AscTimeComparator()));
+ }
+
+ FullOuterTimeJoinOperator timeJoinOperator =
+ createTimeJoinOperator(driverContext, sensorNum, scanOperators,
dataTypes, columnMergers);
+
+ // Prepare data types for SingleDeviceViewOperator (device column + sensor
columns)
+ inputColumnTypes.add(TSDataType.TEXT); // Device column
+ for (int i = 0; i < sensorNum; i++) {
+ deviceColumnIndex.add(i + 1);
+ inputColumnTypes.add(TSDataType.INT32); // Sensor columns
+ }
+
+ SingleDeviceViewOperator singleDeviceViewOperator =
+ new SingleDeviceViewOperator(
+ addOperatorContext(driverContext, sensorNum + 1,
SingleDeviceViewOperator.class),
+ TEST_SG + ".device0",
+ timeJoinOperator,
+ deviceColumnIndex,
+ inputColumnTypes);
+
+ return createTestDeviceViewIntoOperator(
+ driverContext,
+ sensorNum + 2,
+ singleDeviceViewOperator,
+ inputColumnTypes,
+ instanceNotificationExecutor);
+ }
+
+ private DeviceViewIntoOperator createAndInitOperatorForMultipleDevices(
+ int deviceNum, int sensorNum) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+
+ List<IDeviceID> devices = new ArrayList<>();
+ List<Operator> deviceOperators = new ArrayList<>();
+ List<List<Integer>> deviceColumnIndexes = new ArrayList<>();
+
+ List<Integer> singleDeviceColumnIndex = new ArrayList<>();
+ for (int i = 0; i < sensorNum; i++) {
+ singleDeviceColumnIndex.add(i + 1);
+ }
+
+ int operatorIndex = 0;
+ for (int deviceIdx = 0; deviceIdx < deviceNum; deviceIdx++) {
+ String device = "device" + deviceIdx;
+
+ List<Operator> scanOperators = new ArrayList<>();
+ List<TSDataType> scanDataTypes = new ArrayList<>();
+ List<ColumnMerger> columnMergers = new ArrayList<>();
+
+ for (int sensorIdx = 0; sensorIdx < sensorNum; sensorIdx++) {
+ scanOperators.add(
+ createSeriesScanOperator(
+ driverContext,
+ operatorIndex,
+ device,
+ sensorIdx,
+ measurementSchemas.get(sensorIdx)));
+ scanDataTypes.add(TSDataType.INT32);
+ columnMergers.add(
+ new SingleColumnMerger(new InputLocation(sensorIdx, 0), new
AscTimeComparator()));
+ operatorIndex++;
+ }
+
+ FullOuterTimeJoinOperator timeJoinOperator =
+ createTimeJoinOperator(
+ driverContext, operatorIndex, scanOperators, scanDataTypes,
columnMergers);
+ operatorIndex++;
+
+ devices.add(IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." +
device));
+ deviceOperators.add(timeJoinOperator);
+ deviceColumnIndexes.add(singleDeviceColumnIndex);
+ }
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.TEXT); // Device column
+ for (int i = 0; i < sensorNum; i++) {
+ dataTypes.add(TSDataType.INT32); // Sensor columns
+ }
+
+ DeviceViewOperator deviceViewOperator =
+ new DeviceViewOperator(
+ addOperatorContext(driverContext, operatorIndex,
DeviceViewOperator.class),
+ devices,
+ deviceOperators,
+ deviceColumnIndexes,
+ dataTypes);
+
+ inputColumnTypes.add(TSDataType.TEXT); // Device column
+ for (int i = 0; i < sensorNum; i++) {
+ deviceColumnIndex.add(i + 1);
+ inputColumnTypes.add(TSDataType.INT32); // Sensor columns
+ }
+
+ return createTestDeviceViewIntoOperator(
+ driverContext,
+ operatorIndex + 1,
+ deviceViewOperator,
+ inputColumnTypes,
+ instanceNotificationExecutor);
+ }
+
+ /** Test scenario 1: a single device with a small amount of data; the result should be returned
in a single TsBlock. */
+ @Test
+ public void testSingleDeviceSmallData() throws Exception {
+ prepareDeviceData("device0", 2);
+ operator = createAndInitOperatorForSingleDevices(2);
+
+ TsBlock result = null;
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ result = operator.next();
+ }
+ assertNotNull(result);
+ assertEquals(2, result.getPositionCount());
+
+ // Verify 4 value columns (device, source, target, count)
+ assertEquals(4, result.getValueColumnCount());
+ assertTrue(operator.isFinished());
+ }
+
+ /** Test scenario 2: a single device whose result set exceeds maxTsBlockSize. */
+ @Test
+ public void testSingleDeviceExceedsMaxTsBlockSize() throws Exception {
+ prepareDeviceData("device0", 10);
+ operator = createAndInitOperatorForSingleDevices(10);
+
+ TsBlock result = null;
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ result = operator.next();
+ }
+ assertNotNull(result);
+ assertEquals(10, result.getPositionCount());
+
+ // Verify 4 value columns (device, source, target, count)
+ assertEquals(4, result.getValueColumnCount());
+ assertTrue(operator.isFinished());
+ }
+
+ /** Test scenario 3: multiple devices with a small amount of data. */
+ @Test
+ public void testMultipleDeviceSmallData() throws Exception {
+ prepareDeviceData("device0", 1);
+ prepareDeviceData("device1", 1);
+ operator = createAndInitOperatorForMultipleDevices(2, 1);
+
+ TsBlock result = null;
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ result = operator.next();
+ }
+ assertNotNull(result);
+ assertEquals(2, result.getPositionCount());
+
+ // Verify 4 value columns (device, source, target, count)
+ assertEquals(4, result.getValueColumnCount());
+ assertTrue(operator.isFinished());
+ }
+
+ /** Test scenario 4: multiple devices whose total result size exceeds maxTsBlockSize. */
+ @Test
+ public void testMultipleDevicesExceedsTsBlockSize() throws Exception {
+ prepareDeviceData("device0", 2);
+ prepareDeviceData("device1", 2);
+ prepareDeviceData("device2", 2);
+ operator = createAndInitOperatorForMultipleDevices(3, 2);
+
+ int totalRows = 0;
+ int totalBatches = 0;
+ // Loop through all batches
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ TsBlock result = operator.next();
+ if (result != null && !result.isEmpty()) {
+ totalRows += result.getPositionCount();
+ totalBatches += 1;
+ }
+ }
+
+ assertEquals(6, totalRows);
+ assertEquals(2, totalBatches);
+ assertTrue(operator.isFinished());
+ }
+
+ /**
+ * Helper method: populates the path-pair, input-location, data-type and alignment maps for the specified device.
+ *
+ * @param device Device name
+ * @param sensorNum Number of path pairs for this device
+ */
+ private void prepareDeviceData(String device, int sensorNum) throws
Exception {
+ String sourceDevicePath = TEST_SG + "." + device;
+ String targetDevicePath = TEST_SG + ".new_" + device;
+
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputMap =
+ deviceToTargetPathSourceInputLocationMap.computeIfAbsent(
+ sourceDevicePath, k -> new HashMap<>());
+ Map<PartialPath, Map<String, TSDataType>> targetDataTypeMap =
+ deviceToTargetPathDataTypeMap.computeIfAbsent(sourceDevicePath, k ->
new HashMap<>());
+ List<Pair<String, PartialPath>> pairList =
+ deviceToSourceTargetPathPairListMap.computeIfAbsent(
+ sourceDevicePath, k -> new ArrayList<>());
+
+ Map<String, InputLocation> columnToInputLocationMap = new HashMap<>();
+ Map<String, TSDataType> dataTypeMap = new HashMap<>();
+
+ columnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new
InputLocation(0, 0));
+ dataTypeMap.put(ColumnHeaderConstant.DEVICE, TSDataType.TEXT);
+ sourceColumnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new
InputLocation(0, 0));
+
+ for (int i = 0; i < sensorNum; i++) {
+ String targetMeasurement = "sensor" + i;
+ String targetPath = targetDevicePath + "." + targetMeasurement;
+ PartialPath targetPartialPath = new PartialPath(targetPath);
+
+ pairList.add(new Pair<>(targetMeasurement, targetPartialPath));
+ columnToInputLocationMap.put(targetMeasurement, new InputLocation(0, i +
1));
+ dataTypeMap.put(targetMeasurement, TSDataType.INT32);
+
+ sourceColumnToInputLocationMap.put(targetMeasurement, new
InputLocation(0, i + 1));
+ }
+ PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath);
+ targetPathToSourceInputMap.put(targetDevicePartialPath,
columnToInputLocationMap);
+ targetDataTypeMap.put(targetDevicePartialPath, dataTypeMap);
+
+ targetDeviceToAlignedMap.put(targetDevicePath, false);
+ }
+
+ private SeriesScanOperator createSeriesScanOperator(
+ DriverContext driverContext,
+ int index,
+ String device,
+ int sensorIdx,
+ IMeasurementSchema measurementSchema) {
+ PlanNodeId planNodeId = new PlanNodeId(String.valueOf(index));
+ driverContext.addOperatorContext(index, planNodeId,
SeriesScanOperator.class.getSimpleName());
+
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor" + sensorIdx);
+ SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(allSensors);
+
+ IFullPath measurementPath =
+ new NonAlignedFullPath(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." + device),
measurementSchema);
+
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(index),
+ planNodeId,
+ measurementPath,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources,
unSeqResources));
+ return seriesScanOperator;
+ }
+
+ private FullOuterTimeJoinOperator createTimeJoinOperator(
+ DriverContext driverContext,
+ int index,
+ List<Operator> scanOperators,
+ List<TSDataType> dataTypes,
+ List<ColumnMerger> columnMergers) {
+ addOperatorContext(driverContext, index, FullOuterTimeJoinOperator.class);
+ return new FullOuterTimeJoinOperator(
+ driverContext.getOperatorContexts().get(index),
+ scanOperators,
+ Ordering.ASC,
+ dataTypes,
+ columnMergers,
+ new AscTimeComparator());
+ }
+
+ private OperatorContext addOperatorContext(
+ DriverContext driverContext, int index, Class<?> operatorClass) {
+ driverContext.addOperatorContext(
+ index, new PlanNodeId(String.valueOf(index)),
operatorClass.getSimpleName());
+ return driverContext.getOperatorContexts().get(index);
+ }
+
+ private DeviceViewIntoOperator createTestDeviceViewIntoOperator(
+ DriverContext driverContext,
+ int index,
+ Operator child,
+ List<TSDataType> types,
+ ExecutorService executor) {
+ addOperatorContext(driverContext, index, TestDeviceViewIntoOperator.class);
+ return new TestDeviceViewIntoOperator(
+ driverContext.getOperatorContexts().get(index),
+ child,
+ types,
+ deviceToTargetPathSourceInputLocationMap,
+ deviceToTargetPathDataTypeMap,
+ targetDeviceToAlignedMap,
+ deviceToSourceTargetPathPairListMap,
+ sourceColumnToInputLocationMap,
+ executor,
+ 100);
+ }
+
+ /**
+ * Test version of DeviceViewIntoOperator that mocks the write operation.
Instead of actually
+ * executing the InsertMultiTabletsStatement, it sets writeOperationFuture to an immediately completed Future.
+ */
+ private static class TestDeviceViewIntoOperator extends
DeviceViewIntoOperator {
+
+ public TestDeviceViewIntoOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ List<TSDataType> inputColumnTypes,
+ Map<String, Map<PartialPath, Map<String, InputLocation>>>
+ deviceToTargetPathSourceInputLocationMap,
+ Map<String, Map<PartialPath, Map<String, TSDataType>>>
deviceToTargetPathDataTypeMap,
+ Map<String, Boolean> targetDeviceToAlignedMap,
+ Map<String, List<Pair<String, PartialPath>>>
deviceToSourceTargetPathPairListMap,
+ Map<String, InputLocation> sourceColumnToInputLocationMap,
+ ExecutorService intoOperationExecutor,
+ long statementSizePerLine) {
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ deviceToTargetPathSourceInputLocationMap,
+ deviceToTargetPathDataTypeMap,
+ targetDeviceToAlignedMap,
+ deviceToSourceTargetPathPairListMap,
+ sourceColumnToInputLocationMap,
+ intoOperationExecutor,
+ statementSizePerLine);
+ }
+
+ @Override
+ protected void executeInsertMultiTabletsStatement(
+ InsertMultiTabletsStatement insertMultiTabletsStatement) {
+ // Mock the write operation by setting an immediately completed Future
+ writeOperationFuture = immediateFuture(SUCCESS_STATUS);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
new file mode 100644
index 00000000000..4b4d08f558e
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.TreeIntoOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TreeIntoOperatorTest {
+
+ private static final String TEST_SG = "root.test";
+
+ private List<Pair<String, PartialPath>> sourceTargetPathPairList;
+ private Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap;
+ private Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap;
+ private Map<String, Boolean> targetDeviceToAlignedMap;
+ private List<TSDataType> inputColumnTypes;
+
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<IMeasurementSchema> measurementSchemas = new
ArrayList<>();
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ private TreeIntoOperator operator;
+
+ @Before
+ public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+ TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512);
+
+ sourceTargetPathPairList = new ArrayList<>();
+ targetPathToSourceInputLocationMap = new HashMap<>();
+ targetPathToDataTypeMap = new HashMap<>();
+ targetDeviceToAlignedMap = new HashMap<>();
+ inputColumnTypes = new ArrayList<>();
+
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ /**
+ * Common setup method to create and initialize TreeIntoOperator with
SeriesScanOperator as child.
+ */
+ private TreeIntoOperator createAndInitOperator(int sensorNum) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+
+ // Create SeriesScanOperator for each sensor
+ List<Operator> scanOperators = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<ColumnMerger> columnMergers = new ArrayList<>();
+
+ for (int i = 0; i < sensorNum; i++) {
+ PlanNodeId planNodeId = new PlanNodeId(String.valueOf(i));
+ driverContext.addOperatorContext(i, planNodeId,
SeriesScanOperator.class.getSimpleName());
+
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor" + i);
+ SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(allSensors);
+
+ IFullPath measurementPath =
+ new NonAlignedFullPath(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + ".device0"),
+ measurementSchemas.get(i));
+
+ SeriesScanOperator seriesScanOperator =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(i),
+ planNodeId,
+ measurementPath,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources,
unSeqResources));
+
+ scanOperators.add(seriesScanOperator);
+ dataTypes.add(TSDataType.INT32);
+ columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new
AscTimeComparator()));
+ }
+
+ // Add context for FullOuterTimeJoinOperator
+ driverContext.addOperatorContext(
+ sensorNum,
+ new PlanNodeId(String.valueOf(sensorNum)),
+ FullOuterTimeJoinOperator.class.getSimpleName());
+ // Add context for TreeIntoOperator
+ driverContext.addOperatorContext(
+ sensorNum + 1,
+ new PlanNodeId(String.valueOf(sensorNum + 1)),
+ TestTreeIntoOperator.class.getSimpleName());
+
+ // Join all sensor scans with FullOuterTimeJoinOperator
+ FullOuterTimeJoinOperator timeJoinOperator =
+ new FullOuterTimeJoinOperator(
+ driverContext.getOperatorContexts().get(sensorNum),
+ scanOperators,
+ Ordering.ASC,
+ dataTypes,
+ columnMergers,
+ new AscTimeComparator());
+
+ return new TestTreeIntoOperator(
+ driverContext.getOperatorContexts().get(sensorNum + 1),
+ timeJoinOperator,
+ inputColumnTypes,
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ targetDeviceToAlignedMap,
+ sourceTargetPathPairList,
+ instanceNotificationExecutor,
+ 100);
+ }
+
+ /** Test scenario 1: small amount of data, should return in single TsBlock */
+ @Test
+ public void testAllResultsInSingleTsBlock() throws Exception {
+ prepareSourceTargetPairs(2);
+ operator = createAndInitOperator(2);
+
+ TsBlock result = null;
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ result = operator.next();
+ }
+ assertNotNull(result);
+ assertEquals(2, result.getPositionCount());
+
+ // Verify 3 value columns (source, target, count)
+ assertEquals(3, result.getValueColumnCount());
+ assertTrue(operator.isFinished());
+ }
+
+ /**
+ * Test scenario 2: Result set exceeds maxTsBlockSize, should return in
batches Create a large
+ * number of path pairs to trigger size limit
+ */
+ @Test
+ public void testResultsExceedMaxTsBlockSize() throws Exception {
+ prepareSourceTargetPairs(10);
+ operator = createAndInitOperator(10);
+
+ int totalRows = 0;
+ // Loop through all batches
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ TsBlock result = operator.next();
+ if (result != null && !result.isEmpty()) {
+ int rowCount = result.getPositionCount();
+ assertTrue(rowCount == 4 || rowCount == 2);
+ totalRows += rowCount;
+ }
+ }
+
+ // Verify all data is returned
+ assertEquals(10, totalRows);
+ assertTrue(operator.isFinished());
+ }
+
+ private void prepareSourceTargetPairs(int sensorNum) throws Exception {
+ String sourceDevicePath = TEST_SG + ".device0";
+ String targetDevicePath = TEST_SG + ".new_device0";
+ targetDeviceToAlignedMap.put(targetDevicePath, false);
+ PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath);
+
+ for (int i = 0; i < sensorNum; i++) {
+ String targetMeasurement = "sensor" + i;
+ String sourcePath = sourceDevicePath + "." + targetMeasurement;
+ String targetPath = targetDevicePath + "." + targetMeasurement;
+
+ sourceTargetPathPairList.add(new Pair<>(sourcePath, new
PartialPath(targetPath)));
+
+ Map<String, InputLocation> inputLocationMap =
+ targetPathToSourceInputLocationMap.computeIfAbsent(
+ targetDevicePartialPath, k -> new HashMap<>());
+ // Each sensor comes from a different input location (different
SeriesScanOperator)
+ inputLocationMap.put(targetMeasurement, new InputLocation(0, i));
+
+ // Prepare targetPathToDataTypeMap
+ Map<String, TSDataType> dataTypeMap =
+ targetPathToDataTypeMap.computeIfAbsent(targetDevicePartialPath, k
-> new HashMap<>());
+ dataTypeMap.put(targetMeasurement, TSDataType.INT32);
+ }
+
+ // Prepare inputColumnTypes (one column for each sensor)
+ for (int i = 0; i < sensorNum; i++) {
+ inputColumnTypes.add(TSDataType.INT32);
+ }
+ }
+
+ /**
+ * Test version of TreeIntoOperator that mocks the write operation. Instead
of actually executing
+ * insertTablets, it returns an immediately completed Future.
+ */
+ private static class TestTreeIntoOperator extends TreeIntoOperator {
+
+ public TestTreeIntoOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ List<TSDataType> inputColumnTypes,
+ Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap,
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+ Map<String, Boolean> targetDeviceToAlignedMap,
+ List<Pair<String, PartialPath>> sourceTargetPathPairList,
+ ExecutorService intoOperationExecutor,
+ long statementSizePerLine) {
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ targetPathToSourceInputLocationMap,
+ targetPathToDataTypeMap,
+ targetDeviceToAlignedMap,
+ sourceTargetPathPairList,
+ intoOperationExecutor,
+ statementSizePerLine);
+ }
+
+ @Override
+ protected void executeInsertMultiTabletsStatement(
+ InsertMultiTabletsStatement insertMultiTabletsStatement) {
+ // Mock the write operation by setting an immediately completed Future
+ writeOperationFuture = immediateFuture(SUCCESS_STATUS);
+ }
+ }
+}