[
https://issues.apache.org/jira/browse/BEAM-4643?focusedWorklogId=152951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152951
]
ASF GitHub Bot logged work on BEAM-4643:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Oct/18 22:46
Start Date: 09/Oct/18 22:46
Worklog Time Spent: 10m
Work Description: akedin closed pull request #5811: [BEAM-4643] Allow to
check early panes of a window
URL: https://github.com/apache/beam/pull/5811
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index cfcb16291ac..cedc594fd49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -250,6 +250,15 @@ public int hashCode() {
*/
IterableAssert<T> inOnTimePane(BoundedWindow window);
+ /**
+ * Creates a new {@link IterableAssert} like this one, but with the
assertion restricted to only
+ * run on the provided window across all panes that were produced by the
arrival of early data.
+ *
+ * @return a new {@link IterableAssert} like this one but with the
assertion only applied to the
+ * specified window.
+ */
+ IterableAssert<T> inEarlyPane(BoundedWindow window);
+
/**
* Creates a new {@link IterableAssert} like this one, but with the
assertion restricted to only
* run on the provided window across all panes that were not produced by
the arrival of late
@@ -336,6 +345,15 @@ public int hashCode() {
*/
SingletonAssert<T> inOnTimePane(BoundedWindow window);
+ /**
+ * Creates a new {@link SingletonAssert} like this one, but with the
assertion restricted to
+ * only run on the provided window, running the checker only on early
panes for each key.
+ *
+ * @return a new {@link SingletonAssert} like this one but with the
assertion only applied to
+ * the specified window.
+ */
+ SingletonAssert<T> inEarlyPane(BoundedWindow window);
+
/**
* Asserts that the value in question is equal to the provided value,
according to {@link
* Object#equals}.
@@ -513,6 +531,11 @@ public PCollectionContentsAssert(
return withPane(window, PaneExtractors.onTimePane());
}
+ @Override
+ public PCollectionContentsAssert<T> inEarlyPane(BoundedWindow window) {
+ return withPane(window, PaneExtractors.earlyPanes());
+ }
+
@Override
public PCollectionContentsAssert<T> inCombinedNonLatePanes(BoundedWindow
window) {
return withPane(window, PaneExtractors.nonLatePanes());
@@ -700,6 +723,11 @@ public PCollectionSingletonIterableAssert(
return withPanes(window, PaneExtractors.onTimePane());
}
+ @Override
+ public PCollectionSingletonIterableAssert<T> inEarlyPane(BoundedWindow
window) {
+ return withPanes(window, PaneExtractors.earlyPanes());
+ }
+
@Override
public PCollectionSingletonIterableAssert<T>
inCombinedNonLatePanes(BoundedWindow window) {
return withPanes(window, PaneExtractors.nonLatePanes());
@@ -805,6 +833,11 @@ private PCollectionViewAssert(
return inPane(window, PaneExtractors.onTimePane());
}
+ @Override
+ public PCollectionViewAssert<ElemT, ViewT> inEarlyPane(BoundedWindow
window) {
+ return inPane(window, PaneExtractors.earlyPanes());
+ }
+
private PCollectionViewAssert<ElemT, ViewT> inPane(
BoundedWindow window,
SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>>
paneExtractor) {
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 2aab957d902..a448694d3d8 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -20,10 +20,12 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,6 +34,7 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -46,8 +49,10 @@
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -328,4 +333,56 @@ public void
testAdvanceWatermarkEqualToPositiveInfinityThrows() {
thrown.expect(IllegalArgumentException.class);
stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
+
+ @Test
+ @Category({NeedsRunner.class, UsesTestStream.class})
+ public void testEarlyPanesOfWindow() {
+ TestStream<Long> source =
+ TestStream.create(VarLongCoder.of())
+ .addElements(TimestampedValue.of(1L, new Instant(1000L)))
+ .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early
pane
+ .addElements(TimestampedValue.of(2L, new Instant(2000L)))
+ .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early
pane
+ .addElements(TimestampedValue.of(3L, new Instant(3000L)))
+ .advanceProcessingTime(Duration.standardMinutes(6)) // Fire early
pane
+ .advanceWatermarkToInfinity(); // Fire on-time pane
+
+ PCollection<KV<String, Long>> sum =
+ p.apply(source)
+ .apply(
+
Window.<Long>into(FixedWindows.of(Duration.standardMinutes(30)))
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(5))))
+ .accumulatingFiredPanes()
+ .withAllowedLateness(Duration.ZERO))
+ .apply(
+ MapElements.into(
+ TypeDescriptors.kvs(TypeDescriptors.strings(),
TypeDescriptors.longs()))
+ .via(v -> KV.of("key", v)))
+ .apply(Sum.longsPerKey());
+
+ IntervalWindow window =
+ new IntervalWindow(new Instant(0L), new
Instant(0L).plus(Duration.standardMinutes(30)));
+
+ PAssert.that(sum)
+ .inEarlyPane(window)
+ .satisfies(
+ input -> {
+ assertThat(StreamSupport.stream(input.spliterator(),
false).count(), is(3L));
+ return null;
+ })
+ .containsInAnyOrder(KV.of("key", 1L), KV.of("key", 3L), KV.of("key",
6L))
+ .inOnTimePane(window)
+ .satisfies(
+ input -> {
+ assertThat(StreamSupport.stream(input.spliterator(),
false).count(), is(1L));
+ return null;
+ })
+ .containsInAnyOrder(KV.of("key", 6L));
+
+ p.run().waitUntilFinish();
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 152951)
Time Spent: 2h 10m (was: 2h)
Remaining Estimate: 21h 50m (was: 22h)
> Allow to check early panes of a window
> --------------------------------------
>
> Key: BEAM-4643
> URL: https://issues.apache.org/jira/browse/BEAM-4643
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, testing
> Affects Versions: 2.5.0
> Reporter: Logan HAUSPIE
> Assignee: Logan HAUSPIE
> Priority: Minor
> Original Estimate: 24h
> Time Spent: 2h 10m
> Remaining Estimate: 21h 50m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{ .inEarlyPanes(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) //
> Window fired early twice: (black, 1) + (black, 1)}}
> {{ .inOnTimePane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by
> reaching the watermark (no additional data)}}
> {{ .inFinalPane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 10))}}{{; // And then fired by
> receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20
> minutes
>
> The workaround I found is to filter the PCollection to keep only the EARLY
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values,
> PaneInfo.Timing timing) {}}
> {{ PCollection<T> filtered = values}}
> {{ .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{ ParDo.of(}}
> {{ new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{ @ProcessElement}}
> {{ public void processElement(ProcessContext c,
> BoundedWindow window) {}}
> {{
> c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(),
> window, c.pane()), c.timestamp());}}
> {{ }}}
> {{ }}}
> {{ )}}
> {{ )}}
> {{ .setCoder(}}
> {{ ValueInSingleWindow.Coder.of(}}
> {{ values.getCoder(),
> values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{ )}}
> {{ )}}
> {{ .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{ .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{ ParDo.of(}}
> {{ new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{ @ProcessElement}}
> {{ public void processElement(ProcessContext c, BoundedWindow
> window) {}}
> {{ c.outputWithTimestamp(c.element().getValue(),
> c.timestamp());}}
> {{ }}}
> {{ }}}
> {{ ));}}
> {{ return filtered;}}
> {{ }}}
>
> And then check all panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{ .inWindow(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{ .inOnTimePane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 2))}}
> {{ .inFinalPane(intervalWindow(05, 20))}}
> {{ .containsInAnyOrder(KV.of("black", 10))}}{{;}}
>
> But it's a bit overkill.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)