This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 451a26e0 [ISSUE #364] Fix worker direct task can not pause (#365)
451a26e0 is described below
commit 451a26e03294be46d81ddad7250402c272ef98bf
Author: Slideee <[email protected]>
AuthorDate: Thu Nov 3 14:06:59 2022 +0800
[ISSUE #364] Fix worker direct task can not pause (#365)
---
.../connect/runtime/connectorwrapper/WorkerDirectTask.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 7bba2f97..59641fd2 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -171,6 +171,20 @@ public class WorkerDirectTask extends WorkerSourceTask {
protected void execute() {
while (isRunning()) {
updateCommittableOffsets();
+
+ if (shouldPause()) {
+ onPause();
+ try {
+ // wait unpause
+ if (awaitUnpause()) {
+ onResume();
+ }
+ continue;
+ } catch (InterruptedException e) {
+ // do exception
+ }
+ }
+
try {
Collection<ConnectRecord> toSendEntries = sourceTask.poll();
if (!toSendEntries.isEmpty()) {