[flink] improve readability of processElement function
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63a7c3d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63a7c3d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63a7c3d0 Branch: refs/heads/master Commit: 63a7c3d0cb51caf65dc82141671cf28d47c2be39 Parents: 033b924 Author: Maximilian Michels <[email protected]> Authored: Wed Mar 30 12:02:01 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Mar 30 12:05:04 2016 +0200 ---------------------------------------------------------------------- .../streaming/FlinkGroupAlsoByWindowWrapper.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63a7c3d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 751d44c..3dc5a79 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -258,10 +258,18 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> @Override public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { - ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); - elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), - element.getValue().getWindows(), element.getValue().getPane())); - processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); + final WindowedValue<KV<K, VIN>> windowedValue = element.getValue(); + final KV<K, VIN> kv = windowedValue.getValue(); + + final WindowedValue<VIN> updatedWindowedValue = WindowedValue.of(kv.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPane()); + + processKeyedWorkItem( + KeyedWorkItems.elementsWorkItem( + kv.getKey(), + Collections.singletonList(updatedWindowedValue))); } @Override
