This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7 Author: 彭小漪 <[email protected]> AuthorDate: Thu Dec 2 16:07:30 2021 +0800 [rocketmq-connect-kafka]: Completion method KafkaSourceTask#pause(), KafkaSourceTask#resume(). (#854) [rocketmq-connect-kafka]: Normalized code style --- .../connect/kafka/connector/KafkaSourceConnector.java | 2 +- .../rocketmq/connect/kafka/connector/KafkaSourceTask.java | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java index 567a8e9..680df6e 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java @@ -42,7 +42,7 @@ public class KafkaSourceConnector extends SourceConnector { log.info("KafkaSourceConnector verifyAndSetConfig enter"); for (String key : config.keySet()) { - log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key)); + log.info("connector verifyAndSetConfig: key: {}, value: {}", key, config.getString(key)); } for (String requestKey : ConfigDefine.REQUEST_CONFIG) { diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java index 6122b0e..f077ac0 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java @@ -132,11 +132,13 @@ public class KafkaSourceTask extends SourceTask { @Override public void pause() { log.info("source task pause ..."); + consumer.pause(currentTPList); } @Override public void resume() { log.info("source task resume ..."); + consumer.resume(currentTPList); } public String toString() { @@ -179,7 +181,7 @@ public class KafkaSourceTask extends SourceTask { log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList); List<ByteBuffer> topic_partition_list = new ArrayList<>(); for (TopicPartition tp : tpList) { - topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes())); + topic_partition_list.add(ByteBuffer.wrap((tp.topic() + "-" + tp.partition()).getBytes())); } Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>(); @@ -198,7 +200,7 @@ public class KafkaSourceTask extends SourceTask { } commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> - log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue())); + log.info("commitOffset {}, TopicPartition: {} offset: {}", KafkaSourceTask.this, entry.getKey(), entry.getValue())); if (!commitOffsets.isEmpty()) { if (isClose) { consumer.commitSync(commitOffsets); @@ -213,9 +215,9 @@ public class KafkaSourceTask extends SourceTask { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if (e != null) { - log.warn("commit async excepiton {}", e); + log.warn("commit async excepiton", e); map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> { - log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset()); + log.warn("commit exception, TopicPartition: {} offset: {}", entry.getKey().toString(), entry.getValue().offset()); }); return; } @@ -229,9 +231,9 @@ public class KafkaSourceTask extends SourceTask { currentTPList.clear(); for (TopicPartition tp : partitions) { + log.info("onPartitionsAssigned TopicPartition {}", tp); currentTPList.add(tp); } - currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned TopicPartition {}", tp)); } @Override
