This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 686be0b  SAMZA-2329: Chain watermark future to the result future in onMessageAsync
     new 96097a4  Merge pull request #1167 from mynameborat/watermark-fix
686be0b is described below

commit 686be0bdb56535985847a03b1c8b6347df89ed5a
Author: mynameborat <[email protected]>
AuthorDate: Mon Sep 23 15:39:24 2019 -0700

    SAMZA-2329: Chain watermark future to the result future in onMessageAsync
---
 .../org/apache/samza/operators/impl/OperatorImpl.java    | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 3d32be3..528acc6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -191,14 +191,14 @@ public abstract class OperatorImpl<M, RM> {
             .toArray(CompletableFuture[]::new));
       });
 
-    return result.thenAccept(x -> {
-        WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
-        if (watermarkFn != null) {
-          // check whether there is new watermark emitted from the user function
-          Long outputWm = watermarkFn.getOutputWatermark();
-          propagateWatermark(outputWm, collector, coordinator);
-        }
-      });
+    WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+    if (watermarkFn != null) {
+      // check whether there is new watermark emitted from the user function
+      Long outputWm = watermarkFn.getOutputWatermark();
+      return result.thenCompose(ignored -> propagateWatermark(outputWm, collector, coordinator));
+    }
+
+    return result;
   }
 
   /**

Reply via email to