[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380419&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380419 ]

ASF GitHub Bot logged work on BEAM-2535:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Feb/20 03:33
            Start Date: 02/Feb/20 03:33
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers.
URL: https://github.com/apache/beam/pull/10627#discussion_r373816566
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 ##########
 @@ -1092,24 +1107,39 @@ private void verifyAbsoluteTimeDomain() {
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-      // Output timestamp is currently not supported in processing time timers.
-      if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        throw new IllegalStateException("Cannot set outputTimestamp in processing time domain.");
-      }
+
       // Output timestamp is set to the delivery time if not initialized by an user.
-      if (outputTimestamp == null) {
+      if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         outputTimestamp = target;
       }
-
-      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
-        checkArgument(
-            !target.isAfter(windowExpiry),
-            "Attempted to set event time timer that outputs for %s but that is"
-                + " after the expiration of window %s",
-            target,
-            windowExpiry);
+      // For processing timers
+      if (outputTimestamp == null) {
+        // For processing timers output timestamp will be:
+        // 1) timestamp of input element
+        // OR
+        // 2) output timestamp of firing timer.
+        outputTimestamp = elementInputTimestamp;
       }
+
+      checkArgument(
+          !outputTimestamp.isBefore(elementInputTimestamp),
+          "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s",
+          outputTimestamp,
+          elementInputTimestamp);
 
 Review comment:
   check this before setting outputTimestamp above (and only if outputTimestamp != null)
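The reviewer's suggestion can be sketched as: validate an explicitly supplied output timestamp first, and only when one was supplied, then fill in the defaults from the diff. As written, the diff runs `checkArgument` after the defaults are applied, so it also constrains the defaulted event-time `target`; the suggested ordering limits the check to values the user set. This is a simplified, hypothetical model, not the code that was merged into `SimpleDoFnRunner`: the class and method names are made up, `java.time.Instant` stands in for the Joda `Instant` that Beam uses, and a plain `IllegalArgumentException` stands in for Guava's `checkArgument`.

```java
import java.time.Instant;

/**
 * Hypothetical, simplified model of setAndVerifyOutputTimestamp() with the
 * review suggestion applied. Not Beam's actual implementation.
 */
public class OutputTimestampCheck {

  public enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

  /**
   * Returns the output timestamp for a timer.
   *
   * @param userOutputTimestamp explicit value from the user, or null if none was set
   * @param domain time domain of the timer
   * @param target firing time of the timer
   * @param elementInputTimestamp timestamp of the input element, or the output
   *     timestamp of the firing timer when set from a timer callback
   */
  public static Instant resolveOutputTimestamp(
      Instant userOutputTimestamp,
      TimeDomain domain,
      Instant target,
      Instant elementInputTimestamp) {
    if (userOutputTimestamp != null) {
      // Validate only values the user supplied, before any default is filled in.
      if (userOutputTimestamp.isBefore(elementInputTimestamp)) {
        throw new IllegalArgumentException(
            String.format(
                "output timestamp %s must not be before the input element timestamp"
                    + " or the output timestamp of the firing timer %s",
                userOutputTimestamp, elementInputTimestamp));
      }
      return userOutputTimestamp;
    }
    // Defaults from the diff: event-time timers output at their firing time,
    // processing-time timers at the input element (or firing timer) timestamp.
    return domain == TimeDomain.EVENT_TIME ? target : elementInputTimestamp;
  }

  public static void main(String[] args) {
    Instant input = Instant.ofEpochMilli(1_000);
    Instant target = Instant.ofEpochMilli(5_000);
    // Processing-time timer without an explicit output timestamp.
    System.out.println(
        resolveOutputTimestamp(null, TimeDomain.PROCESSING_TIME, target, input));
    // prints 1970-01-01T00:00:01Z
  }
}
```

With this ordering, a defaulted event-time timer is never rejected by the input-timestamp check, while an explicit output timestamp earlier than the element (or firing timer) timestamp still fails fast.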
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 380419)
    Time Spent: 21h  (was: 20h 50m)

> Allow explicit output time independent of firing specification for all timers
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-2535
>                 URL: https://issues.apache.org/jira/browse/BEAM-2535
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core
>            Reporter: Kenneth Knowles
>            Assignee: Shehzaad Nakhoda
>            Priority: Major
>          Time Spent: 21h
>  Remaining Estimate: 0h
>
> Today, we have insufficient control over the event time timestamp of elements 
> output from a timer callback.
> 1. For an event time timer, it is the timestamp of the timer itself.
> 2. For a processing time timer, it is the current input watermark at the 
> time of processing.
> But for both of these, we may want to reserve the right to output a 
> particular time, aka set a "watermark hold".
> A naive implementation of a {{TimerWithWatermarkHold}} would work for making 
> sure output is not droppable, but does not fully explain window expiration 
> and late data/timer dropping.
> In the natural interpretation of a timer as a feedback loop on a transform, 
> timers should be viewed as another channel of input, with a watermark, and 
> items on that channel _all need event time timestamps even if they are 
> delivered according to a different time domain_.
> I propose that the specification for when a timer should fire should be 
> separated (with nice defaults) from the specification of the event time of 
> resulting outputs. These timestamps will determine a side channel with a new 
> "timer watermark" that constrains the output watermark.
>  - We still need to fire event time timers according to the input watermark, 
> so that event time timers fire.
>  - Late data dropping and window expiration will be in terms of the minimum 
> of the input watermark and the timer watermark. In this way, whenever a timer 
> is set, the window is not going to be garbage collected.
>  - We will need to make sure we have a way to "wake up" a window once it is 
> expired; this may be as simple as exhausting the timer channel as soon as the 
> input watermark indicates expiration of a window.
> This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It 
> seems reasonable to use timers as an implementation detail (e.g. in 
> runners-core utilities) without wanting any of this additional machinery, for 
> example when there is no possibility of output from the timer callback.
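The proposal above can be pictured as a small bookkeeping model: each pending timer holds an output timestamp that is separate from when it fires, the minimum of those timestamps forms the "timer watermark", and both the output watermark bound and window expiration use the minimum of the input watermark and the timer watermark. This is a hypothetical sketch under stated assumptions, not Beam's implementation: the class, its methods, and the string timer ids are made up; `java.time.Instant` stands in for Joda time; `Instant.MAX` stands in for "no hold"; and the expiry rule reuses the `maxTimestamp().plus(allowedLateness)` expression from the diff.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the BEAM-2535 proposal: pending timers hold back a
 * "timer watermark" via their output timestamps. Not Beam's implementation.
 */
public class TimerWatermarkSketch {
  // Output timestamp of each pending timer, keyed by a hypothetical timer id.
  // The firing time is tracked elsewhere and deliberately plays no part in the hold.
  private final Map<String, Instant> pendingOutputTimestamps = new HashMap<>();
  private Instant inputWatermark = Instant.MIN;

  public void setTimer(String timerId, Instant outputTimestamp) {
    pendingOutputTimestamps.put(timerId, outputTimestamp);
  }

  /** Called when a timer fires or is deleted; its hold is released. */
  public void clearTimer(String timerId) {
    pendingOutputTimestamps.remove(timerId);
  }

  public void advanceInputWatermark(Instant watermark) {
    // Watermarks only move forward.
    if (watermark.isAfter(inputWatermark)) {
      inputWatermark = watermark;
    }
  }

  /** Minimum output timestamp over pending timers; Instant.MAX when none are pending. */
  public Instant timerWatermark() {
    Instant min = Instant.MAX;
    for (Instant t : pendingOutputTimestamps.values()) {
      if (t.isBefore(min)) {
        min = t;
      }
    }
    return min;
  }

  /** min(input watermark, timer watermark): bounds the output watermark. */
  public Instant combinedWatermark() {
    Instant timerWm = timerWatermark();
    return timerWm.isBefore(inputWatermark) ? timerWm : inputWatermark;
  }

  /** A window expires only once the combined watermark passes maxTimestamp + allowedLateness. */
  public boolean isExpired(Instant windowMaxTimestamp, Duration allowedLateness) {
    return combinedWatermark().isAfter(windowMaxTimestamp.plus(allowedLateness));
  }

  public static void main(String[] args) {
    TimerWatermarkSketch s = new TimerWatermarkSketch();
    Instant windowMax = Instant.ofEpochSecond(10);
    s.setTimer("flush", Instant.ofEpochSecond(8)); // output timestamp inside the window
    s.advanceInputWatermark(Instant.ofEpochSecond(20));
    System.out.println(s.isExpired(windowMax, Duration.ZERO)); // false: the timer holds the window
    s.clearTimer("flush");
    System.out.println(s.isExpired(windowMax, Duration.ZERO)); // true
  }
}
```

In this model, as long as a timer with an output timestamp inside the window is pending, the combined watermark cannot pass the window's expiry, which matches the point that "whenever a timer is set, the window is not going to be garbage collected."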



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
