This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc8b380  Fail the source record if the write fails (#3706)
bc8b380 is described below

commit bc8b380e584a4d91a80c7afb72d5525bf845176f
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Fri Mar 1 22:22:11 2019 -0800

    Fail the source record if the write fails (#3706)
---
 .../apache/pulsar/functions/sink/PulsarSink.java   | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8c32608..3f0e1a9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -67,9 +67,9 @@ public class PulsarSink<T> implements Sink<T> {
 
     private interface PulsarSinkProcessor<T> {
 
-        TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
+        TypedMessageBuilder<T> newMessage(Record<T> record);
 
-        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) 
throws Exception;
+        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record);
 
         void close() throws Exception;
     }
@@ -136,11 +136,14 @@ public class PulsarSink<T> implements Sink<T> {
             }
         }
 
-        public Function<Throwable, Void> getPublishErrorHandler(Record<T> 
record) {
+        public Function<Throwable, Void> getPublishErrorHandler(Record<T> 
record, boolean failSource) {
 
             return throwable -> {
                 SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
                 Record<T> srcRecord = sinkRecord.getSourceRecord();
+                if (failSource) {
+                    srcRecord.fail();
+                }
 
                 String topic = 
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
 
@@ -177,10 +180,10 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
             msg.sendAsync().thenAccept(messageId -> {
                 //no op
-            }).exceptionally(getPublishErrorHandler(record));
+            }).exceptionally(getPublishErrorHandler(record, false));
         }
     }
 
@@ -191,10 +194,10 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
             msg.sendAsync()
                     .thenAccept(messageId -> record.ack())
-                    .exceptionally(getPublishErrorHandler(record));
+                    .exceptionally(getPublishErrorHandler(record, true));
         }
     }
 
@@ -206,7 +209,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(Record<T> record) throws 
Exception {
+        public TypedMessageBuilder<T> newMessage(Record<T> record) {
             if (!record.getPartitionId().isPresent()) {
                 throw new RuntimeException("PartitionId needs to be specified 
for every record while in Effectively-once mode");
             }
@@ -219,8 +222,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record)
-                throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
 
             if (!record.getRecordSequence().isPresent()) {
                 throw new RuntimeException("RecordSequence needs to be 
specified for every record while in Effectively-once mode");
@@ -230,7 +232,7 @@ public class PulsarSink<T> implements Sink<T> {
             msg.sequenceId(record.getRecordSequence().get());
             CompletableFuture<MessageId> future = msg.sendAsync();
 
-            future.thenAccept(messageId -> 
record.ack()).exceptionally(getPublishErrorHandler(record));
+            future.thenAccept(messageId -> 
record.ack()).exceptionally(getPublishErrorHandler(record, true));
             future.join();
         }
     }
@@ -268,7 +270,7 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     @Override
-    public void write(Record<T> record) throws Exception {
+    public void write(Record<T> record) {
         TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
         if (record.getKey().isPresent()) {
             msg.key(record.getKey().get());

Reply via email to