This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 176bb876 [ISSUE #464] Fix and optimize the way of commit offset in replicator (#465)
176bb876 is described below

commit 176bb8760cfbea7f5949d9c1fafdafc0b0a75747
Author: rongtong <[email protected]>
AuthorDate: Fri Apr 7 15:07:44 2023 +0800

    [ISSUE #464] Fix and optimize the way of commit offset in replicator (#465)
    
    * Fix and optimize the way of commit offset in replicator
    
    * Fix and optimize the way of commit offset in replicator
    
    * Fix and optimize the way of commit offset in replicator
---
 .../replicator/ReplicatorCheckpointTask.java       | 33 ++++-----
 .../replicator/ReplicatorSourceConnector.java      | 22 +++---
 .../rocketmq/replicator/ReplicatorSourceTask.java  | 81 +++++++++-------------
 .../config/ReplicatorConnectorConfig.java          | 11 +++
 4 files changed, 71 insertions(+), 76 deletions(-)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
index 04aa19eb..8eba84a0 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java
@@ -57,7 +57,6 @@ import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.METADATA_KEY;
 import static org.apache.rocketmq.replicator.utils.ReplicatorUtils.TOPIC_KEY;
 import static 
org.apache.rocketmq.replicator.utils.ReplicatorUtils.UPSTREAM_LASTTIMESTAMP_KEY;
 
-
 public class ReplicatorCheckpointTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.REPLICATRO_RUNTIME);
@@ -66,21 +65,18 @@ public class ReplicatorCheckpointTask extends SourceTask {
     private DefaultMQAdminExt srcMqAdminExt;
     private DefaultMQAdminExt targetMqAdminExt;
 
-
     public static final Schema VALUE_SCHEMA_V0 = SchemaBuilder.struct()
-            .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
-            .field(TOPIC_KEY, SchemaBuilder.string().build())
-            .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
-            .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
-            .field(METADATA_KEY, SchemaBuilder.string().build())
-            .build();
-
+        .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+        .field(TOPIC_KEY, SchemaBuilder.string().build())
+        .field(UPSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+        .field(DOWNSTREAM_LASTTIMESTAMP_KEY, SchemaBuilder.int64().build())
+        .field(METADATA_KEY, SchemaBuilder.string().build())
+        .build();
 
     public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
-            .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
-            .field(TOPIC_KEY, SchemaBuilder.string().build())
-            .build();
-
+        .field(CONSUMER_GROUP_KEY, SchemaBuilder.string().build())
+        .field(TOPIC_KEY, SchemaBuilder.string().build())
+        .build();
 
     private void buildMqAdmin() throws MQClientException {
         buildAndStartSrcMQAdmin();
@@ -133,7 +129,6 @@ public class ReplicatorCheckpointTask extends SourceTask {
         }
     }
 
-
     @Override
     public List<ConnectRecord> poll() throws InterruptedException {
         if (lastCheckPointTimestamp + 
connectorConfig.getCheckpointIntervalMs() > System.currentTimeMillis()) {
@@ -143,7 +138,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
         }
         log.info("not sleep " + lastCheckPointTimestamp + ", " + 
connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
         List<ConnectRecord> connectRecords = new LinkedList<>();
-        // pull consumergroup's consumer commit offset
+        // pull consumer group's consumer commit offset
         String syncGids = connectorConfig.getSyncGids();
         if (StringUtils.isEmpty(syncGids)) {
             lastCheckPointTimestamp = System.currentTimeMillis();
@@ -182,7 +177,7 @@ public class ReplicatorCheckpointTask extends SourceTask {
                         connectRecord.addExtension(TOPIC_KEY, 
connectorConfig.getCheckpointTopic());
                         connectRecords.add(connectRecord);
                     } catch (Exception e) {
-                        log.error("examineConsumeStats gid : " + consumerGroup 
+ ", topic : " + srcTopic +  " error", e);
+                        log.error("examineConsumeStats gid : " + consumerGroup 
+ ", topic : " + srcTopic + " error", e);
                     }
                 }
             }
@@ -195,7 +190,8 @@ public class ReplicatorCheckpointTask extends SourceTask {
     }
 
     @NotNull
-    private static Struct buildCheckpointPayload(String 
srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId, long 
minSrcLasttimestamp, long minDestLasttimestamp) {
+    private static Struct buildCheckpointPayload(String 
srcTopicWithInstanceId, String srcConsumerGroupWithInstanceId,
+        long minSrcLasttimestamp, long minDestLasttimestamp) {
         Struct struct = new Struct(VALUE_SCHEMA_V0);
         struct.put(CONSUMER_GROUP_KEY, srcConsumerGroupWithInstanceId);
         struct.put(TOPIC_KEY, srcTopicWithInstanceId);
@@ -212,7 +208,8 @@ public class ReplicatorCheckpointTask extends SourceTask {
         return struct;
     }
 
-    private long getMinSrcLasttimestamp(ConsumeStats consumeStats) throws 
RemotingException, MQClientException, InterruptedException, MQBrokerException {
+    private long getMinSrcLasttimestamp(
+        ConsumeStats consumeStats) throws RemotingException, 
MQClientException, InterruptedException, MQBrokerException {
         long minSrcLasttimestamp = -1;
         if (null != consumeStats) {
             Map<MessageQueue, OffsetWrapper> offsetTable = 
consumeStats.getOffsetTable();
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
index efd34e1a..0e9f1f96 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java
@@ -22,6 +22,14 @@ import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.List;
+import java.util.Set;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
@@ -40,18 +48,15 @@ import 
org.apache.rocketmq.connect.runtime.errors.ToleranceType;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 
-import java.util.*;
-
 import static 
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
 import static 
org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
 import static 
org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
 
 /**
  * @author osgoo
- * @date 2022/6/16
  */
 public class ReplicatorSourceConnector extends SourceConnector {
-    private Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
+    private final Log log = LogFactory.getLog(ReplicatorSourceConnector.class);
     private KeyValue connectorConfig;
     private DefaultMQAdminExt srcMQAdminExt;
 
@@ -108,12 +113,12 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
             }
         });
         List<List<MessageQueue>> result = new ArrayList<>(maxTasks);
-        for (int i = 0;i < maxTasks;i++) {
+        for (int i = 0; i < maxTasks; i++) {
             List<MessageQueue> subTasks = new ArrayList<>();
             result.add(subTasks);
             log.info("add subTask");
         }
-        for (int i = 0;i < taskTopicInfos.size();i++) {
+        for (int i = 0; i < taskTopicInfos.size(); i++) {
             int hash = i % maxTasks;
             MessageQueue messageQueue = taskTopicInfos.get(i);
             result.get(hash).add(messageQueue);
@@ -150,9 +155,8 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         List<List<MessageQueue>> normalDivided = divide(messageQueues, 
maxTasks);
         log.info("normalDivided : " + normalDivided + " " + normalDivided);
 
-
         List<KeyValue> configs = new ArrayList<>();
-        for (int i = 0;i < maxTasks;i++) {
+        for (int i = 0; i < maxTasks; i++) {
             KeyValue keyValue = new DefaultKeyValue();
             keyValue.put(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES, 
JSON.toJSONString(normalDivided.get(i)));
 
@@ -183,6 +187,7 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
             keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, 
connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
 
             keyValue.put(ReplicatorConnectorConfig.SYNC_TPS, 
connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS, 
ReplicatorConnectorConfig.DEFAULT_SYNC_TPS));
+            keyValue.put(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 
connectorConfig.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS, 
10 * 1000L));
 
             configs.add(keyValue);
             log.info("ReplicatorSourceConnector sub task config : " + 
keyValue);
@@ -227,7 +232,6 @@ public class ReplicatorSourceConnector extends 
SourceConnector {
         }
     };
 
-
     @Override
     public void validate(KeyValue config) {
         log.info("sourceconnectValidate : " + config);
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
index 337627e1..e76eae44 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java
@@ -21,7 +21,6 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.*;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -88,6 +87,12 @@ public class ReplicatorSourceTask extends SourceTask {
             return new Thread(r, "Replicator_lag_metrics");
         }
     });
+    private ScheduledExecutorService commitOffsetScheduleService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "Commit_offset_schedule");
+        }
+    });
     private Map<String, List<String>> metricsItem2KeyMap = new HashMap<>();
     private final long period = 60 * 1000;
     private DefaultLitePullConsumer pullConsumer;
@@ -114,16 +119,16 @@ public class ReplicatorSourceTask extends SourceTask {
     private AtomicInteger pollCounter = new AtomicInteger();
     private AtomicInteger rateCounter = new AtomicInteger();
 
-    private final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY = 
"REPLICATOR-source-topic";
+    private static final String REPLICATOR_SRC_TOPIC_PROPERTY_KEY = 
"REPLICATOR-source-topic";
     // msg born timestamp on src
-    private final String REPLICATOR_BORN_SOURCE_TIMESTAMP = 
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
+    private static final String REPLICATOR_BORN_SOURCE_TIMESTAMP = 
"REPLICATOR-BORN-SOURCE-TIMESTAMP";
     // msg born from where
-    private final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION = 
"REPLICATOR-BORN-SOURCE";
+    private static final String REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION = 
"REPLICATOR-BORN-SOURCE";
     // msg born from which topic
-    private final String REPLICATOR_BORE_INSTANCEID_TOPIC = 
"REPLICATOR-BORN-TOPIC";
-    // src messageid  equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
+    private static final String REPLICATOR_BORE_INSTANCEID_TOPIC = 
"REPLICATOR-BORN-TOPIC";
+    // src message id  equals MessageConst.PROPERTY_EXTEND_UNIQ_INFO
     private static final String REPLICATOR_SRC_MESSAGE_ID = "EXTEND_UNIQ_INFO";
-    // src dupinof  equals MessageConst.DUP_INFO
+    // src dup info  equals MessageConst.DUP_INFO
     private static final String REPLICATOR_DUP_INFO = "DUP_INFO";
 
     // following sys reserved properties
@@ -231,8 +236,8 @@ public class ReplicatorSourceTask extends SourceTask {
                 srcMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
subscriptionGroupConfig);
                 log.info("create subscription group to {} success.", addr);
             } catch (Exception e) {
-                log.error(" create subscription error,", e);
-                Thread.sleep(1000 * 1);
+                log.error("create subscription error,", e);
+                Thread.sleep(1000);
             }
         }
     }
@@ -284,8 +289,8 @@ public class ReplicatorSourceTask extends SourceTask {
 
         for (MessageQueue mq : allQueues) {
             String topic = mq.getTopic();
-            String tag = 
connectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
-//            pullConsumer.setSubExpressionForAssign(topic, tag);
+            String tag = 
ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), 
connectorConfig.getSrcTopicTags()).get(topic);
+            pullConsumer.setSubExpressionForAssign(topic, tag);
         }
 
         try {
@@ -313,31 +318,20 @@ public class ReplicatorSourceTask extends SourceTask {
             @Override
             public void run() {
                 replicateLagMetric();
-                commitOffsetSchedule();
             }
         }, period, period, TimeUnit.MILLISECONDS);
+
+        commitOffsetScheduleService.scheduleAtFixedRate(new Runnable() {
+            @Override public void run() {
+                commitOffsetSchedule();
+            }
+        }, connectorConfig.getCommitOffsetIntervalMs(), 
connectorConfig.getCommitOffsetIntervalMs(), TimeUnit.MILLISECONDS);
     }
 
     private void commitOffsetSchedule() {
-        ConcurrentHashMap<MessageQueue, AtomicLong> bakPrepareCommitOffset = 
prepareCommitOffset;
-        prepareCommitOffset = new ConcurrentHashMap<>();
-        if (MapUtils.isNotEmpty(bakPrepareCommitOffset)) {
-            bakPrepareCommitOffset.forEach(new BiConsumer<MessageQueue, 
AtomicLong>() {
-                @Override
-                public void accept(MessageQueue mq, AtomicLong atomicLong) {
-                    long canCommitOffset = atomicLong.get();
-                    log.info("markQueueCommitted commit mq : " + mq + " offset 
: " + canCommitOffset);
-                    try {
-                        // commit offset directly to broker
-                        pullConsumer.getOffsetStore().updateOffset(mq, 
canCommitOffset, true);
-                        
pullConsumer.getOffsetStore().updateConsumeOffsetToBroker(mq, canCommitOffset, 
true);
-                        log.info("update consumer offset mq : " + mq + " , 
offset : " + canCommitOffset);
-                    } catch (Exception e) {
-                        log.warn("update consume offset error, mq[" + mq + "], 
commitOffset[" + canCommitOffset + "]");
-                    }
-                }
-            });
-        }
+        Map<MessageQueue, Long> commitOffsetTable = new HashMap<>();
+        prepareCommitOffset.forEach((messageQueue, offset) -> 
commitOffsetTable.put(messageQueue, offset.get()));
+        pullConsumer.commitSync(commitOffsetTable, true);
     }
 
     private void replicateLagMetric() {
@@ -632,18 +626,6 @@ public class ReplicatorSourceTask extends SourceTask {
         MixAll.compareAndIncreaseOnly(commitOffset, canCommitOffset);
     }
 
-    @Override
-    public void commit() {
-
-    }
-
-    @Override
-    public void commit(List<ConnectRecord> records, Map<String, String> 
metadata) {
-        for (ConnectRecord record : records) {
-            this.commit(record, metadata);
-        }
-    }
-
     @Override
     public void commit(ConnectRecord record, Map<String, String> metadata) {
         if (metadata == null) {
@@ -699,18 +681,20 @@ public class ReplicatorSourceTask extends SourceTask {
         
connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
         
connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
         
connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
-        
connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
 "true")));
-        
connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
 "true")));
-        
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP,
 "false")));
+        
connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE,
 "true")));
+        
connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE,
 "true")));
+        
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP,
 "false")));
 
         
connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS));
         
connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES));
         
connectorConfig.setSrcAccessKey(config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY));
         
connectorConfig.setSrcSecretKey(config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY));
 
+        
connectorConfig.setCommitOffsetIntervalMs(config.getLong(ReplicatorConnectorConfig.COMMIT_OFFSET_INTERVALS_MS,
 10 * 1000));
+
         
connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE,
 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()));
         if (connectorConfig.getConsumeFromWhere() == 
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
-            
connectorConfig.setConsumeFromTimestamp(Long.valueOf(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
+            
connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
         }
         log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig);
 
@@ -731,8 +715,7 @@ public class ReplicatorSourceTask extends SourceTask {
             buildConsumer();
             log.info("buildConsumer finished.");
             // init limiter
-            int limit = connectorConfig.getSyncTps();
-            tpsLimit = limit;
+            tpsLimit = connectorConfig.getSyncTps();
             log.info("RateLimiter init finished.");
             // subscribe topic & start consumer
             subscribeTopicAndStartConsumer();
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
index ab789c69..2713a165 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java
@@ -92,6 +92,7 @@ public class ReplicatorConnectorConfig {
     //
     private int heartbeatIntervalMs = 1 * 1000;
     private int checkpointIntervalMs = 10 * 1000;
+    private long commitOffsetIntervalMs = 10 * 1000;
     private String heartbeatTopic;
     private String checkpointTopic;
     public final static String DEFAULT_HEARTBEAT_TOPIC = 
"replicator_heartbeat";
@@ -160,6 +161,7 @@ public class ReplicatorConnectorConfig {
     public final static String DIVIDED_DLQ_QUEUES = "divided.dlqqueues";
     public final static String SYNC_TPS = "sync.tps";
     public final static String MAX_TASK = "max.task";
+    public final static String COMMIT_OFFSET_INTERVALS_MS = 
"commit.offset.interval.ms";
 
     public String getTaskId() {
         return taskId;
@@ -627,6 +629,14 @@ public class ReplicatorConnectorConfig {
         this.consumeFromWhere = consumeFromWhere;
     }
 
+    public long getCommitOffsetIntervalMs() {
+        return commitOffsetIntervalMs;
+    }
+
+    public void setCommitOffsetIntervalMs(long commitOffsetIntervalMs) {
+        this.commitOffsetIntervalMs = commitOffsetIntervalMs;
+    }
+
     @Override
     public String toString() {
         return "ReplicatorConnectorConfig{" +
@@ -668,6 +678,7 @@ public class ReplicatorConnectorConfig {
                 ", syncTps=" + syncTps +
                 ", heartbeatIntervalMs=" + heartbeatIntervalMs +
                 ", checkpointIntervalMs=" + checkpointIntervalMs +
+                ", commitOffsetIntervalMs=" + commitOffsetIntervalMs +
                 ", heartbeatTopic='" + heartbeatTopic + '\'' +
                 ", checkpointTopic='" + checkpointTopic + '\'' +
                 ", eachQueueBufferSize=" + eachQueueBufferSize +

Reply via email to