[ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356919&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356919
 ]

ASF GitHub Bot logged work on BEAM-8550:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Dec/19 09:22
            Start Date: 10/Dec/19 09:22
    Worklog Time Spent: 10m 
      Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355922803
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##########
 @@ -2363,6 +2367,136 @@ public int hashCode() {
         return getClass().hashCode();
       }
     }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testRequiresTimeSortedInput() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithTestStream() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(stamp);
+      }
+      testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithLateData() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+        if (stamp == 100) {
+          // advance watermark when we have 100 remaining elements
+          // all the rest are going to be late elements
+          input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+        }
+      }
+      testTimeSortedInput(
+          numElements - 100,
+          numElements - 1,
+          pipeline.apply(input.advanceWatermarkToInfinity()),
+          // Only direct runner is guaranteed to correctly drop elements after watermark coming
+          // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+          // class to avoid cyclic dependencies.
+          pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   I'm afraid that code reuse between the two test modules would not be 
straightforward and would introduce much more counterintuitive code.
   The only option would be to split the test into two parts:
    a) validate that the late-data dropping logic is correct (that is, 
test the annotation itself); this would have to be placed outside the 
ValidatesRunner suites
    b) validate that runners obey the annotation (ValidatesRunner)
   There would probably be some code duplication, but it might be worth it. 
I will look into that.
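
   As a rough illustration of part a), the sorting and late-data dropping 
logic could be modelled without any runner. The sketch below is plain Java 
and is not Beam API: the class TimeSortedBuffer and its methods are 
hypothetical names, and it assumes that an element counts as late when its 
timestamp is strictly less than the current watermark (allowed lateness of 
zero, as in the issue description).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, runner-independent model of time-sorted input; not Beam API. */
final class TimeSortedBuffer {
  // Min-heap of buffered element timestamps (the timestamp doubles as the value here).
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers a timestamp, or drops it if it is behind the watermark (assumed lateness rule). */
  void add(long timestamp) {
    if (timestamp < watermark) {
      dropped++;
      return;
    }
    buffer.add(timestamp);
  }

  /** Advances the watermark and returns buffered timestamps before it, in ascending order. */
  List<Long> advanceWatermarkTo(long newWatermark) {
    // The watermark never moves backwards.
    watermark = Math.max(watermark, newWatermark);
    List<Long> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() < watermark) {
      out.add(buffer.poll());
    }
    return out;
  }

  /** Number of elements dropped as late so far. */
  int droppedCount() {
    return dropped;
  }
}
```

   Feeding this model the same input as the test above (stamps 1000 down to 
1, watermark advanced to 100 right after stamp 100, then to 
Long.MAX_VALUE) yields the 901 stamps 100..1000 in ascending order and 
drops the 99 stamps below 100. A real test of the annotation would of course 
have to exercise the actual implementation rather than a model like this.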
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 356919)
    Time Spent: 6h 20m  (was: 6h 10m)

> @RequiresTimeSortedInput DoFn annotation
> ----------------------------------------
>
>                 Key: BEAM-8550
>                 URL: https://issues.apache.org/jira/browse/BEAM-8550
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: Major
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
