This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b24fb5b0bef Pipe: Fix NPE caused by forced type conversion and fix IoTDBipipeTypeConversionISessionIT of table model (#14667) (#14670)
b24fb5b0bef is described below

commit b24fb5b0befb8cda377e035cb8be5d1e39dc30b6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jan 13 15:57:06 2025 +0800

    Pipe: Fix NPE caused by forced type conversion and fix IoTDBipipeTypeConversionISessionIT of table model (#14667) (#14670)
    
    (cherry picked from commit 7f0ac972839fa919e9cdc7c3eff814c0d249377f)
---
 .../statement/PipeConvertedInsertRowStatement.java | 30 +++++++++++++-
 .../PipeConvertedInsertTabletStatement.java        | 48 +++++++++++++++++++---
 ...peStatementDataTypeConvertExecutionVisitor.java |  3 +-
 .../plan/statement/crud/InsertBaseStatement.java   |  6 ++-
 .../plan/statement/crud/InsertRowStatement.java    | 18 ++++++--
 5 files changed, 94 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index 6ae0e2d9fa4..43f0a6728eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -27,10 +27,14 @@ import 
org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 public class PipeConvertedInsertRowStatement extends InsertRowStatement {
 
@@ -42,7 +46,6 @@ public class PipeConvertedInsertRowStatement extends 
InsertRowStatement {
     // Statement
     isDebug = insertRowStatement.isDebug();
     // InsertBaseStatement
-    insertRowStatement.removeAllFailedMeasurementMarks();
     devicePath = insertRowStatement.getDevicePath();
     isAligned = insertRowStatement.isAligned();
     measurementSchemas = insertRowStatement.getMeasurementSchemas();
@@ -52,6 +55,31 @@ public class PipeConvertedInsertRowStatement extends 
InsertRowStatement {
     time = insertRowStatement.getTime();
     values = insertRowStatement.getValues();
     isNeedInferType = insertRowStatement.isNeedInferType();
+
+    // To keep the source statement's measurements unchanged while the WAL is being written,
+    // copy the arrays before removing the failed-measurement marks.
+    final MeasurementSchema[] measurementSchemas = 
insertRowStatement.getMeasurementSchemas();
+    if (measurementSchemas != null) {
+      this.measurementSchemas = Arrays.copyOf(measurementSchemas, 
measurementSchemas.length);
+    }
+
+    final String[] measurements = insertRowStatement.getMeasurements();
+    if (measurements != null) {
+      this.measurements = Arrays.copyOf(measurements, measurements.length);
+    }
+
+    final TSDataType[] dataTypes = insertRowStatement.getDataTypes();
+    if (dataTypes != null) {
+      this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+    }
+
+    final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+        insertRowStatement.getFailedMeasurementInfoMap();
+    if (failedMeasurementIndex2Info != null) {
+      this.failedMeasurementIndex2Info = new 
HashMap<>(failedMeasurementIndex2Info);
+    }
+
+    removeAllFailedMeasurementMarks();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index 5f0d1f4f80c..7a005242283 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -23,30 +23,68 @@ import 
org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
 
-  public PipeConvertedInsertTabletStatement(final InsertTabletStatement 
insertTabletStatement) {
+  public PipeConvertedInsertTabletStatement(
+      final InsertTabletStatement insertTabletStatement, boolean 
isCopyMeasurement) {
     super();
     // Statement
     isDebug = insertTabletStatement.isDebug();
     // InsertBaseStatement
-    insertTabletStatement.removeAllFailedMeasurementMarks();
     devicePath = insertTabletStatement.getDevicePath();
     isAligned = insertTabletStatement.isAligned();
-    measurementSchemas = insertTabletStatement.getMeasurementSchemas();
-    measurements = insertTabletStatement.getMeasurements();
-    dataTypes = insertTabletStatement.getDataTypes();
     // InsertTabletStatement
     times = insertTabletStatement.getTimes();
     bitMaps = insertTabletStatement.getBitMaps();
     columns = insertTabletStatement.getColumns();
     rowCount = insertTabletStatement.getRowCount();
+
+    // To keep the source statement's measurements unchanged while the WAL is being written,
+    // copy the arrays (when isCopyMeasurement is set) before removing the failed-measurement marks.
+    if (isCopyMeasurement) {
+      final MeasurementSchema[] measurementSchemas = 
insertTabletStatement.getMeasurementSchemas();
+      if (measurementSchemas != null) {
+        this.measurementSchemas = Arrays.copyOf(measurementSchemas, 
measurementSchemas.length);
+      }
+
+      final String[] measurements = insertTabletStatement.getMeasurements();
+      if (measurements != null) {
+        this.measurements = Arrays.copyOf(measurements, measurements.length);
+      }
+
+      final TSDataType[] dataTypes = insertTabletStatement.getDataTypes();
+      if (dataTypes != null) {
+        this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+      }
+
+      final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+          insertTabletStatement.getFailedMeasurementInfoMap();
+      if (failedMeasurementIndex2Info != null) {
+        this.failedMeasurementIndex2Info = new 
HashMap<>(failedMeasurementIndex2Info);
+      }
+    } else {
+      this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+      this.measurements = insertTabletStatement.getMeasurements();
+      this.dataTypes = insertTabletStatement.getDataTypes();
+      this.failedMeasurementIndex2Info = 
insertTabletStatement.getFailedMeasurementInfoMap();
+    }
+
+    removeAllFailedMeasurementMarks();
+  }
+
+  public PipeConvertedInsertTabletStatement(final InsertTabletStatement 
insertTabletStatement) {
+    this(insertTabletStatement, true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 1f5256d5a6f..1788ba02cca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -105,7 +105,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
               new PipeConvertedInsertTabletStatement(
                   PipeTransferTabletRawReq.toTPipeTransferRawReq(
                           tabletWithIsAligned.getLeft(), 
tabletWithIsAligned.getRight())
-                      .constructStatement());
+                      .constructStatement(),
+                  false);
 
           TSStatus result;
           try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 048e75afebc..6d94f12cc35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -259,6 +259,10 @@ public abstract class InsertBaseStatement extends 
Statement {
             .collect(Collectors.toList());
   }
 
+  public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+    return failedMeasurementIndex2Info;
+  }
+
   public List<Exception> getFailedExceptions() {
     return failedMeasurementIndex2Info == null
         ? Collections.emptyList()
@@ -282,7 +286,7 @@ public abstract class InsertBaseStatement extends Statement 
{
             .collect(Collectors.toList());
   }
 
-  protected static class FailedMeasurementInfo {
+  public static class FailedMeasurementInfo {
     protected String measurement;
     protected TSDataType dataType;
     protected Object value;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 0123fc6f7a6..56835e45adb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -262,13 +262,25 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
     }
     failedMeasurementIndex2Info.forEach(
         (index, info) -> {
-          measurements[index] = info.getMeasurement();
-          dataTypes[index] = info.getDataType();
-          values[index] = info.getValue();
+          if (measurements != null) {
+            measurements[index] = info.getMeasurement();
+          }
+
+          if (dataTypes != null) {
+            dataTypes[index] = info.getDataType();
+          }
+
+          if (values != null) {
+            values[index] = info.getValue();
+          }
         });
     failedMeasurementIndex2Info.clear();
   }
 
+  public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+    return failedMeasurementIndex2Info;
+  }
+
   @Override
   public void semanticCheck() {
     super.semanticCheck();

Reply via email to