This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cceaebd5919 Pipe/Subscription: Prevent NPE when some pipe SQL
parameter values are null (#13069)
cceaebd5919 is described below
commit cceaebd591955af95fb16b0f373bd8dc9c917cad
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Aug 1 15:13:54 2024 +0800
Pipe/Subscription: Prevent NPE when some pipe SQL parameter values are null
(#13069)
---
.../api/customizer/parameter/PipeParameters.java | 25 +++++++++++++++++++---
.../subscription/topic/CreateTopicProcedure.java | 14 +++++-------
2 files changed, 27 insertions(+), 12 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1190054c6e9..1646423b133 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -278,13 +278,32 @@ public class PipeParameters {
.collect(
Collectors.toMap(
Entry::getKey,
- entry -> ValueHider.hide(entry.getKey(), entry.getValue()),
- // The key won't be duplicated thus we just return the oldValue
- (u, v) -> u,
+ entry -> {
+ final String value = ValueHider.hide(entry.getKey(),
entry.getValue());
+ return value == null ? "null" : value;
+ },
+ (v1, v2) -> {
+ final boolean v1IsNull = isNullValue(v1);
+ final boolean v2IsNull = isNullValue(v2);
+ if (v1IsNull && v2IsNull) {
+ return "null";
+ }
+ if (v1IsNull) {
+ return v2;
+ }
+ if (v2IsNull) {
+ return v1;
+ }
+ return v1;
+ },
TreeMap::new))
.toString();
}
+ private static boolean isNullValue(final String value) {
+ return value == null || value.equals("null");
+ }
+
/**
* This method adds (non-existed) or replaces (existed) equivalent
attributes in this
* PipeParameters with those from another PipeParameters.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index b712861cb54..65035f996bf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -86,8 +86,7 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
@Override
protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
throws SubscriptionException {
- LOGGER.info(
- "CreateTopicProcedure: executeFromOperateOnConfigNodes({})",
topicMeta.getTopicName());
+ LOGGER.info("CreateTopicProcedure: executeFromOperateOnConfigNodes({})",
topicMeta);
TSStatus response;
try {
@@ -108,8 +107,7 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
@Override
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws SubscriptionException {
- LOGGER.info(
- "CreateTopicProcedure: executeFromOperateOnDataNodes({})",
topicMeta.getTopicName());
+ LOGGER.info("CreateTopicProcedure: executeFromOperateOnDataNodes({})",
topicMeta);
try {
final List<TSStatus> statuses =
env.pushSingleTopicOnDataNode(topicMeta.serialize());
@@ -129,13 +127,12 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
@Override
protected void rollbackFromValidate(ConfigNodeProcedureEnv env) {
- LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})",
topicMeta.getTopicName());
+ LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})", topicMeta);
}
@Override
protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(
- "CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})",
topicMeta.getTopicName());
+ LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})",
topicMeta);
TSStatus response;
try {
@@ -158,8 +155,7 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
@Override
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(
- "CreateTopicProcedure: rollbackFromCreateOnDataNodes({})",
topicMeta.getTopicName());
+ LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnDataNodes({})",
topicMeta);
final List<TSStatus> statuses =
env.dropSingleTopicOnDataNode(topicMeta.getTopicName());
if (RpcUtils.squashResponseStatusList(statuses).getCode()