[ 
https://issues.apache.org/jira/browse/BEAM-12459?focusedWorklogId=608691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-608691
 ]

ASF GitHub Bot logged work on BEAM-12459:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/21 19:32
            Start Date: 08/Jun/21 19:32
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #14968:
URL: https://github.com/apache/beam/pull/14968#discussion_r647736613



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
##########
@@ -955,11 +961,15 @@ public ProcessContinuation process(
         return stop();
       }
 
-      if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(newResults.getWatermark())) 
{
+      if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(computedWatermark)) {
         LOG.info("{} - will stop polling, reached max timestamp.", 
c.element());
         return stop();
       }
 
+      if (computedWatermark != null) {

Review comment:
       We want to set the `watermark` for `watermarkEstimator` even when 
`stop()` is returned, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 608691)
    Time Spent: 0.5h  (was: 20m)

> Watch does not properly advance the watermark by default
> --------------------------------------------------------
>
>                 Key: BEAM-12459
>                 URL: https://issues.apache.org/jira/browse/BEAM-12459
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Daniel Collins
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Assigning to Luke who has made substantial changes to this class most 
> recently.
>  
> It appears after investigation that when using Watch in the default 
> configuration, the global watermark is not advanced properly, even though 
> Watch documentation claims it should be 
> ([https://github.com/apache/beam/blob/8922c1cf23c093262af9e4570d69947a9a749506/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L118]).
>  The below is example code using Watch that will not advance the watermark, 
> but should:
> ```
>  Watch.growthOf(
>  new PollFn<Integer, Integer>() {
>      @Override
>      public PollResult<Partition> apply(TopicPath element, Context c)
> {         return PollResult.incomplete(Instant.now(), List.of(0));     }
> })
>  .withPollInterval(...)
>  .withTerminationPerInput(Watch.Growth.never());
>  ```
> I've been advised that changing the return statement to `return 
> PollResult.incomplete(Instant.now(), 
> List.of(0)).withWatermark(Instant.now());` will resolve this issue, but the 
> `withWatermark` function is commented as "By default, the watermark for a 
> particular input is computed from a poll result as "earliest timestamp of new 
> elements in this poll result". It can also be set explicitly via \{@link 
> Growth.PollResult#withWatermark} if the \{@link Growth.PollFn} can provide a 
> more optimistic estimate.". The goal is not to provide a more optimistic 
> estimate, but to allow any advancement at all. If withWatermark is needed to 
> close windows, this function should be required (or at least more prominent 
> in all example code).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to