This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new c0745bd  [ISSUE #132]The message content is inconsistent after synchronization (#134)
c0745bd is described below

commit c0745bdaec5478ddee6c0af8ed9669c1d72f65fd
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue May 17 13:42:35 2022 +0800

    [ISSUE #132]The message content is inconsistent after synchronization (#134)
    
    Co-authored-by: zhangjidi <[email protected]>
---
 .../java/org/apache/rocketmq/replicator/MetaSourceTask.java |  2 +-
 .../java/org/apache/rocketmq/replicator/RmqSourceTask.java  | 10 ++++------
 .../java/org/apache/rocketmq/replicator/common/Utils.java   | 13 +++++++------
 .../org/apache/rocketmq/replicator/config/ConfigUtil.java   |  2 +-
 4 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 35d4714..3f4f2da 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -138,7 +138,7 @@ public class MetaSourceTask extends SourceTask {
 
                 JSONObject jsonObject = new JSONObject();
                 jsonObject.put(FieldName.OFFSET.getKey(), targetOffset);
-                ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(mq.getTopic(), mq.getBrokerName(), String.valueOf(mq.getQueueId())),
+                ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(mq),
                     Utils.offsetValue(srcOffset), System.currentTimeMillis(), schema, jsonObject.toJSONString());
                 res.add(connectRecord);
             }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 73cebfd..3b633cd 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -28,6 +28,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SchemaBuilder;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -172,10 +173,8 @@ public class RmqSourceTask extends SourceTask {
                             Schema schema = new Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
                             schema.getFields().add(new Field(0, FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
                             for (MessageExt msg : msgs) {
-                                JSONObject jsonObject = new JSONObject();
-                                jsonObject.put(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
-                                ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(taskTopicConfig.getTopic(), taskTopicConfig.getBrokerName(), String.valueOf(msg.getQueueId())),
-                                    Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(), schema, jsonObject.toJSONString());
+                                ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(taskTopicConfig),
+                                    Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(), schema, new String(msg.getBody(), StandardCharsets.UTF_8));
                                 res.add(connectRecord);
                             }
                             break;
@@ -246,8 +245,7 @@ public class RmqSourceTask extends SourceTask {
         OffsetStorageReader offsetStorageReader) {
         Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
         for (TaskTopicInfo tti : taskList) {
-            RecordOffset positionInfo = offsetStorageReader.readOffset(Utils.offsetKey(tti.getTopic(), tti.getBrokerName(),
-                String.valueOf(tti.getQueueId())));
+            RecordOffset positionInfo = offsetStorageReader.readOffset(Utils.offsetKey(tti));
             if (positionInfo != null && null != positionInfo.getOffset()) {
                 Map<String, ?> offset = positionInfo.getOffset();
                 Object lastRecordedOffset = offset.get(RmqConstants.NEXT_POSITION);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index ac2efc0..290dbff 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -208,16 +209,16 @@ public class Utils {
         return targetMQAdminExt;
     }
 
-    public static RecordPartition offsetKey(String topic, String broker, String queueId) {
-        Map<String, String> map = new HashMap<>();
-        map.put(RmqConstants.TOPIC_NAME, topic);
-        map.put(RmqConstants.BROKER_NAME, broker);
-        map.put(RmqConstants.QUEUE_ID, queueId);
+    public static RecordPartition offsetKey(MessageQueue mq) {
+        Map<String, String> map = new HashMap<>(4);
+        map.put(RmqConstants.TOPIC_NAME, mq.getTopic());
+        map.put(RmqConstants.BROKER_NAME, mq.getBrokerName());
+        map.put(RmqConstants.QUEUE_ID, String.valueOf(mq.getQueueId()));
         return new RecordPartition(map);
     }
 
     public static RecordOffset offsetValue(Long pos) {
-        Map<String, String> map = new HashMap<>();
+        Map<String, String> map = new HashMap<>(2);
         map.put(RmqConstants.NEXT_POSITION, String.valueOf(pos));
         return new RecordOffset(map);
     }
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
index 72f2d97..82c7448 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
@@ -32,7 +32,7 @@ public class ConfigUtil {
         Method[] methods = object.getClass().getMethods();
         for (Method method : methods) {
             String mn = method.getName();
-            if ("set".startsWith(mn)) {
+            if (mn.startsWith("set")) {
                 try {
                     String tmp = mn.substring(4);
                     String first = mn.substring(3, 4);

Reply via email to