This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 5cd5e6fa [ISSUE #451] instanceId is "null" bug fix
5cd5e6fa is described below
commit 5cd5e6fa65a29ec472b1cb0c3dd82fef33a1d940
Author: zhoubo <[email protected]>
AuthorDate: Mon Apr 3 15:24:20 2023 +0800
[ISSUE #451] instanceId is "null" bug fix
---
.../org/apache/rocketmq/replicator/ReplicatorSourceConnector.java | 8 ++++++--
.../org/apache/rocketmq/replicator/utils/ReplicatorUtils.java | 2 +-
.../rocketmq/connect/runtime/common/ConnectKeyValueTest.java | 1 +
3 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index 51fda603..efd34e1a 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -162,7 +162,9 @@ public class ReplicatorSourceConnector extends SourceConnector {
keyValue.put(ReplicatorConnectorConfig.SRC_CLOUD,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLOUD));
keyValue.put(ReplicatorConnectorConfig.SRC_REGION,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_REGION));
keyValue.put(ReplicatorConnectorConfig.SRC_CLUSTER,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_CLUSTER));
- keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+ if (null !=
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID)) {
+ keyValue.put(ReplicatorConnectorConfig.SRC_INSTANCEID,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_INSTANCEID));
+ }
keyValue.put(ReplicatorConnectorConfig.SRC_ENDPOINT,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ENDPOINT));
keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
connectorConfig.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
@@ -171,7 +173,9 @@ public class ReplicatorSourceConnector extends SourceConnector {
keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLOUD));
keyValue.put(ReplicatorConnectorConfig.DEST_REGION,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_REGION));
keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
- keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+ if (null !=
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
+ keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
+ }
keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC));
keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
index 04b71a67..1c7e516c 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java
@@ -32,7 +32,7 @@ import java.util.*;
public class ReplicatorUtils {
private static Log log = LogFactory.getLog(ReplicatorUtils.class);
public static String buildTopicWithNamespace(String rawTopic, String
instanceId) {
- if (StringUtils.isBlank(instanceId)) {
+ if (StringUtils.isBlank(instanceId) ||
StringUtils.isEmpty(instanceId)) {
return rawTopic;
}
return instanceId + "%" + rawTopic;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
index c7b3ea66..70c880d8 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValueTest.java
@@ -41,6 +41,7 @@ public class ConnectKeyValueTest {
keyValue.put("DoubleKey", 5.2);
assertEquals("StringValue", keyValue.getString("StringKey"));
+ assertEquals(null, keyValue.getString("nothingness"));
assertEquals(0, keyValue.getInt("IntegerKey"));
assertEquals(1L, keyValue.getLong("LongKey"));
assertEquals(5.2, keyValue.getDouble("DoubleKey"), 0.0);