This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0aa75b6fd296158c0369d56c3c7723ddbe37a42a Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Aug 1 15:13:54 2024 +0800 Pipe/Subscription: Prevent NPE when some pipe SQL parameter values are null (#13069) (cherry picked from commit cceaebd591955af95fb16b0f373bd8dc9c917cad) --- .../api/customizer/parameter/PipeParameters.java | 25 +++++++++++++++++++--- .../subscription/topic/CreateTopicProcedure.java | 14 +++++------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 1190054c6e9..1646423b133 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -278,13 +278,32 @@ public class PipeParameters { .collect( Collectors.toMap( Entry::getKey, - entry -> ValueHider.hide(entry.getKey(), entry.getValue()), - // The key won't be duplicated thus we just return the oldValue - (u, v) -> u, + entry -> { + final String value = ValueHider.hide(entry.getKey(), entry.getValue()); + return value == null ? "null" : value; + }, + (v1, v2) -> { + final boolean v1IsNull = isNullValue(v1); + final boolean v2IsNull = isNullValue(v2); + if (v1IsNull && v2IsNull) { + return "null"; + } + if (v1IsNull) { + return v2; + } + if (v2IsNull) { + return v1; + } + return v1; + }, TreeMap::new)) .toString(); } + private static boolean isNullValue(final String value) { + return value == null || value.equals("null"); + } + /** * This method adds (non-existed) or replaces (existed) equivalent attributes in this * PipeParameters with those from another PipeParameters. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index b712861cb54..65035f996bf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -86,8 +86,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { @Override protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) throws SubscriptionException { - LOGGER.info( - "CreateTopicProcedure: executeFromOperateOnConfigNodes({})", topicMeta.getTopicName()); + LOGGER.info("CreateTopicProcedure: executeFromOperateOnConfigNodes({})", topicMeta); TSStatus response; try { @@ -108,8 +107,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { @Override protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws SubscriptionException { - LOGGER.info( - "CreateTopicProcedure: executeFromOperateOnDataNodes({})", topicMeta.getTopicName()); + LOGGER.info("CreateTopicProcedure: executeFromOperateOnDataNodes({})", topicMeta); try { final List<TSStatus> statuses = env.pushSingleTopicOnDataNode(topicMeta.serialize()); @@ -129,13 +127,12 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { @Override protected void rollbackFromValidate(ConfigNodeProcedureEnv env) { - LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})", topicMeta.getTopicName()); + LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})", topicMeta); } @Override protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) { - LOGGER.info( - "CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})", topicMeta.getTopicName()); + LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})", topicMeta); TSStatus response; try { @@ -158,8 +155,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { @Override protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { - LOGGER.info( - "CreateTopicProcedure: rollbackFromCreateOnDataNodes({})", topicMeta.getTopicName()); + LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnDataNodes({})", topicMeta); final List<TSStatus> statuses = env.dropSingleTopicOnDataNode(topicMeta.getTopicName()); if (RpcUtils.squashResponseStatusList(statuses).getCode()
