This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 6bbb8d5  [Kinesis] Fix kinesis sink connector does not ack messages. (#10769)
6bbb8d5 is described below

commit 6bbb8d5c34d59a3a2cdd88e3e091de37b60d274a
Author: Zike Yang <[email protected]>
AuthorDate: Wed Jun 2 11:52:31 2021 +0800

    [Kinesis] Fix kinesis sink connector does not ack messages. (#10769)
    
    ### Motivation
    
    Currently, when the Kinesis sink connector sends a message successfully, it does not ack the message.
    
    ### Modifications
    
    * Ack each message after the sink connector sends it successfully.
    
    (cherry picked from commit b458b4afd84dd4188a4e6bda1ff94f208a82355e)
---
 .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index e3f5285..adc7c3c 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -238,6 +238,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
             }
             kinesisSink.previousPublishFailed = FALSE;
+            this.resultContext.ack();
             recycle();
         }
 

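The one-line change above can be read as an "ack on success" callback pattern. The following is a minimal, self-contained sketch of that pattern, not the connector's actual code: `AckableRecord`, `CountingRecord`, and `PublishCallback` are hypothetical stand-ins for the record held in `resultContext` and for the connector's publish callback. The failure branch is not part of this diff and is included only as an assumption about how a callback of this kind might behave.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AckOnSuccessSketch {

    /** Hypothetical stand-in for a sink record; only ack/fail are modeled. */
    interface AckableRecord {
        void ack();
        void fail();
    }

    /** Test double that counts acks and failures so the behavior is observable. */
    static final class CountingRecord implements AckableRecord {
        final AtomicInteger acks = new AtomicInteger();
        final AtomicInteger fails = new AtomicInteger();

        @Override
        public void ack() {
            acks.incrementAndGet();
        }

        @Override
        public void fail() {
            fails.incrementAndGet();
        }
    }

    /** Hypothetical publish callback that owns the record being sent. */
    static final class PublishCallback {
        private final AckableRecord record;

        PublishCallback(AckableRecord record) {
            this.record = record;
        }

        /** Mirrors the added line in the diff: ack once the send has succeeded. */
        void onSuccess() {
            record.ack();
        }

        /** Assumption, not shown in the diff: signal failure so the message can be redelivered. */
        void onFailure(Throwable cause) {
            record.fail();
        }
    }

    public static void main(String[] args) {
        CountingRecord delivered = new CountingRecord();
        new PublishCallback(delivered).onSuccess();

        CountingRecord rejected = new CountingRecord();
        new PublishCallback(rejected).onFailure(new RuntimeException("simulated send failure"));

        System.out.println("delivered: acks=" + delivered.acks.get() + " fails=" + delivered.fails.get());
        System.out.println("rejected:  acks=" + rejected.acks.get() + " fails=" + rejected.fails.get());
    }
}
```

Without the `ack()` call in the success path, the record in this sketch would never be acknowledged even though the send succeeded, which is the behavior the commit message describes.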