taegeonum commented on a change in pull request #151: [NEMO-267] Consider
watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231362660
##########
File path:
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
##########
@@ -99,76 +106,148 @@ public void test() {
final Instant ts6 = new Instant(1800);
final Instant ts7 = new Instant(1900);
final Watermark watermark3 = new Watermark(2100);
+ final Instant ts8 = new Instant(2200);
+ final Instant ts9 = new Instant(2300);
+ final Watermark watermark4 = new Watermark(3000);
+
+
+ List<IntervalWindow> sortedWindows = new
ArrayList<>(slidingWindows.assignWindows(ts1));
+ Collections.sort(sortedWindows, new Comparator<IntervalWindow>() {
+ @Override
+ public int compare(IntervalWindow o1, IntervalWindow o2) {
+ return o1.maxTimestamp().compareTo(o2.maxTimestamp());
+ }
+ });
+
+ // [0---1000)
+ final IntervalWindow window0 = sortedWindows.get(0);
+ // [0---2000)
+ final IntervalWindow window1 = sortedWindows.get(1);
+
+ sortedWindows.clear();
+ sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+ Collections.sort(sortedWindows, new Comparator<IntervalWindow>() {
+ @Override
+ public int compare(IntervalWindow o1, IntervalWindow o2) {
+ return o1.maxTimestamp().compareTo(o2.maxTimestamp());
+ }
+ });
+
+ // [1000--3000)
+ final IntervalWindow window2 = sortedWindows.get(1);
final Transform.Context context = mock(Transform.Context.class);
final TestOutputCollector<KV<String, Iterable<String>>> oc = new
TestOutputCollector();
doFnTransform.prepare(context, oc);
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1),
PaneInfo.NO_FIRING));
+ KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1),
PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2),
PaneInfo.NO_FIRING));
+ KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2),
PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3),
PaneInfo.NO_FIRING));
+ KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3),
PaneInfo.NO_FIRING));
doFnTransform.onWatermark(watermark);
+ // output
+ // 1: ["hello", "world"]
+ // 2: ["hello"]
Collections.sort(oc.outputs, (o1, o2) ->
o1.getValue().getKey().compareTo(o2.getValue().getKey()));
// windowed result for key 1
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)),
oc.outputs.get(0).getWindows());
+ assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
checkOutput(KV.of("1", Arrays.asList("hello", "world")),
oc.outputs.get(0).getValue());
// windowed result for key 2
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)),
oc.outputs.get(1).getWindows());
+ assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
checkOutput(KV.of("2", Arrays.asList("hello")),
oc.outputs.get(1).getValue());
+ assertEquals(2, oc.outputs.size());
+ assertEquals(1, oc.watermarks.size());
+
// check output watermark
- assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+ assertEquals(1000,
oc.watermarks.get(0).getTimestamp());
oc.outputs.clear();
oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
- KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4),
PaneInfo.NO_FIRING));
+ KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4),
PaneInfo.NO_FIRING));
// do not emit anything
doFnTransform.onWatermark(watermark2);
assertEquals(0, oc.outputs.size());
assertEquals(0, oc.watermarks.size());
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5),
PaneInfo.NO_FIRING));
+ KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5),
PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6),
PaneInfo.NO_FIRING));
+ KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6),
PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
- KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7),
PaneInfo.NO_FIRING));
+ KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7),
PaneInfo.NO_FIRING));
- // emit windowed value
+ // emit window1
doFnTransform.onWatermark(watermark3);
+ // output
+ // 1: ["hello", "world", "a"]
+ // 2: ["hello"]
+ // 3: ["a", "a", "b"]
Collections.sort(oc.outputs, (o1, o2) ->
o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
// windowed result for key 1
- assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)),
oc.outputs.get(0).getWindows());
- checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+ assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
Review comment:
I will add a TODO for this test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services