This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 818cc657 [ISSUE #348]fix WorkerDirectTask can not send record to sink (#349)
818cc657 is described below

commit 818cc65779b76959f4b94d493988a3724bfd7927
Author: Slideee <[email protected]>
AuthorDate: Mon Oct 17 09:58:25 2022 +0800

    [ISSUE #348]fix WorkerDirectTask can not send record to sink (#349)
---
 .../rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index c6d3ff1b..7bba2f97 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -173,7 +173,7 @@ public class WorkerDirectTask extends WorkerSourceTask {
             updateCommittableOffsets();
             try {
                 Collection<ConnectRecord> toSendEntries = sourceTask.poll();
-                if (toSendEntries.isEmpty()) {
+                if (!toSendEntries.isEmpty()) {
                     sendRecord(toSendEntries);
                 }
             } catch (Exception e) {

Reply via email to