This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f372e51146c [To dev/1.3] Improve DeviceViewIntoOperator's return style
to pipeline (#16980)
f372e51146c is described below
commit f372e51146c7037d37d7485524a68757ca993393
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jan 6 23:10:13 2026 +0800
[To dev/1.3] Improve DeviceViewIntoOperator's return style to pipeline
(#16980)
---
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 50 +++++-----
.../operator/process/AbstractIntoOperator.java | 19 +++-
.../operator/process/DeviceViewIntoOperator.java | 111 ++++++++++++++++++---
.../execution/operator/process/IntoOperator.java | 37 +++++--
.../queryengine/plan/analyze/AnalyzeVisitor.java | 20 +---
.../queryengine/plan/analyze/SelectIntoUtils.java | 42 +-------
.../queryengine/plan/execution/QueryExecution.java | 2 +-
.../db/queryengine/plan/planner/IPlanner.java | 4 +-
.../queryengine/plan/planner/TreeModelPlanner.java | 5 +-
.../parameter/DeviceViewIntoPathDescriptor.java | 5 +-
.../planner/plan/parameter/IntoPathDescriptor.java | 5 +-
11 files changed, 175 insertions(+), 125 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index 3e013c898ba..f4acd5257c9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -623,89 +623,89 @@ public class IoTDBSelectIntoIT {
// test INT32
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_int32[INT32]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0,
value 0]");
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int32[INT32]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not
consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]");
// test INT64
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type INT64, timestamp 0, value
0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type INT64, timestamp 0, value
0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0,
value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_int64[INT64]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not
consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]");
// test FLOAT
assertTestFail(
"select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type FLOAT, timestamp 0, value
0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type FLOAT, timestamp 0, value
0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0,
value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_float[FLOAT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not
consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value
0.0]");
// test DOUBLE
assertTestFail(
"select s_double into root.sg_type.d_1(s_int32) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_int64) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_float) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value
0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0,
value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_double[DOUBLE]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not
consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value
0.0]");
// test BOOLEAN
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int32) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value
true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int64) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "301: Error occurred while inserting tablets in SELECT INTO: Fail to
insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64
is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0,
value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_float) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value
true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_double) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is
not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0,
value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_text) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_boolean[BOOLEAN]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not
consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value
true]");
// test TEXT
assertTestFail(
"select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not
consistent, registered type INT32, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not
consistent, registered type INT64, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not
compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not
consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is
not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value
text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_boolean) from
root.sg_type.d_0;",
- "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is
not compatible with the data type of source column
(root.sg_type.d_0.s_text[TEXT]).");
+ "Error occurred while inserting tablets in SELECT INTO: Fail to insert
measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is
not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0,
value text0]");
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 99317ba7b09..7730dbe17f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -38,6 +38,7 @@ import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.utils.Binary;
@@ -82,18 +83,22 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ protected final TsBlockBuilder resultTsBlockBuilder;
+
protected AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
List<TSDataType> inputColumnTypes,
ExecutorService intoOperationExecutor,
- long statementSizePerLine) {
+ long statementSizePerLine,
+ List<TSDataType> outputDataTypes) {
this.operatorContext = operatorContext;
this.child = child;
this.typeConvertors =
inputColumnTypes.stream().map(TypeFactory::getType).collect(Collectors.toList());
this.writeOperationExecutor = intoOperationExecutor;
+ this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
initMemoryEstimates(statementSizePerLine);
}
@@ -152,7 +157,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
checkLastWriteOperation();
if (!processTsBlock(cachedTsBlock)) {
- return null;
+ return tryToReturnPartialResult();
}
cachedTsBlock = null;
if (child.hasNextWithTimer()) {
@@ -160,7 +165,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
processTsBlock(inputTsBlock);
// call child.next only once
- return null;
+ return tryToReturnPartialResult();
} else {
return tryToReturnResultTsBlock();
}
@@ -218,6 +223,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
protected abstract TsBlock tryToReturnResultTsBlock();
+ protected abstract TsBlock tryToReturnPartialResult();
+
protected static List<InsertTabletStatementGenerator>
constructInsertTabletStatementGenerators(
Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
@@ -286,7 +293,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
() -> client.insertTablets(insertMultiTabletsStatement),
writeOperationExecutor);
}
- private boolean existFullStatement(
+ protected boolean existFullStatement(
List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
if (generator.isFull()) {
@@ -549,6 +556,10 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
return devicePath.toString();
}
+ public int getRowCount() {
+ return rowCount;
+ }
+
public int getWrittenCount(String measurement) {
if (!writtenCounter.containsKey(measurement)) {
return -1;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
index e584f79d1b6..14ad2b4a3d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.runtime.IntoProcessException;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
@@ -27,17 +30,18 @@ import
org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -48,6 +52,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
deviceToTargetPathSourceInputLocationMap;
@@ -59,7 +64,7 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
private final int deviceColumnIndex;
private String currentDevice;
- private final TsBlockBuilder resultTsBlockBuilder;
+ private int batchedRowCount = 0;
@SuppressWarnings("squid:S107")
public DeviceViewIntoOperator(
@@ -74,18 +79,19 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
Map<String, InputLocation> sourceColumnToInputLocationMap,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
- super(operatorContext, child, inputColumnTypes, intoOperationExecutor,
statementSizePerLine);
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ intoOperationExecutor,
+ statementSizePerLine,
+ ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList()));
this.deviceToTargetPathSourceInputLocationMap =
deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
this.deviceToSourceTargetPathPairListMap =
deviceToSourceTargetPathPairListMap;
-
- List<TSDataType> outputDataTypes =
- ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
- .map(ColumnHeader::getColumnType)
- .collect(Collectors.toList());
- this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-
this.deviceColumnIndex =
sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex();
}
@@ -102,7 +108,12 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
constructInsertMultiTabletsStatement(false);
updateResultTsBlock();
- insertTabletStatementGenerators =
constructInsertTabletStatementGeneratorsByDevice(device);
+ if (insertMultiTabletsStatement != null ||
insertTabletStatementGenerators == null) {
+ insertTabletStatementGenerators =
constructInsertTabletStatementGeneratorsByDevice(device);
+ } else {
+ insertTabletStatementGenerators.addAll(
+ constructInsertTabletStatementGeneratorsByDevice(device));
+ }
currentDevice = device;
if (insertMultiTabletsStatement != null) {
@@ -115,9 +126,15 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
int readIndex = 0;
while (readIndex < inputTsBlock.getPositionCount()) {
int lastReadIndex = readIndex;
- for (AbstractIntoOperator.InsertTabletStatementGenerator generator :
- insertTabletStatementGenerators) {
- lastReadIndex = Math.max(lastReadIndex,
generator.processTsBlock(inputTsBlock, readIndex));
+
+ if (!insertTabletStatementGenerators.isEmpty()) {
+ InsertTabletStatementGenerator generatorOfCurrentDevice =
+
insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1);
+ int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount();
+ lastReadIndex =
+ Math.max(
+ lastReadIndex,
generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex));
+ batchedRowCount += generatorOfCurrentDevice.getRowCount() -
rowCountBeforeProcess;
}
readIndex = lastReadIndex;
if (insertMultiTabletsInternally(true)) {
@@ -144,6 +161,16 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
return resultTsBlockBuilder.build();
}
+ @Override
+ protected TsBlock tryToReturnPartialResult() {
+ if (resultTsBlockBuilder.isFull()) {
+ TsBlock res = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return res;
+ }
+ return null;
+ }
+
private List<AbstractIntoOperator.InsertTabletStatementGenerator>
constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap =
@@ -187,4 +214,60 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ resultTsBlockBuilder.getRetainedSizeInBytes();
}
+
+ @Override
+ protected InsertMultiTabletsStatement
constructInsertMultiTabletsStatement(boolean needCheck) {
+ if (insertTabletStatementGenerators == null) {
+ return null;
+ }
+ boolean hasFullStatement =
existFullStatement(insertTabletStatementGenerators);
+ if (needCheck) {
+ // When needCheck is true, we only proceed if there already exists a
full statement.
+ if (!hasFullStatement) {
+ return null;
+ }
+ } else {
+ // When needCheck is false, we may delay flushing to accumulate more rows
+ // if the batch is not yet at the configured row limit and the child has
more data.
+ try {
+ if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()
+ && child.hasNextWithTimer()) {
+ return null;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IntoProcessException(e.getMessage());
+ } catch (Exception e) {
+ throw new IntoProcessException(e.getMessage());
+ }
+ }
+ List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
+ for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
+ if (!generator.isEmpty()) {
+
insertTabletStatementList.add(generator.constructInsertTabletStatement());
+ }
+ }
+ if (insertTabletStatementList.isEmpty()) {
+ return null;
+ }
+
+ InsertMultiTabletsStatement insertMultiTabletsStatement = new
InsertMultiTabletsStatement();
+
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+ batchedRowCount = 0;
+ return insertMultiTabletsStatement;
+ }
+
+ @Override
+ protected int findWritten(String device, String measurement) {
+ for (InsertTabletStatementGenerator generator :
insertTabletStatementGenerators) {
+ if (!Objects.equals(generator.getDevice(), device)) {
+ continue;
+ }
+ int writtenCountInCurrentGenerator =
generator.getWrittenCount(measurement);
+ if (writtenCountInCurrentGenerator >= 0) {
+ return writtenCountInCurrentGenerator;
+ }
+ }
+ return 0;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java
index 5e516eeacfa..227b299c2d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java
@@ -31,7 +31,6 @@ import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -49,6 +48,8 @@ public class IntoOperator extends AbstractIntoOperator {
private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
+ private int outputIndex = 0;
+
@SuppressWarnings("squid:S107")
public IntoOperator(
OperatorContext operatorContext,
@@ -60,7 +61,15 @@ public class IntoOperator extends AbstractIntoOperator {
List<Pair<String, PartialPath>> sourceTargetPathPairList,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
- super(operatorContext, child, inputColumnTypes, intoOperationExecutor,
statementSizePerLine);
+ super(
+ operatorContext,
+ child,
+ inputColumnTypes,
+ intoOperationExecutor,
+ statementSizePerLine,
+ ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList()));
this.sourceTargetPathPairList = sourceTargetPathPairList;
insertTabletStatementGenerators =
constructInsertTabletStatementGenerators(
@@ -98,19 +107,23 @@ public class IntoOperator extends AbstractIntoOperator {
return null;
}
- finished = true;
- return constructResultTsBlock();
+ TsBlock res = constructResultTsBlock();
+ finished = (outputIndex == sourceTargetPathPairList.size());
+ return res;
+ }
+
+ @Override
+ protected TsBlock tryToReturnPartialResult() {
+ return null;
}
private TsBlock constructResultTsBlock() {
- List<TSDataType> outputDataTypes =
- ColumnHeaderConstant.selectIntoColumnHeaders.stream()
- .map(ColumnHeader::getColumnType)
- .collect(Collectors.toList());
- TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
- for (Pair<String, PartialPath> sourceTargetPathPair :
sourceTargetPathPairList) {
+ for (int size = sourceTargetPathPairList.size();
+ outputIndex < size && !resultTsBlockBuilder.isFull();
+ outputIndex++) {
+ Pair<String, PartialPath> sourceTargetPathPair =
sourceTargetPathPairList.get(outputIndex);
timeColumnBuilder.writeLong(0);
columnBuilders[0].writeBinary(
new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET));
@@ -121,7 +134,9 @@ public class IntoOperator extends AbstractIntoOperator {
sourceTargetPathPair.right.getDevice(),
sourceTargetPathPair.right.getMeasurement()));
resultTsBlockBuilder.declarePosition();
}
- return resultTsBlockBuilder.build();
+ TsBlock res = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return res;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index dab31639205..e79fe9e3f40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2383,7 +2383,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoComponent.validate(sourceDevices, sourceColumns);
DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new
DeviceViewIntoPathDescriptor();
- PathPatternTree targetPathTree = new PathPatternTree();
IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
intoComponent.getIntoDeviceMeasurementIterator();
for (PartialPath sourceDevice : sourceDevices) {
@@ -2405,7 +2404,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement(
sourceDevice, targetDevice, sourceColumn.getExpressionString(),
targetMeasurement);
- targetPathTree.appendFullPath(targetDevice, targetMeasurement);
deviceViewIntoPathDescriptor.recordSourceColumnDataType(
sourceColumn.getExpressionString(),
analysis.getType(sourceColumn));
@@ -2415,13 +2413,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoDeviceMeasurementIterator.nextDevice();
}
deviceViewIntoPathDescriptor.validate();
-
- // fetch schema of target paths
- long startTime = System.nanoTime();
- ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree,
true, context);
- QueryPlanCostMetricSet.getInstance()
- .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
- deviceViewIntoPathDescriptor.bindType(targetSchemaTree);
+ deviceViewIntoPathDescriptor.bindType();
analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor);
}
@@ -2445,7 +2437,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoComponent.validate(sourceColumns);
IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
- PathPatternTree targetPathTree = new PathPatternTree();
IntoComponent.IntoPathIterator intoPathIterator =
intoComponent.getIntoPathIterator();
for (Pair<Expression, String> pair : outputExpressions) {
Expression sourceExpression = pair.left;
@@ -2477,7 +2468,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
intoPathDescriptor.specifyDeviceAlignment(
targetPath.getDevicePath().toString(), isAlignedDevice);
- targetPathTree.appendFullPath(targetPath);
intoPathDescriptor.recordSourceColumnDataType(
sourceColumn, analysis.getType(sourceExpression));
@@ -2485,13 +2475,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
intoPathDescriptor.validate();
- // fetch schema of target paths
- long startTime = System.nanoTime();
- ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree,
true, context);
- updateSchemaTreeByViews(analysis, targetSchemaTree, context);
- QueryPlanCostMetricSet.getInstance()
- .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
- intoPathDescriptor.bindType(targetSchemaTree);
+ intoPathDescriptor.bindType();
analysis.setIntoPathDescriptor(intoPathDescriptor);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
index 2a64f06d186..10056b8f3aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
@@ -21,12 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
@@ -37,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
import static
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
@@ -134,48 +130,14 @@ public class SelectIntoUtils {
public static List<Pair<String, PartialPath>>
bindTypeForSourceTargetPathPairList(
List<Pair<String, PartialPath>> sourceTargetPathPairList,
- Map<String, TSDataType> sourceToDataTypeMap,
- ISchemaTree targetSchemaTree) {
+ Map<String, TSDataType> sourceToDataTypeMap) {
List<Pair<String, PartialPath>> sourceTypeBoundTargetPathPairList = new
ArrayList<>();
for (Pair<String, PartialPath> sourceTargetPathPair :
sourceTargetPathPairList) {
String sourceColumn = sourceTargetPathPair.left;
TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn);
-
MeasurementPath targetPathWithSchema;
PartialPath targetPath = sourceTargetPathPair.right;
- List<MeasurementPath> actualTargetPaths =
- targetSchemaTree.searchMeasurementPaths(targetPath).left;
- if (actualTargetPaths.isEmpty()) {
- targetPathWithSchema = new MeasurementPath(targetPath,
sourceColumnType);
- } else {
- checkState(actualTargetPaths.size() == 1);
- MeasurementPath actualTargetPath = actualTargetPaths.get(0);
- if (actualTargetPath.getMeasurementSchema().isLogicalView()) {
- LogicalViewSchema viewSchema =
- (LogicalViewSchema) actualTargetPath.getMeasurementSchema();
- if (viewSchema.isWritable()) {
- MeasurementPath viewSourceSeriesPath =
- targetSchemaTree
-
.searchMeasurementPaths(viewSchema.getSourcePathIfWritable())
- .left
- .get(0);
- actualTargetPath =
- new MeasurementPath(targetPath,
viewSourceSeriesPath.getSeriesType());
-
actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity());
- } else {
- throw new SemanticException(
- String.format("View %s doesn't support data insertion.",
targetPath));
- }
- }
- if (!TypeInferenceUtils.canAutoCast(sourceColumnType,
actualTargetPath.getSeriesType())) {
- throw new SemanticException(
- String.format(
- "The data type of target path (%s[%s]) is not compatible
with the data type of source column (%s[%s]).",
- targetPath, actualTargetPath.getSeriesType(), sourceColumn,
sourceColumnType));
- }
- // no need to check alignment, because the interface is common
- targetPathWithSchema = actualTargetPath;
- }
+ targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType);
sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn,
targetPathWithSchema));
}
return sourceTypeBoundTargetPathPairList;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 350dd6394d9..0ae40345268 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -617,7 +617,7 @@ public class QueryExecution implements IQueryExecution {
// info to client
if (!CONFIG.isEnable13DataInsertAdapt()
||
IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) {
- planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus,
statusCode);
+ planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus);
}
return new ExecutionResult(context.getQueryId(), tsstatus);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
index 616cca3efb7..b9b239b5e26 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
-import org.apache.iotdb.rpc.TSStatusCode;
import java.util.concurrent.ScheduledExecutorService;
@@ -49,6 +48,5 @@ public interface IPlanner {
ScheduledExecutorService getScheduledExecutorService();
- void setRedirectInfo(
- IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode);
+ void setRedirectInfo(IAnalysis analysis, TEndPoint localEndPoint, TSStatus
tsstatus);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 6ec87d08918..1da701a8e35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -156,8 +156,7 @@ public class TreeModelPlanner implements IPlanner {
}
@Override
- public void setRedirectInfo(
- IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode) {
+ public void setRedirectInfo(IAnalysis iAnalysis, TEndPoint localEndPoint,
TSStatus tsstatus) {
Analysis analysis = (Analysis) iAnalysis;
// Get the inner statement of PipeEnrichedStatement
@@ -173,7 +172,7 @@ public class TreeModelPlanner implements IPlanner {
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement) {
// multiple devices
- if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ if (tsstatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
{
boolean needRedirect = false;
List<TSStatus> subStatus = new ArrayList<>();
for (TEndPoint endPoint : redirectNodeList) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
index 4b43abe5d72..7197f4c7ca8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -100,7 +99,7 @@ public class DeviceViewIntoPathDescriptor {
}
}
- public void bindType(ISchemaTree targetSchemaTree) {
+ public void bindType() {
Map<String, List<Pair<String, PartialPath>>>
deviceToSourceTypeBoundTargetPathPairListMap =
new HashMap<>();
for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
@@ -108,7 +107,7 @@ public class DeviceViewIntoPathDescriptor {
deviceToSourceTypeBoundTargetPathPairListMap.put(
sourceTargetEntry.getKey(),
SelectIntoUtils.bindTypeForSourceTargetPathPairList(
- sourceTargetEntry.getValue(), sourceToDataTypeMap,
targetSchemaTree));
+ sourceTargetEntry.getValue(), sourceToDataTypeMap));
}
this.deviceToSourceTargetPathPairListMap =
deviceToSourceTypeBoundTargetPathPairListMap;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
index 35765ee5c07..29f04834f67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils;
import org.apache.commons.lang3.StringUtils;
@@ -101,10 +100,10 @@ public class IntoPathDescriptor {
}
}
- public void bindType(ISchemaTree targetSchemaTree) {
+ public void bindType() {
this.sourceTargetPathPairList =
SelectIntoUtils.bindTypeForSourceTargetPathPairList(
- sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree);
+ sourceTargetPathPairList, sourceToDataTypeMap);
}
public List<Pair<String, PartialPath>> getSourceTargetPathPairList() {