[ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355796
 ]

ASF GitHub Bot logged work on BEAM-8550:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Dec/19 10:24
            Start Date: 08/Dec/19 10:24
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355174975
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##########
 @@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);
 
 Review comment:
   This internal detail should be stated more explicitly in the Javadoc for 
the annotation.
   
   What if I am processing a stream whose out-of-order spread can be quite 
large? Does it make sense to use this annotation in that case, or is it better 
avoided? Some recommendations or hints for users would be great.
   
   This makes me hesitant to use the annotation for stream processing (not 
that I have a use case for it right now) unless I can be sure about the timing 
of my data stream and how the watermark moves.
   I wonder whether it would make sense to describe the tradeoffs between 
latency, buffer size, and lateness, and to introduce a way to hold the 
watermark back by some delta smaller than the max allowed lateness.
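
To make the tradeoff concrete, here is a minimal, Beam-free sketch of the buffering behaviour under discussion. It is not the code in this PR: the class `SortBufferSketch` and the `holdBackMillis` parameter are hypothetical names, and plain `long` timestamps stand in for `Instant`. With `holdBackMillis = 0` it mirrors the check in the diff (elements strictly behind the watermark are dropped). A positive delta models the suggested hold-back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of a time-sorting buffer driven by a watermark. */
final class SortBufferSketch {
  // Assumption: how far behind the watermark an element may still be accepted.
  private final long holdBackMillis;
  // Min-heap on timestamp, so polling yields elements in time order.
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;
  private int maxBuffered = 0;

  SortBufferSketch(long holdBackMillis) {
    this.holdBackMillis = holdBackMillis;
  }

  /** Buffers the element, or drops it if it is behind the held-back watermark. */
  void add(long timestamp) {
    if (timestamp < effectiveWatermark()) {
      dropped++;
      return;
    }
    buffer.add(timestamp);
    maxBuffered = Math.max(maxBuffered, buffer.size());
  }

  /** Advances the watermark and returns, in time order, the elements now safe to emit. */
  List<Long> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<Long> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() < effectiveWatermark()) {
      out.add(buffer.poll());
    }
    return out;
  }

  int dropped() {
    return dropped;
  }

  int maxBuffered() {
    return maxBuffered;
  }

  // The watermark shifted back by the delta; guarded against underflow before the first advance.
  private long effectiveWatermark() {
    return watermark == Long.MIN_VALUE ? Long.MIN_VALUE : watermark - holdBackMillis;
  }
}
```

In this model, an element with timestamp 15 arriving after the watermark reached 20 is dropped when `holdBackMillis` is 0, but accepted and emitted in order when it is 10. The cost is that every element stays buffered for an extra 10 ms of watermark progress, so both latency and peak buffer size grow with the delta.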
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 355796)
    Time Spent: 3.5h  (was: 3h 20m)

> @RequiresTimeSortedInput DoFn annotation
> ----------------------------------------
>
>                 Key: BEAM-8550
>                 URL: https://issues.apache.org/jira/browse/BEAM-8550
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
