This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 598de5dbacd6e3c4c643f07bc64e0903447fd7b3 Author: jonnxu <[email protected]> AuthorDate: Fri Jul 26 21:30:56 2019 +0800 [ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342) --- .../org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java index d4b39e0..1f7ed00 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java @@ -122,6 +122,7 @@ public class KafkaSourceTask extends SourceTask { log.info("source task stop enter"); try { commitOffset(currentTPList, true); + consumer.wakeup(); // wakeup poll in other thread consumer.close(); } catch (Exception e) { log.warn("{} consumer {} close exception {}", this, consumer, e);
