This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cd5e6fa [ISSUE #451] instanceId is "null" bug fix
5cd5e6fa is described below

commit 5cd5e6fa65a29ec472b1cb0c3dd82fef33a1d940
Author: zhoubo <[email protected]>
AuthorDate: Mon Apr 3 15:24:20 2023 +0800

    [ISSUE #451] instanceId is "null" bug fix
---
 .../org/apache/rocketmq/replicator/ReplicatorSourceConnector.java | 8 ++++++--
 .../org/apache/rocketmq/replicator/utils/ReplicatorUtils.java     | 2 +-
 .../rocketmq/connect/runtime/common/ConnectKeyValueTest.java      | 1 +
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 51fda603..efd34e1a 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -162,7 +162,9 @@ public class ReplicatorSourceConnector extends SourceConnector {
             keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
             keyValue.put(ReplicatorConnectorConfig.SRC_REGION, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
             keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
-            keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+            if (null != 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
+                keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+            }
             keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
             keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
             keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, 
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
@@ -171,7 +173,9 @@ public class ReplicatorSourceConnector extends SourceConnector {
             keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLOUD));
             keyValue.put(ReplicatorConnectorConfig.DEST_REGION, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_REGION));
             keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
-            keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+            if (null != 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
+                keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+            }
             keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
             keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC));
             keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
index 04b71a67..1c7e516c 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
@@ -32,7 +32,7 @@ import java.util.*;
 public class ReplicatorUtils {
     private static Log log = LogFactory.getLog(ReplicatorUtils.class);
     public static String buildTopicWithNamespace(String rawTopic, String 
instanceId) {
-        if (StringUtils.isBlank(instanceId)) {
+        if (StringUtils.isBlank(instanceId) || 
StringUtils.isEmpty(instanceId)) {
             return rawTopic;
         }
         return instanceId + "%" + rawTopic;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
index c7b3ea66..70c880d8 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
@@ -41,6 +41,7 @@ public class ConnectKeyValueTest {
         keyValue.put("DoubleKey", 5.2);
 
         assertEquals("StringValue", keyValue.getString("StringKey"));
+        assertEquals(null, keyValue.getString("nothingness"));
         assertEquals(0, keyValue.getInt("IntegerKey"));
         assertEquals(1L, keyValue.getLong("LongKey"));
         assertEquals(5.2, keyValue.getDouble("DoubleKey"), 0.0);

Reply via email to