This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 176bb876 [ISSUE #464] Fix and optimize the way of commit offset in
replicator (#465)
176bb876 is described below
commit 176bb8760cfbea7f5949d9c1fafdafc0b0a75747
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 15:07:44 2023 +0800
[ISSUE #464] Fix and optimize the way of commit offset in replicator (#465)
* Fix and optimize the way of commit offset in replicator
* Fix and optimize the way of commit offset in replicator
* Fix and optimize the way of commit offset in replicator
---
.../replicator/ReplicatorCheckpointTask.java | 33 ++++-----
.../replicator/ReplicatorSourceConnector.java | 22 +++---
.../rocketmq/replicator/ReplicatorSourceTask.java | 81 +++++++++-------------
.../config/ReplicatorConnectorConfig.java | 11 +++
4 files changed, 71 insertions(+), 76 deletions(-)
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 04aa19eb..8eba84a0 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -57,7 +57,6 @@ import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.METADATA_KEY;
import static org.apache.rocketmq.replicator.utils.ReplicatorUtils.TOPIC_KEY;
import static
org.apache.rocketmq.replicator.utils.ReplicatorUtils.UPSTREAM_LASTTIMESTAMP_KEY;
-
public class ReplicatorCheckpointTask extends SourceTask {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
@@ -66,21 +65,18 @@ public class ReplicatorCheckpointTask extends SourceTask {
private DefaultMQAdminExt srcMqAdminExt;
private DefaultMQAdminExt targetMqAdminExt;
-
public static final Schema VALUE_SCHEMA_V0 = SchemaBuilder.struct()
- .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
- .field(TOPIC_KEY, SchemaBuilder.string().build())
- .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
- .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
- .field(METADATA_KEY, SchemaBuilder.string().build())
- .build();
-
+ .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+ .field(TOPIC_KEY, SchemaBuilder.string().build())
+ .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+ .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+ .field(METADATA_KEY, SchemaBuilder.string().build())
+ .build();
public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
- .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
- .field(TOPIC_KEY, SchemaBuilder.string().build())
- .build();
-
+ .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+ .field(TOPIC_KEY, SchemaBuilder.string().build())
+ .build();
private void buildMqAdmin() throws MQClientException {
buildAndStartSrcMQAdmin();
@@ -133,7 +129,6 @@ public class ReplicatorCheckpointTask extends SourceTask {
}
}
-
@Override
public List<ConnectRecord> poll() throws InterruptedException {
if (lastCheckPointTimestamp +
connectorConfig.getCheckpointIntervalMs() > System.currentTimeMillis()) {
@@ -143,7 +138,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
}
log.info("not sleep " + lastCheckPointTimestamp + ", " +
connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
List<ConnectRecord> connectRecords = new LinkedList<>();
- // pull consumergroup's consumer commit offset
+ // pull consumer group's consumer commit offset
String syncGids = connectorConfig.getSyncGids();
if (StringUtils.isEmpty(syncGids)) {
lastCheckPointTimestamp = System.currentTimeMillis();
@@ -182,7 +177,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
connectRecord.addExtension(TOPIC_KEY,
connectorConfig.getCheckpointTopic());
connectRecords.add(connectRecord);
} catch (Exception e) {
- log.error("examineConsumeStats gid : " + consumerGroup
+ ", topic : " + srcTopic + " error", e);
+ log.error("examineConsumeStats gid : " + consumerGroup
+ ", topic : " + srcTopic + " error", e);
}
}
}
@@ -195,7 +190,8 @@ public class ReplicatorCheckpointTask extends SourceTask {
}
@NotNull
- private static Struct buildCheckpointPayload(String
srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId, long
minSrcLasttimestamp, long minDestLasttimestamp) {
+ private static Struct buildCheckpointPayload(String
srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId,
+ long minSrcLasttimestamp, long minDestLasttimestamp) {
Struct struct = new Struct(VALUE_SCHEMA_V0);
struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
struct.put(TOPIC_KEY, srcTopicWithInstanceId);
@@ -212,7 +208,8 @@ public class ReplicatorCheckpointTask extends SourceTask {
return struct;
}
- private long getMinSrcLasttimestamp(ConsumeStats consumeStats) throws
RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ private long getMinSrcLasttimestamp(
+ ConsumeStats consumeStats) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
long minSrcLasttimestamp = -1;
if (null != consumeStats) {
Map<MessageQueue, OffsetWrapper> offsetTable =
consumeStats.getOffsetTable();
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index efd34e1a..0e9f1f96 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -22,6 +22,14 @@ import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@@ -40,18 +48,15 @@ import
org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import java.util.*;
-
import static
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
import static
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static
org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
/**
* @author osgoo
- * @date 2022/6/16
*/
public class ReplicatorSourceConnector extends SourceConnector {
- private Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
+ private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
private KeyValue connectorConfig;
private DefaultMQAdminExt srcMQAdminExt;
@@ -108,12 +113,12 @@ public class ReplicatorSourceConnector extends
SourceConnector {
}
});
List<List<MessageQueue>> result = new ArrayList<>(maxTasks);
- for (int i = 0;i < maxTasks;i++) {
+ for (int i = 0; i < maxTasks; i++) {
List<MessageQueue> subTasks = new ArrayList<>();
result.add(subTasks);
log.info("add subTask");
}
- for (int i = 0;i < taskTopicInfos.size();i++) {
+ for (int i = 0; i < taskTopicInfos.size(); i++) {
int hash = i % maxTasks;
MessageQueue messageQueue = taskTopicInfos.get(i);
result.get(hash).add(messageQueue);
@@ -150,9 +155,8 @@ public class ReplicatorSourceConnector extends
SourceConnector {
List<List<MessageQueue>> normalDivided = divide(messageQueues,
maxTasks);
log.info("normalDivided : " + normalDivided + " " + normalDivided);
-
List<KeyValue> configs = new ArrayList<>();
- for (int i = 0;i < maxTasks;i++) {
+ for (int i = 0; i < maxTasks; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES,
JSON.toJSONString(normalDivided.get(i)));
@@ -183,6 +187,7 @@ public class ReplicatorSourceConnector extends
SourceConnector {
keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY,
connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
keyValue.put(ReplicatorConnectorConfig.SYNC_TPS,
connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS,
ReplicatorConnectorConfig.DEFAULT_SYNC_TPS));
+ keyValue.put(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS,
connectorConfig.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS,
10 * 1000L));
configs.add(keyValue);
log.info("ReplicatorSourceConnector sub task config : " +
keyValue);
@@ -227,7 +232,6 @@ public class ReplicatorSourceConnector extends
SourceConnector {
}
};
-
@Override
public void validate(KeyValue config) {
log.info("sourceconnectValidate : " + config);
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 337627e1..e76eae44 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -21,7 +21,6 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.*;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -88,6 +87,12 @@ public class ReplicatorSourceTask extends SourceTask {
return new Thread(r, "Replicator_lag_metrics");
}
});
+ private ScheduledExecutorService commitOffsetScheduleService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Commit_offset_schedule");
+ }
+ });
private Map<String, List<String>> metricsItem2KeyMap = new HashMap<>();
private final long period = 60 * 1000;
private DefaultLitePullConsumer pullConsumer;
@@ -114,16 +119,16 @@ public class ReplicatorSourceTask extends SourceTask {
private AtomicInteger pollCounter = new AtomicInteger();
private AtomicInteger rateCounter = new AtomicInteger();
- private final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY =
"REPLICATOR-source-topic";
+ private static final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY =
"REPLICATOR-source-topic";
// msg born timestamp on src
- private final String REPLICATOR_BORN_SOURCE_TIMESTAMP =
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
+ private static final String REPLICATOR_BORN_SOURCE_TIMESTAMP =
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
// msg born from where
- private final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION =
"REPLICATOR-BORN-SOURCE";
+ private static final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION =
"REPLICATOR-BORN-SOURCE";
// msg born from which topic
- private final String REPLICATOR_BORE_INSTANCEID_TOPIC =
"REPLICATOR-BORN-TOPIC";
- // src messageid equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
+ private static final String REPLICATOR_BORE_INSTANCEID_TOPIC =
"REPLICATOR-BORN-TOPIC";
+ // src message id equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
private static final String REPLICATOR_SRC_MESSAGE_ID = "EXTEND_UNIQ_INFO";
- // src dupinof equals MessageConst.DUP_INFO
+ // src dup info equals MessageConst.DUP_INFO
private static final String REPLICATOR_DUP_INFO = "DUP_INFO";
// following sys reserved properties
@@ -231,8 +236,8 @@ public class ReplicatorSourceTask extends SourceTask {
srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr,
subscriptionGroupConfig);
log.info("create subscription group to {} success.", addr);
} catch (Exception e) {
- log.error(" create subscription error,", e);
- Thread.sleep(1000 * 1);
+ log.error("create subscription error,", e);
+ Thread.sleep(1000);
}
}
}
@@ -284,8 +289,8 @@ public class ReplicatorSourceTask extends SourceTask {
for (MessageQueue mq : allQueues) {
String topic = mq.getTopic();
- String tag =
connectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(),
connectorConfig.getSrcTopicTags()).get(topic);
-// pullConsumer.setSubExpressionForAssign(topic, tag);
+ String tag =
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(),
connectorConfig.getSrcTopicTags()).get(topic);
+ pullConsumer.setSubExpressionForAssign(topic, tag);
}
try {
@@ -313,31 +318,20 @@ public class ReplicatorSourceTask extends SourceTask {
@Override
public void run() {
replicateLagMetric();
- commitOffsetSchedule();
}
}, period, period, TimeUnit.MILLISECONDS);
+
+ commitOffsetScheduleService.scheduleAtFixedRate(new Runnable() {
+ @Override public void run() {
+ commitOffsetSchedule();
+ }
+ }, connectorConfig.getCommitOffsetIntervalMs(),
connectorConfig.getCommitOffsetIntervalMs(), TimeUnit.MILLISECONDS);
}
private void commitOffsetSchedule() {
- ConcurrentHashMap<MessageQueue, AtomicLong> bakPrepareCommitOffset =
prepareCommitOffset;
- prepareCommitOffset = new ConcurrentHashMap<>();
- if (MapUtils.isNotEmpty(bakPrepareCommitOffset)) {
- bakPrepareCommitOffset.forEach(new BiConsumer<MessageQueue,
AtomicLong>() {
- @Override
- public void accept(MessageQueue mq, AtomicLong atomicLong) {
- long canCommitOffset = atomicLong.get();
- log.info("markQueueCommitted commit mq : " + mq + " offset
: " + canCommitOffset);
- try {
- // commit offset directly to broker
- pullConsumer.getOffsetStore().updateOffset(mq,
canCommitOffset, true);
-
pullConsumer.getOffsetStore().updateConsumeOffsetToBroker(mq, canCommitOffset,
true);
- log.info("update consumer offset mq : " + mq + " ,
offset : " + canCommitOffset);
- } catch (Exception e) {
- log.warn("update consume offset error, mq[" + mq + "],
commitOffset[" + canCommitOffset + "]");
- }
- }
- });
- }
+ Map<MessageQueue, Long> commitOffsetTable = new HashMap<>();
+ prepareCommitOffset.forEach((messageQueue, offset) ->
commitOffsetTable.put(messageQueue, offset.get()));
+ pullConsumer.commitSync(commitOffsetTable, true);
}
private void replicateLagMetric() {
@@ -632,18 +626,6 @@ public class ReplicatorSourceTask extends SourceTask {
MixAll.compareAndIncreaseOnly(commitOffset, canCommitOffset);
}
- @Override
- public void commit() {
-
- }
-
- @Override
- public void commit(List<ConnectRecord> records, Map<String, String>
metadata) {
- for (ConnectRecord record : records) {
- this.commit(record, metadata);
- }
- }
-
@Override
public void commit(ConnectRecord record, Map<String, String> metadata) {
if (metadata == null) {
@@ -699,18 +681,20 @@ public class ReplicatorSourceTask extends SourceTask {
connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
-
connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
"true")));
-
connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
"true")));
-
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP,
"false")));
+
connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
"true")));
+
connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
"true")));
+
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP,
"false")));
connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS));
connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES));
connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY));
connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY));
+
connectorConfig.setCommitOffsetIntervalMs(config.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS,
10 * 1000));
+
connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()));
if (connectorConfig.getConsumeFromWhere() ==
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
-
connectorConfig.setConsumeFromTimestamp(Long.valueOf(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
+
connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
}
log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig);
@@ -731,8 +715,7 @@ public class ReplicatorSourceTask extends SourceTask {
buildConsumer();
log.info("buildConsumer finished.");
// init limiter
- int limit = connectorConfig.getSyncTps();
- tpsLimit = limit;
+ tpsLimit = connectorConfig.getSyncTps();
log.info("RateLimiter init finished.");
// subscribe topic & start consumer
subscribeTopicAndStartConsumer();
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index ab789c69..2713a165 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -92,6 +92,7 @@ public class ReplicatorConnectorConfig {
//
private int heartbeatIntervalMs = 1 * 1000;
private int checkpointIntervalMs = 10 * 1000;
+ private long commitOffsetIntervalMs = 10 * 1000;
private String heartbeatTopic;
private String checkpointTopic;
public final static String DEFAULT_HEARTBEAT_TOPIC =
"replicator_heartbeat";
@@ -160,6 +161,7 @@ public class ReplicatorConnectorConfig {
public final static String DIVIDED_DLQ_QUEUES = "divided.dlqqueues";
public final static String SYNC_TPS = "sync.tps";
public final static String MAX_TASK = "max.task";
+ public final static String COMMIT_OFFSET_INTERVALS_MS =
"commit.offset.interval.ms";
public String getTaskId() {
return taskId;
@@ -627,6 +629,14 @@ public class ReplicatorConnectorConfig {
this.consumeFromWhere = consumeFromWhere;
}
+ public long getCommitOffsetIntervalMs() {
+ return commitOffsetIntervalMs;
+ }
+
+ public void setCommitOffsetIntervalMs(long commitOffsetIntervalMs) {
+ this.commitOffsetIntervalMs = commitOffsetIntervalMs;
+ }
+
@Override
public String toString() {
return "ReplicatorConnectorConfig{" +
@@ -668,6 +678,7 @@ public class ReplicatorConnectorConfig {
", syncTps=" + syncTps +
", heartbeatIntervalMs=" + heartbeatIntervalMs +
", checkpointIntervalMs=" + checkpointIntervalMs +
+ ", commitOffsetIntervalMs=" + commitOffsetIntervalMs +
", heartbeatTopic='" + heartbeatTopic + '\'' +
", checkpointTopic='" + checkpointTopic + '\'' +
", eachQueueBufferSize=" + eachQueueBufferSize +