This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a9093c29250 [ISSUE-11199] Pipe: fix NPE when casting from null values in InsertRowNode deserialization (#11200)
a9093c29250 is described below

commit a9093c29250979c7bf9cb5e0ffe11bf8c185f5fb
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Sep 24 19:33:02 2023 +0800

    [ISSUE-11199] Pipe: fix NPE when casting from null values in InsertRowNode deserialization (#11200)
    
    * fix: allow casting from null when deserializing InsertRowNode
    
    * test: add test of inserting null values for pipe data synchronization in IoTDBPipeDataSyncIT
---
 .../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java  | 62 ++++++++++++++++++++++
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  7 +++
 2 files changed, 69 insertions(+)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index f79b6d11f62..96dbaac1ddc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -129,4 +129,66 @@ public class IoTDBPipeDataSyncIT {
       }
     }
   }
+
+  @Test
+  public void testInsertNull() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      try (Connection connection = receiverEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 
float)");
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try (Connection connection = senderEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 
float)");
+        statement.execute("insert into root.sg.d1(time, s0, s1) values (3, 
null, 25.34)");
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try (Connection connection = receiverEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        await()
+            .atMost(600, TimeUnit.SECONDS)
+            .untilAsserted(
+                () ->
+                    TestUtils.assertResultSetEqual(
+                        statement.executeQuery("select * from root.**"),
+                        "Time,root.sg.d1.s0,root.sg.d1.s1,",
+                        Collections.singleton("3,null,25.34,")));
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index de6d3cec39f..caed7e9178e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 @SuppressWarnings("java:S106") // for console outputs
 public class CommonUtils {
@@ -110,6 +111,9 @@ public class CommonUtils {
   }
 
   public static boolean checkCanCastType(TSDataType src, TSDataType dest) {
+    if (Objects.isNull(src)) {
+      return true;
+    }
     switch (src) {
       case INT32:
         if (dest == TSDataType.INT64 || dest == TSDataType.FLOAT || dest == 
TSDataType.DOUBLE) {
@@ -128,6 +132,9 @@ public class CommonUtils {
   }
 
   public static Object castValue(TSDataType srcDataType, TSDataType 
destDataType, Object value) {
+    if (Objects.isNull(value)) {
+      return null;
+    }
     switch (srcDataType) {
       case INT32:
         if (destDataType == TSDataType.INT64) {

Reply via email to