This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f0ac972839 Pipe: Fix NPE caused by forced type conversion and fix 
IoTDBipipeTypeConversionISessionIT of table model (#14667)
7f0ac972839 is described below

commit 7f0ac972839fa919e9cdc7c3eff814c0d249377f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Jan 10 15:57:08 2025 +0800

    Pipe: Fix NPE caused by forced type conversion and fix 
IoTDBipipeTypeConversionISessionIT of table model (#14667)
---
 .../IoTDBPipeTypeConversionISessionIT.java         | 93 +++++++++++++++-------
 .../statement/PipeConvertedInsertRowStatement.java | 30 ++++++-
 .../PipeConvertedInsertTabletStatement.java        | 48 +++++++++--
 ...leStatementDataTypeConvertExecutionVisitor.java |  3 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java |  3 +-
 .../plan/statement/crud/InsertBaseStatement.java   |  6 +-
 .../plan/statement/crud/InsertRowStatement.java    | 18 ++++-
 7 files changed, 159 insertions(+), 42 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
index 4145fc9a5d2..b3e40bb8a10 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
@@ -56,12 +56,13 @@ import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2TableModel.class})
 public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTestIT {
-  private static final int generateDataSize = 100;
+  private static final int generateDataSize = 1000;
 
   @Test
   public void insertTablet() {
@@ -90,8 +91,8 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
       param.append(schema.getMeasurementName());
       param.append(',');
     }
-    sql = sql + param.substring(0, param.length() - 1);
-    sql = sql + " from " + tableName + " ORDER BY time ASC";
+
+    sql = sql + param + "time from " + tableName + " ORDER BY time ASC";
     session.executeNonQueryStatement("use test");
     return session.executeQueryStatement(sql);
   }
@@ -106,7 +107,8 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
     createDatabaseAndTable(measurementSchemas, false, tablet.getColumnTypes(), 
receiverEnv);
     try (ITableSession senderSession = senderEnv.getTableSessionConnection();
         ITableSession receiverSession = 
receiverEnv.getTableSessionConnection()) {
-
+      senderSession.executeNonQueryStatement("use test");
+      receiverSession.executeNonQueryStatement("use test");
       if (isTsFile) {
         // Send TsFile data to receiver
         executeDataWriteOperation.accept(senderSession, receiverSession, 
tablet);
@@ -121,7 +123,7 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
       }
 
       // Verify receiver data
-      long timeoutSeconds = 30;
+      long timeoutSeconds = 600;
       List<List<Object>> expectedValues =
           generateTabletResultSetForTable(tablet, measurementSchemas);
       await()
@@ -140,10 +142,10 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
                   fail(e.getMessage());
                 }
               });
-      tablet.reset();
     } catch (Exception e) {
       fail(e.getMessage());
     }
+    tablet.reset();
   }
 
   private void createDatabaseAndTable(
@@ -174,16 +176,15 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
     String sql =
         String.format(
             "create pipe test"
-                + " with source 
('source'='iotdb-source','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+                + " with source ('source'='iotdb-source','realtime.mode'='%s')"
                 + " with processor ('processor'='do-nothing-processor')"
                 + " with sink 
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
             isTSFile ? "file" : "forced-log",
-            !isTSFile,
-            isTSFile,
             receiverEnv.getIP(),
             receiverEnv.getPort(),
             isTSFile ? "tsfile" : "tablet");
-    TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, 
Collections.singletonList(sql));
+    TestUtils.tryExecuteNonQueriesWithRetry(
+        null, BaseEnv.TABLE_SQL_DIALECT, senderEnv, 
Collections.singletonList(sql));
   }
 
   private void validateResultSet(
@@ -193,11 +194,13 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
     while (dataSet.hasNext()) {
       RowRecord record = dataSet.next();
       List<Field> fields = record.getFields();
-
-      assertEquals(record.getTimestamp(), timestamps[index]);
       List<Object> rowValues = values.get(index++);
       for (int i = 0; i < fields.size(); i++) {
         Field field = fields.get(i);
+        if (field.getDataType() == null) {
+          assertNull(rowValues.get(i));
+          continue;
+        }
         switch (field.getDataType()) {
           case INT64:
           case TIMESTAMP:
@@ -207,11 +210,9 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
             assertEquals(field.getDateV(), rowValues.get(i));
             break;
           case BLOB:
-            assertEquals(field.getBinaryV(), rowValues.get(i));
-            break;
           case TEXT:
           case STRING:
-            assertEquals(field.getStringValue(), rowValues.get(i));
+            assertEquals(field.getBinaryV(), rowValues.get(i));
             break;
           case INT32:
             assertEquals(field.getIntV(), (int) rowValues.get(i));
@@ -283,13 +284,11 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
     }
   }
 
-  private long[] createTestDataForTimestamp() {
+  private void createTestDataForTimeColumn(Tablet tablet) {
     long time = new Date().getTime();
-    long[] data = new long[generateDataSize];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = time++;
+    for (int i = 0; i < generateDataSize; i++) {
+      tablet.addTimestamp(i, time++);
     }
-    return data;
   }
 
   private void createTestDataForDate(Tablet tablet, int j) {
@@ -346,19 +345,35 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
         switch (sourceType) {
           case INT64:
           case TIMESTAMP:
-            value = ValueConverter.convert(sourceType, targetType, ((long[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((long[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case INT32:
-            value = ValueConverter.convert(sourceType, targetType, ((int[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((int[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case DOUBLE:
-            value = ValueConverter.convert(sourceType, targetType, ((double[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((double[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case FLOAT:
-            value = ValueConverter.convert(sourceType, targetType, ((float[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((float[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case DATE:
@@ -366,24 +381,39 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
                 ValueConverter.convert(
                     sourceType,
                     targetType,
-                    DateUtils.parseDateExpressionToInt(((LocalDate[]) 
values[j])[i]));
+                    tablet.bitMaps[j].isMarked(i)
+                        ? null
+                        : DateUtils.parseDateExpressionToInt(((LocalDate[]) 
values[j])[i]));
             insertRecord.add(convert(value, targetType));
             break;
           case TEXT:
           case STRING:
-            value = ValueConverter.convert(sourceType, targetType, ((Binary[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((Binary[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case BLOB:
-            value = ValueConverter.convert(sourceType, targetType, ((Binary[]) 
values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((Binary[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
           case BOOLEAN:
-            value = ValueConverter.convert(sourceType, targetType, 
((boolean[]) values[j])[i]);
+            value =
+                ValueConverter.convert(
+                    sourceType,
+                    targetType,
+                    tablet.bitMaps[j].isMarked(i) ? null : ((boolean[]) 
values[j])[i]);
             insertRecord.add(convert(value, targetType));
             break;
         }
       }
+      insertRecord.add(tablet.timestamps[i]);
       insertRecords.add(insertRecord);
     }
 
@@ -391,12 +421,15 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
   }
 
   private Object convert(Object value, TSDataType targetType) {
+    if (value == null) {
+      return null;
+    }
     switch (targetType) {
       case DATE:
         return DateUtils.parseIntToLocalDate((Integer) value);
       case TEXT:
       case STRING:
-        return new String(((Binary) value).getValues(), 
TSFileConfig.STRING_CHARSET);
+        return value;
     }
     return value;
   }
@@ -417,7 +450,7 @@ public class IoTDBPipeTypeConversionISessionIT extends 
AbstractPipeTableModelTes
             columnTypes,
             generateDataSize);
     tablet.initBitMaps();
-    tablet.timestamps = createTestDataForTimestamp();
+    createTestDataForTimeColumn(tablet);
     for (int i = 0; i < pairs.size(); i++) {
       MeasurementSchema schema = pairs.get(i).left;
       switch (schema.getType()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index f249a773455..edf8b636ecf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -28,10 +28,14 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 
 import org.apache.tsfile.annotations.TableModel;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 public class PipeConvertedInsertRowStatement extends InsertRowStatement {
 
@@ -43,7 +47,6 @@ public class PipeConvertedInsertRowStatement extends 
InsertRowStatement {
     // Statement
     isDebug = insertRowStatement.isDebug();
     // InsertBaseStatement
-    insertRowStatement.removeAllFailedMeasurementMarks();
     devicePath = insertRowStatement.getDevicePath();
     isAligned = insertRowStatement.isAligned();
     measurementSchemas = insertRowStatement.getMeasurementSchemas();
@@ -59,6 +62,31 @@ public class PipeConvertedInsertRowStatement extends 
InsertRowStatement {
     values = insertRowStatement.getValues();
     isNeedInferType = insertRowStatement.isNeedInferType();
     deviceID = insertRowStatement.getRawTableDeviceID();
+
+    // To ensure that the measurement remains unchanged during the WAL writing 
process, the array
+    // needs to be copied before the failed Measurement mark can be deleted.
+    final MeasurementSchema[] measurementSchemas = 
insertRowStatement.getMeasurementSchemas();
+    if (measurementSchemas != null) {
+      this.measurementSchemas = Arrays.copyOf(measurementSchemas, 
measurementSchemas.length);
+    }
+
+    final String[] measurements = insertRowStatement.getMeasurements();
+    if (measurements != null) {
+      this.measurements = Arrays.copyOf(measurements, measurements.length);
+    }
+
+    final TSDataType[] dataTypes = insertRowStatement.getDataTypes();
+    if (dataTypes != null) {
+      this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+    }
+
+    final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+        insertRowStatement.getFailedMeasurementInfoMap();
+    if (failedMeasurementIndex2Info != null) {
+      this.failedMeasurementIndex2Info = new 
HashMap<>(failedMeasurementIndex2Info);
+    }
+
+    removeAllFailedMeasurementMarks();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index adc19ae7e0f..eee47a5b0b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -24,25 +24,27 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 
 import org.apache.tsfile.annotations.TableModel;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
 
-  public PipeConvertedInsertTabletStatement(final InsertTabletStatement 
insertTabletStatement) {
+  public PipeConvertedInsertTabletStatement(
+      final InsertTabletStatement insertTabletStatement, boolean 
isCopyMeasurement) {
     super();
     // Statement
     isDebug = insertTabletStatement.isDebug();
     // InsertBaseStatement
-    insertTabletStatement.removeAllFailedMeasurementMarks();
     devicePath = insertTabletStatement.getDevicePath();
     isAligned = insertTabletStatement.isAligned();
-    measurementSchemas = insertTabletStatement.getMeasurementSchemas();
-    measurements = insertTabletStatement.getMeasurements();
-    dataTypes = insertTabletStatement.getDataTypes();
     columnCategories = insertTabletStatement.getColumnCategories();
     idColumnIndices = insertTabletStatement.getIdColumnIndices();
     attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
@@ -55,6 +57,42 @@ public class PipeConvertedInsertTabletStatement extends 
InsertTabletStatement {
     deviceIDs = insertTabletStatement.getRawTableDeviceIDs();
     singleDevice = insertTabletStatement.isSingleDevice();
     rowCount = insertTabletStatement.getRowCount();
+
+    // To ensure that the measurement remains unchanged during the WAL writing 
process, the array
+    // needs to be copied before the failed Measurement mark can be deleted.
+    if (isCopyMeasurement) {
+      final MeasurementSchema[] measurementSchemas = 
insertTabletStatement.getMeasurementSchemas();
+      if (measurementSchemas != null) {
+        this.measurementSchemas = Arrays.copyOf(measurementSchemas, 
measurementSchemas.length);
+      }
+
+      final String[] measurements = insertTabletStatement.getMeasurements();
+      if (measurements != null) {
+        this.measurements = Arrays.copyOf(measurements, measurements.length);
+      }
+
+      final TSDataType[] dataTypes = insertTabletStatement.getDataTypes();
+      if (dataTypes != null) {
+        this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+      }
+
+      final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+          insertTabletStatement.getFailedMeasurementInfoMap();
+      if (failedMeasurementIndex2Info != null) {
+        this.failedMeasurementIndex2Info = new 
HashMap<>(failedMeasurementIndex2Info);
+      }
+    } else {
+      this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+      this.measurements = insertTabletStatement.getMeasurements();
+      this.dataTypes = insertTabletStatement.getDataTypes();
+      this.failedMeasurementIndex2Info = 
insertTabletStatement.getFailedMeasurementInfoMap();
+    }
+
+    removeAllFailedMeasurementMarks();
+  }
+
+  public PipeConvertedInsertTabletStatement(final InsertTabletStatement 
insertTabletStatement) {
+    this(insertTabletStatement, true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index 11952814da0..11858853a2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -141,7 +141,8 @@ public class 
PipeTableStatementDataTypeConvertExecutionVisitor
                           rawTabletInsertionEvent.convertToTablet(),
                           rawTabletInsertionEvent.isAligned(),
                           databaseName)
-                      .constructStatement());
+                      .constructStatement(),
+                  false);
 
           TSStatus result;
           try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index eef40c714dc..ed47e519b00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -106,7 +106,8 @@ public class 
PipeTreeStatementDataTypeConvertExecutionVisitor
               new PipeConvertedInsertTabletStatement(
                   PipeTransferTabletRawReq.toTPipeTransferRawReq(
                           tabletWithIsAligned.getLeft(), 
tabletWithIsAligned.getRight())
-                      .constructStatement());
+                      .constructStatement(),
+                  false);
 
           TSStatus result;
           try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index eb6da0bc2ff..7ae7b023200 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -341,6 +341,10 @@ public abstract class InsertBaseStatement extends 
Statement {
             .collect(Collectors.toList());
   }
 
+  public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+    return failedMeasurementIndex2Info;
+  }
+
   public List<Exception> getFailedExceptions() {
     return failedMeasurementIndex2Info == null
         ? Collections.emptyList()
@@ -364,7 +368,7 @@ public abstract class InsertBaseStatement extends Statement 
{
             .collect(Collectors.toList());
   }
 
-  protected static class FailedMeasurementInfo {
+  public static class FailedMeasurementInfo {
     protected String measurement;
     protected TSDataType dataType;
     protected Object value;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 85f82ff447d..d33a83b0c32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -276,13 +276,25 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
     }
     failedMeasurementIndex2Info.forEach(
         (index, info) -> {
-          measurements[index] = info.getMeasurement();
-          dataTypes[index] = info.getDataType();
-          values[index] = info.getValue();
+          if (measurements != null) {
+            measurements[index] = info.getMeasurement();
+          }
+
+          if (dataTypes != null) {
+            dataTypes[index] = info.getDataType();
+          }
+
+          if (values != null) {
+            values[index] = info.getValue();
+          }
         });
     failedMeasurementIndex2Info.clear();
   }
 
+  public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+    return failedMeasurementIndex2Info;
+  }
+
   @Override
   public void semanticCheck() {
     super.semanticCheck();

Reply via email to