Repository: beam Updated Branches: refs/heads/master 1e49ee8f2 -> e0189f352
[BEAM-1395] Remove chunking. fixup! formatting. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71197ae6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71197ae6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71197ae6 Branch: refs/heads/master Commit: 71197ae61637d9f6317bd5abd7194d67044fee9d Parents: 1e49ee8 Author: Sela <[email protected]> Authored: Mon Feb 6 10:09:35 2017 +0200 Committer: Sela <[email protected]> Committed: Mon Feb 6 21:38:51 2017 +0200 ---------------------------------------------------------------------- .../translation/SparkGroupAlsoByWindowFn.java | 53 +++++++------------- 1 file changed, 18 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/71197ae6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 34eea65..9d84481 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.translation; -import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -86,7 +85,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow> public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call( WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>> windowedValue) throws Exception { K key = windowedValue.getValue().getKey(); - Iterable<WindowedValue<InputT>> inputs = windowedValue.getValue().getValue(); + Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue(); //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------// @@ -131,24 +130,8 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow> reduceFn, runtimeContext.getPipelineOptions()); - Iterable<List<WindowedValue<InputT>>> chunks = Iterables.partition(inputs, 1000); - for (Iterable<WindowedValue<InputT>> chunk : chunks) { - // Process the chunk of elements. - reduceFnRunner.processElements(chunk); - - // Then, since elements are sorted by their timestamp, advance the input watermark - // to the first element. - timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp()); - // Advance the processing times. - timerInternals.advanceProcessingTime(Instant.now()); - timerInternals.advanceSynchronizedProcessingTime(Instant.now()); - - // Fire all the eligible timers. - fireEligibleTimers(timerInternals, reduceFnRunner); - - // Leave the output watermark undefined. Since there's no late data in batch mode - // there's really no need to track it as we do for streaming. - } + // Process the grouped values. + reduceFnRunner.processElements(values); // Finish any pending windows by advancing the input watermark to infinity. timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -168,21 +151,21 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow> ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception { List<TimerInternals.TimerData> timers = new ArrayList<>(); while (true) { - TimerInternals.TimerData timer; - while ((timer = timerInternals.removeNextEventTimer()) != null) { - timers.add(timer); - } - while ((timer = timerInternals.removeNextProcessingTimer()) != null) { - timers.add(timer); - } - while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { - timers.add(timer); - } - if (timers.isEmpty()) { - break; - } - reduceFnRunner.onTimers(timers); - timers.clear(); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextEventTimer()) != null) { + timers.add(timer); + } + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + timers.add(timer); + } + while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { + timers.add(timer); + } + if (timers.isEmpty()) { + break; + } + reduceFnRunner.onTimers(timers); + timers.clear(); } }
