This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 517bcbb6 add replicator checkpoint doc (#486)
517bcbb6 is described below
commit 517bcbb6e18db146bd3e8d282075065fd3f4c57a
Author: zhoubo <[email protected]>
AuthorDate: Fri Apr 21 17:54:00 2023 +0800
add replicator checkpoint doc (#486)
modify metrics.conf default config
---
connectors/rocketmq-replicator/README.md | 104 +++++++++++++++++----
.../replicator/ReplicatorCheckpointTask.java | 2 +-
.../rocketmq/replicator/ReplicatorSourceTask.java | 2 +-
distribution/conf/metrics.conf | 3 +-
4 files changed, 89 insertions(+), 22 deletions(-)
diff --git a/connectors/rocketmq-replicator/README.md
b/connectors/rocketmq-replicator/README.md
index 7d1bec67..21d64945 100644
--- a/connectors/rocketmq-replicator/README.md
+++ b/connectors/rocketmq-replicator/README.md
@@ -74,6 +74,68 @@ curl -X POST -H "Content-Type: application/json"
http://127.0.0.1:8082/connector
}'
````
+同步位点
+
+````
+curl -X POST -H "Content-Type: application/json"
http://${runtime-ip}:${runtime-port}/connectors/${replicator-name} -d '{
+ "connector.class":
"org.apache.rocketmq.replicator.ReplicatorCheckpointConnector",
+ "src.cluster": "${srcDefaultCluster}",
+ "src.endpoint": "${namesrvEndpoint}",
+ "dest.acl.enable": "false",
+ "src.secret.key": "${sk}",
+ "dest.access.key": "${ak}",
+ "src.topictags": "test1",
+ "sync.gids":"test_gid1",
+ "src.acl.enable": "false",
+ "errors.tolerance": "all",
+ "dest.secret.key": "${sk}",
+ "dest.endpoint": "${namesrvEndpoint}",
+ "src.access.key": "${ak}",
+ "dest.cluster": "${targetDefaultCluster}",
+ "source.cluster": "${sourceDefaultCluster}",
+ "dest.region": "${regionA}",
+ "src.region": "${regionB}",
+ "dest.cloud": "${cloud1}",
+ "src.cloud": "${cloud2}",
+
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+````
+例如
+````
+curl -X POST -H "Content-Type: application/json"
http://127.0.0.1:8082/connectors/test_checkpoint_replicator -d '{
+ "connector.class":
"org.apache.rocketmq.replicator.ReplicatorCheckpointConnector",
+ "src.endpoint": "127.0.0.1:9876",
+ "src.cluster": "DefaultCluster",
+ "src.region": "regionA",
+ "src.cloud": "cloud1",
+ "dest.acl.enable": "false",
+ "src.topictags": "TopicTest",
+ "src.acl.enable": "false",
+ "dest.endpoint": "127.0.0.2:9876",
+ "dest.region": "regionB",
+ "dest.cluster": "DefaultCluster",
+ "errors.tolerance": "all",
+ "sync.gids":"test_gid1",
+ "dest.cloud": "dest-cloud",
+
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}'
+````
+同步位点任务创建成功以后,可以在目的 RocketMQ 集群的 replicator_checkpoint Topic 中消费到位点信息
+
+位点信息格式
+
+````
+Struct{consumerGroup=test_gid1,topic=TopicTest,upstreamLastTimestamp=1681962375040,downstreamLastTimestamp=1681439588399,metadata=1682068439879}
+````
+
+parameter | type| description |
+---|-------------|----------------------------------------|
+consumerGroup | String | consumerGroup |
+topic | String | topic |
+upstreamLastTimestamp | long | 源 RocketMQ topic consumerGroup 的消费位点 |
+downstreamLastTimestamp | long | 目的 RocketMQ topic consumerGroup 的消费位点
|
## rocketmq-replicator停止
````
@@ -82,24 +144,28 @@ curl
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
## rocketmq-replicator参数说明
-parameter | type | must | description
| sample value
----|---|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
-src.endpoint | String | Yes | namesrv address of source rocketmq cluster
| 127.0.0.1:9876 |
-src.topictags | String | Yes | source cluster topic and tag,${topic},{tag}
| test1,* |
-dest.topic | String | Yes | target cluster topic
| test2 |
-dest.endpoint | String | Yes | namesrv address of target rocketmq cluster
| 127.0.0.1:9876 |
-max.task | String | No | maximum number of tasks
| 2 |
-dest.acl.enable | String | No | acl switch,enumeration value : true/false
| false |
-dest.access.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| accesskey |
-dest.secret.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| secretkey |
-src.acl.enable | String | No | acl switch,enumeration value : true/false
| true |
-src.access.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| accesskey |
-src.secret.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| secretkey |
+parameter | type | must | description
| sample value
+---|---|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
+src.endpoint | String | Yes | namesrv address of source rocketmq cluster
| 127.0.0.1:9876 |
+src.topictags | String | Yes | source cluster topic and tag,${topic},{tag}
| test1,* |
+dest.topic | String | Yes | target cluster topic
| test2 |
+dest.endpoint | String | Yes | namesrv address of target rocketmq cluster
| 127.0.0.1:9876 |
+max.task | String | No | maximum number of tasks
| 2 |
+dest.acl.enable | String | No | acl switch,enumeration value : true/false
| false |
+dest.access.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| accesskey |
+dest.secret.key | String | No | please refer to the RocketMQ ACL module,when
dest.acl.enable is false, this parameter does not take effect
| secretkey |
+src.acl.enable | String | No | acl switch,enumeration value : true/false
| true |
+src.access.key | String | No | please refer to the RocketMQ ACL module,when
src.acl.enable is false, this parameter does not take effect
| accesskey |
+src.secret.key | String | No | please refer to the RocketMQ ACL module,when
src.acl.enable is false, this parameter does not take effect
| secretkey |
errors.tolerance | String | No | error tolerance ,enumeration value : all .
all means to tolerate all errors, the synchronization message failure will be
skipped and error log will be printed. If there is no error tolerance
configured, all errors will not be tolerated by default, a synchronization
failure occurs, and the task will stop after multiple retries | all |
-src.cluster | String | No | source cluster
| DefaultCluster |
-dest.cluster | String | No | target cluster
| DefaultCluster |
-src.region | String | No | source region
| regionA |
-dest.region | String | No | source region
| regionB |
-src.cloud | String | No | source cloud
| cloud1 |
-dest.cloud | String | No | source cloud
| cloud2 |
+src.cluster | String | No | source cluster
| DefaultCluster |
+dest.cluster | String | No | target cluster
| DefaultCluster |
+src.region | String | No | source region
| regionA |
+dest.region | String | No | target region
| regionB |
+src.cloud | String | No | source cloud
| cloud1 |
+dest.cloud | String | No | target cloud
| cloud2 |
+sync.gids | String | No | consumerGroup
| consumerGroup1 |
+key.converter | String | No | key converter
|
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter
|
+value.converter | String | No | value converter
|
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter
|
+
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 6c9946d2..5a44ec61 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -258,7 +258,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
-
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.getSrcTopicTags()));
+
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.SRC_TOPICTAGS));
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index af9352a8..f20cba48 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -386,7 +386,7 @@ public class ReplicatorSourceTask extends SourceTask {
}
public synchronized boolean putPulledQueueOffset(MessageQueue mq, long
currentOffset, int needAck, MessageExt msg) {
- log.info("putPulledQueueOffset " + mq + ", currentOffset : " +
currentOffset + ", ackCount : " + needAck);
+ log.trace("putPulledQueueOffset " + mq + ", currentOffset : " +
currentOffset + ", ackCount : " + needAck);
TreeMap<Long, UnAckMessage> offsets = queue2Offsets.get(mq);
if (offsets == null) {
TreeMap<Long, UnAckMessage> newOffsets = new TreeMap<>();
diff --git a/distribution/conf/metrics.conf b/distribution/conf/metrics.conf
index 45158f7e..98c1d6db 100644
--- a/distribution/conf/metrics.conf
+++ b/distribution/conf/metrics.conf
@@ -15,10 +15,11 @@
# metrics reporter class
-metrics.reporter=org.apache.rocketmq.connect.runtime.metrics.RocketMQScheduledReporter
+metrics.reporter=org.apache.rocketmq.connect.metrics.reporter.RocketMQScheduledReporter
# metrics topic
metrics.topic=metrics-topic
+group.id=metrics-gid
# Rocketmq namesrvAddr
name.srv.addr=localhost:9876
