Support OutputTimeFn in the Gearpump runner's GroupByKey and Window.Bound translators
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6aaf0d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6aaf0d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6aaf0d9 Branch: refs/heads/gearpump-runner Commit: f6aaf0d9ecd6b67ad6f7eed413af3fae3b3bdf6f Parents: 3bf8263 Author: manuzhang <[email protected]> Authored: Sat Jan 14 21:41:40 2017 +0800 Committer: manuzhang <[email protected]> Committed: Sat Jan 14 21:41:40 2017 +0800 ---------------------------------------------------------------------- .../translators/GroupByKeyTranslator.java | 57 +++++++++++++++++--- .../translators/WindowBoundTranslator.java | 20 ++++--- 2 files changed, 64 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 4eaf755..e16a178 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -58,12 +59,16 @@ public class GroupByKeyTranslator<K, V> 
implements TransformTranslator<GroupByKe JavaStream<WindowedValue<KV<K, V>>> inputStream = context.getInputStream(input); int parallelism = context.getPipelineOptions().getParallelism(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) + input.getWindowingStrategy().getOutputTimeFn(); JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window") .map(new ValueToIterable<K, V>(), "map_value_to_iterable") - .reduce(new MergeValue<K, V>(), "merge_value"); + .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp") + .reduce(new Merge<K, V>(outputTimeFn), "merge") + .map(new Values<K, V>(), "values"); context.setOutputStream(context.getOutput(transform), outputStream); } @@ -141,15 +146,53 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } } - private static class MergeValue<K, V> extends - ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> { + private static class KeyedByTimestamp<K, V> + extends MapFunction<WindowedValue<KV<K, Iterable<V>>>, + KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> { @Override - public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1, - WindowedValue<KV<K, Iterable<V>>> wv2) { - return WindowedValue.of(KV.of(wv1.getValue().getKey(), + public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply( + WindowedValue<KV<K, Iterable<V>>> wv) { + return KV.of(wv.getTimestamp(), wv); + } + } + + private static class Merge<K, V> extends + ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> { + + private final OutputTimeFn<? super BoundedWindow> outputTimeFn; + + Merge(OutputTimeFn<? 
super BoundedWindow> outputTimeFn) { + this.outputTimeFn = outputTimeFn; + } + + @Override + public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply( + KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1, + KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) { + org.joda.time.Instant t1 = kv1.getKey(); + org.joda.time.Instant t2 = kv2.getKey(); + + WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue(); + WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue(); + + return KV.of(outputTimeFn.combine(t1, t2), + WindowedValue.of(KV.of(wv1.getValue().getKey(), Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())), - wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()); + wv1.getTimestamp(), wv1.getWindows(), wv1.getPane())); + } + } + + private static class Values<K, V> extends + MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>, + WindowedValue<KV<K, Iterable<V>>>> { + + @Override + public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant, + WindowedValue<KV<K, Iterable<V>>>> kv) { + org.joda.time.Instant timestamp = kv.getKey(); + WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue(); + return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java index d3c50a5..9bf1936 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java @@ -26,6 
+26,7 @@ import java.util.LinkedList; import java.util.List; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; @@ -53,9 +54,11 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou transform.getOutputStrategyInternal(input.getWindowingStrategy()); WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) + outputStrategy.getOutputTimeFn(); JavaStream<WindowedValue<T>> outputStream = inputStream - .flatMap(new AssignWindows(windowFn), "assign_windows") + .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows") .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp"); context.setOutputStream(context.getOutput(transform), outputStream); @@ -64,17 +67,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou private static class AssignWindows<T> extends FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { - private final WindowFn<T, BoundedWindow> fn; + private final WindowFn<T, BoundedWindow> windowFn; + private final OutputTimeFn<? super BoundedWindow> outputTimeFn; - AssignWindows(WindowFn<T, BoundedWindow> fn) { - this.fn = fn; + AssignWindows( + WindowFn<T, BoundedWindow> windowFn, + OutputTimeFn<? 
super BoundedWindow> outputTimeFn) { + this.windowFn = windowFn; + this.outputTimeFn = outputTimeFn; } @Override public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) { List<WindowedValue<T>> ret = new LinkedList<>(); try { - Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() { + Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() { @Override public T element() { return value.getValue(); @@ -91,8 +98,9 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou } }); for (BoundedWindow window: windows) { + Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window); ret.add(WindowedValue.of( - value.getValue(), value.getTimestamp(), window, value.getPane())); + value.getValue(), timestamp, window, value.getPane())); } } catch (Exception e) { throw new RuntimeException(e);
