This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 818cc657 [ISSUE #348]fix WorkerDirectTask can not send record to sink (#349)
818cc657 is described below
commit 818cc65779b76959f4b94d493988a3724bfd7927
Author: Slideee <[email protected]>
AuthorDate: Mon Oct 17 09:58:25 2022 +0800
[ISSUE #348]fix WorkerDirectTask can not send record to sink (#349)
---
.../rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index c6d3ff1b..7bba2f97 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -173,7 +173,7 @@ public class WorkerDirectTask extends WorkerSourceTask {
updateCommittableOffsets();
try {
Collection<ConnectRecord> toSendEntries = sourceTask.poll();
- if (toSendEntries.isEmpty()) {
+ if (!toSendEntries.isEmpty()) {
sendRecord(toSendEntries);
}
} catch (Exception e) {