This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 686be0b SAMZA-2329: Chain watermark future to the result future in
onMessageAsync
new 96097a4 Merge pull request #1167 from mynameborat/watermark-fix
686be0b is described below
commit 686be0bdb56535985847a03b1c8b6347df89ed5a
Author: mynameborat <[email protected]>
AuthorDate: Mon Sep 23 15:39:24 2019 -0700
SAMZA-2329: Chain watermark future to the result future in onMessageAsync
---
.../org/apache/samza/operators/impl/OperatorImpl.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 3d32be3..528acc6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -191,14 +191,14 @@ public abstract class OperatorImpl<M, RM> {
.toArray(CompletableFuture[]::new));
});
- return result.thenAccept(x -> {
- WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
- if (watermarkFn != null) {
- // check whether there is new watermark emitted from the user
function
- Long outputWm = watermarkFn.getOutputWatermark();
- propagateWatermark(outputWm, collector, coordinator);
- }
- });
+ WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+ if (watermarkFn != null) {
+ // check whether there is new watermark emitted from the user function
+ Long outputWm = watermarkFn.getOutputWatermark();
+ return result.thenCompose(ignored -> propagateWatermark(outputWm,
collector, coordinator));
+ }
+
+ return result;
}
/**