This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a9093c29250 [ISSUE-11199] Pipe: fix NPE when casting from null values in InsertRowNode deserialization (#11200)
a9093c29250 is described below
commit a9093c29250979c7bf9cb5e0ffe11bf8c185f5fb
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Sep 24 19:33:02 2023 +0800
[ISSUE-11199] Pipe: fix NPE when casting from null values in InsertRowNode deserialization (#11200)
* fix: allow casting from null when deserializing InsertRowNode
* test: add test of inserting null values for pipe data synchronization in IoTDBPipeDataSyncIT
---
.../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java | 62 ++++++++++++++++++++++
.../org/apache/iotdb/db/utils/CommonUtils.java | 7 +++
2 files changed, 69 insertions(+)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index f79b6d11f62..96dbaac1ddc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -129,4 +129,66 @@ public class IoTDBPipeDataSyncIT {
}
}
}
+
+ @Test
+ public void testInsertNull() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ try (Connection connection = receiverEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("create aligned timeseries root.sg.d1(s0 float, s1
float)");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("create aligned timeseries root.sg.d1(s0 float, s1
float)");
+ statement.execute("insert into root.sg.d1(time, s0, s1) values (3,
null, 25.34)");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (Connection connection = receiverEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ await()
+ .atMost(600, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.**"),
+ "Time,root.sg.d1.s0,root.sg.d1.s1,",
+ Collections.singleton("3,null,25.34,")));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index de6d3cec39f..caed7e9178e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
@SuppressWarnings("java:S106") // for console outputs
public class CommonUtils {
@@ -110,6 +111,9 @@ public class CommonUtils {
}
public static boolean checkCanCastType(TSDataType src, TSDataType dest) {
+ if (Objects.isNull(src)) {
+ return true;
+ }
switch (src) {
case INT32:
if (dest == TSDataType.INT64 || dest == TSDataType.FLOAT || dest ==
TSDataType.DOUBLE) {
@@ -128,6 +132,9 @@ public class CommonUtils {
}
public static Object castValue(TSDataType srcDataType, TSDataType
destDataType, Object value) {
+ if (Objects.isNull(value)) {
+ return null;
+ }
switch (srcDataType) {
case INT32:
if (destDataType == TSDataType.INT64) {