This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new c0745bd [ISSUE #132]The message content is inconsistent after synchronization (#134)
c0745bd is described below
commit c0745bdaec5478ddee6c0af8ed9669c1d72f65fd
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue May 17 13:42:35 2022 +0800
[ISSUE #132]The message content is inconsistent after synchronization (#134)
Co-authored-by: zhangjidi <[email protected]>
---
.../java/org/apache/rocketmq/replicator/MetaSourceTask.java | 2 +-
.../java/org/apache/rocketmq/replicator/RmqSourceTask.java | 10 ++++------
.../java/org/apache/rocketmq/replicator/common/Utils.java | 13 +++++++------
.../org/apache/rocketmq/replicator/config/ConfigUtil.java | 2 +-
4 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 35d4714..3f4f2da 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -138,7 +138,7 @@ public class MetaSourceTask extends SourceTask {
JSONObject jsonObject = new JSONObject();
jsonObject.put(FieldName.OFFSET.getKey(), targetOffset);
- ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(mq.getTopic(), mq.getBrokerName(),
String.valueOf(mq.getQueueId())),
+ ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(mq),
Utils.offsetValue(srcOffset), System.currentTimeMillis(),
schema, jsonObject.toJSONString());
res.add(connectRecord);
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 73cebfd..3b633cd 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -28,6 +28,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -172,10 +173,8 @@ public class RmqSourceTask extends SourceTask {
Schema schema = new
Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
for (MessageExt msg : msgs) {
- JSONObject jsonObject = new JSONObject();
-
jsonObject.put(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
- ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(taskTopicConfig.getTopic(),
taskTopicConfig.getBrokerName(), String.valueOf(msg.getQueueId())),
-
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(),
schema, jsonObject.toJSONString());
+ ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(taskTopicConfig),
+
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(),
schema, new String(msg.getBody(), StandardCharsets.UTF_8));
res.add(connectRecord);
}
break;
@@ -246,8 +245,7 @@ public class RmqSourceTask extends SourceTask {
OffsetStorageReader offsetStorageReader) {
Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
for (TaskTopicInfo tti : taskList) {
- RecordOffset positionInfo =
offsetStorageReader.readOffset(Utils.offsetKey(tti.getTopic(),
tti.getBrokerName(),
- String.valueOf(tti.getQueueId())));
+ RecordOffset positionInfo =
offsetStorageReader.readOffset(Utils.offsetKey(tti));
if (positionInfo != null && null != positionInfo.getOffset()) {
Map<String, ?> offset = positionInfo.getOffset();
Object lastRecordedOffset =
offset.get(RmqConstants.NEXT_POSITION);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index ac2efc0..290dbff 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
@@ -208,16 +209,16 @@ public class Utils {
return targetMQAdminExt;
}
- public static RecordPartition offsetKey(String topic, String broker,
String queueId) {
- Map<String, String> map = new HashMap<>();
- map.put(RmqConstants.TOPIC_NAME, topic);
- map.put(RmqConstants.BROKER_NAME, broker);
- map.put(RmqConstants.QUEUE_ID, queueId);
+ public static RecordPartition offsetKey(MessageQueue mq) {
+ Map<String, String> map = new HashMap<>(4);
+ map.put(RmqConstants.TOPIC_NAME, mq.getTopic());
+ map.put(RmqConstants.BROKER_NAME, mq.getBrokerName());
+ map.put(RmqConstants.QUEUE_ID, String.valueOf(mq.getQueueId()));
return new RecordPartition(map);
}
public static RecordOffset offsetValue(Long pos) {
- Map<String, String> map = new HashMap<>();
+ Map<String, String> map = new HashMap<>(2);
map.put(RmqConstants.NEXT_POSITION, String.valueOf(pos));
return new RecordOffset(map);
}
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
index 72f2d97..82c7448 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
@@ -32,7 +32,7 @@ public class ConfigUtil {
Method[] methods = object.getClass().getMethods();
for (Method method : methods) {
String mn = method.getName();
- if ("set".startsWith(mn)) {
+ if (mn.startsWith("set")) {
try {
String tmp = mn.substring(4);
String first = mn.substring(3, 4);