This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b24fb5b0bef Pipe: Fix NPE caused by forced type conversion and fix
IoTDBipipeTypeConversionISessionIT of table model (#14667) (#14670)
b24fb5b0bef is described below
commit b24fb5b0befb8cda377e035cb8be5d1e39dc30b6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jan 13 15:57:06 2025 +0800
Pipe: Fix NPE caused by forced type conversion and fix
IoTDBipipeTypeConversionISessionIT of table model (#14667) (#14670)
(cherry picked from commit 7f0ac972839fa919e9cdc7c3eff814c0d249377f)
---
.../statement/PipeConvertedInsertRowStatement.java | 30 +++++++++++++-
.../PipeConvertedInsertTabletStatement.java | 48 +++++++++++++++++++---
...peStatementDataTypeConvertExecutionVisitor.java | 3 +-
.../plan/statement/crud/InsertBaseStatement.java | 6 ++-
.../plan/statement/crud/InsertRowStatement.java | 18 ++++++--
5 files changed, 94 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index 6ae0e2d9fa4..43f0a6728eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -27,10 +27,14 @@ import
org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
public class PipeConvertedInsertRowStatement extends InsertRowStatement {
@@ -42,7 +46,6 @@ public class PipeConvertedInsertRowStatement extends
InsertRowStatement {
// Statement
isDebug = insertRowStatement.isDebug();
// InsertBaseStatement
- insertRowStatement.removeAllFailedMeasurementMarks();
devicePath = insertRowStatement.getDevicePath();
isAligned = insertRowStatement.isAligned();
measurementSchemas = insertRowStatement.getMeasurementSchemas();
@@ -52,6 +55,31 @@ public class PipeConvertedInsertRowStatement extends
InsertRowStatement {
time = insertRowStatement.getTime();
values = insertRowStatement.getValues();
isNeedInferType = insertRowStatement.isNeedInferType();
+
+ // To ensure that the measurement remains unchanged during the WAL writing
process, the array
+ // needs to be copied before the failed Measurement mark can be deleted.
+ final MeasurementSchema[] measurementSchemas =
insertRowStatement.getMeasurementSchemas();
+ if (measurementSchemas != null) {
+ this.measurementSchemas = Arrays.copyOf(measurementSchemas,
measurementSchemas.length);
+ }
+
+ final String[] measurements = insertRowStatement.getMeasurements();
+ if (measurements != null) {
+ this.measurements = Arrays.copyOf(measurements, measurements.length);
+ }
+
+ final TSDataType[] dataTypes = insertRowStatement.getDataTypes();
+ if (dataTypes != null) {
+ this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+ }
+
+ final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+ insertRowStatement.getFailedMeasurementInfoMap();
+ if (failedMeasurementIndex2Info != null) {
+ this.failedMeasurementIndex2Info = new
HashMap<>(failedMeasurementIndex2Info);
+ }
+
+ removeAllFailedMeasurementMarks();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index 5f0d1f4f80c..7a005242283 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -23,30 +23,68 @@ import
org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
- public PipeConvertedInsertTabletStatement(final InsertTabletStatement
insertTabletStatement) {
+ public PipeConvertedInsertTabletStatement(
+ final InsertTabletStatement insertTabletStatement, boolean
isCopyMeasurement) {
super();
// Statement
isDebug = insertTabletStatement.isDebug();
// InsertBaseStatement
- insertTabletStatement.removeAllFailedMeasurementMarks();
devicePath = insertTabletStatement.getDevicePath();
isAligned = insertTabletStatement.isAligned();
- measurementSchemas = insertTabletStatement.getMeasurementSchemas();
- measurements = insertTabletStatement.getMeasurements();
- dataTypes = insertTabletStatement.getDataTypes();
// InsertTabletStatement
times = insertTabletStatement.getTimes();
bitMaps = insertTabletStatement.getBitMaps();
columns = insertTabletStatement.getColumns();
rowCount = insertTabletStatement.getRowCount();
+
+ // To ensure that the measurement remains unchanged during the WAL writing
process, the array
+ // needs to be copied before the failed Measurement mark can be deleted.
+ if (isCopyMeasurement) {
+ final MeasurementSchema[] measurementSchemas =
insertTabletStatement.getMeasurementSchemas();
+ if (measurementSchemas != null) {
+ this.measurementSchemas = Arrays.copyOf(measurementSchemas,
measurementSchemas.length);
+ }
+
+ final String[] measurements = insertTabletStatement.getMeasurements();
+ if (measurements != null) {
+ this.measurements = Arrays.copyOf(measurements, measurements.length);
+ }
+
+ final TSDataType[] dataTypes = insertTabletStatement.getDataTypes();
+ if (dataTypes != null) {
+ this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+ }
+
+ final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+ insertTabletStatement.getFailedMeasurementInfoMap();
+ if (failedMeasurementIndex2Info != null) {
+ this.failedMeasurementIndex2Info = new
HashMap<>(failedMeasurementIndex2Info);
+ }
+ } else {
+ this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+ this.measurements = insertTabletStatement.getMeasurements();
+ this.dataTypes = insertTabletStatement.getDataTypes();
+ this.failedMeasurementIndex2Info =
insertTabletStatement.getFailedMeasurementInfoMap();
+ }
+
+ removeAllFailedMeasurementMarks();
+ }
+
+ public PipeConvertedInsertTabletStatement(final InsertTabletStatement
insertTabletStatement) {
+ this(insertTabletStatement, true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 1f5256d5a6f..1788ba02cca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -105,7 +105,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
new PipeConvertedInsertTabletStatement(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight())
- .constructStatement());
+ .constructStatement(),
+ false);
TSStatus result;
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 048e75afebc..6d94f12cc35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -259,6 +259,10 @@ public abstract class InsertBaseStatement extends
Statement {
.collect(Collectors.toList());
}
+ public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+ return failedMeasurementIndex2Info;
+ }
+
public List<Exception> getFailedExceptions() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
@@ -282,7 +286,7 @@ public abstract class InsertBaseStatement extends Statement
{
.collect(Collectors.toList());
}
- protected static class FailedMeasurementInfo {
+ public static class FailedMeasurementInfo {
protected String measurement;
protected TSDataType dataType;
protected Object value;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 0123fc6f7a6..56835e45adb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -262,13 +262,25 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
}
failedMeasurementIndex2Info.forEach(
(index, info) -> {
- measurements[index] = info.getMeasurement();
- dataTypes[index] = info.getDataType();
- values[index] = info.getValue();
+ if (measurements != null) {
+ measurements[index] = info.getMeasurement();
+ }
+
+ if (dataTypes != null) {
+ dataTypes[index] = info.getDataType();
+ }
+
+ if (values != null) {
+ values[index] = info.getValue();
+ }
});
failedMeasurementIndex2Info.clear();
}
+ public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+ return failedMeasurementIndex2Info;
+ }
+
@Override
public void semanticCheck() {
super.semanticCheck();