This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c06bd3  SAMZA-2293: Propagate the watermark future to StreamOperatorTask correctly
4c06bd3 is described below

commit 4c06bd3a6b6ee1be2624270d5d1b015e83e0f6bf
Author: mynameborat <bharath.kumarasubraman...@gmail.com>
AuthorDate: Tue Aug 6 19:19:09 2019 -0700

    SAMZA-2293: Propagate the watermark future to StreamOperatorTask correctly
    
    Author: mynameborat <bharath.kumarasubraman...@gmail.com>
    
    Reviewers: xinyuiscool <xi...@linkedin.com>
    
    Closes #1129 from mynameborat/async-watermark-propagation-fix
---
 .../src/main/java/org/apache/samza/operators/impl/OperatorImpl.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 8d4ae21..3d32be3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -191,7 +191,7 @@ public abstract class OperatorImpl<M, RM> {
             .toArray(CompletableFuture[]::new));
       });
 
-    result.thenAccept(x -> {
+    return result.thenAccept(x -> {
         WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
         if (watermarkFn != null) {
           // check whether there is new watermark emitted from the user function
@@ -199,8 +199,6 @@ public abstract class OperatorImpl<M, RM> {
           propagateWatermark(outputWm, collector, coordinator);
         }
       });
-
-    return result;
   }
 
   /**
@@ -415,7 +413,7 @@ public abstract class OperatorImpl<M, RM> {
                 .toArray(CompletableFuture[]::new));
       }
 
-      watermarkFuture.thenCompose(res -> propagateWatermark(outputWm, collector, coordinator));
+      watermarkFuture = watermarkFuture.thenCompose(res -> propagateWatermark(outputWm, collector, coordinator));
     }
 
     return watermarkFuture;

Reply via email to