Repository: incubator-beam Updated Branches: refs/heads/master f20bf8afd -> 135cb733f
Materialize PCollection/RDD as windowed values with the appropriate windows. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ca3b30d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ca3b30d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ca3b30d Branch: refs/heads/master Commit: 1ca3b30dd5679e75ce9e35dc08cc0012fb899186 Parents: d852c5b Author: Sela <[email protected]> Authored: Thu Apr 14 22:05:14 2016 +0300 Committer: Sela <[email protected]> Committed: Tue Apr 19 22:06:15 2016 +0300 ---------------------------------------------------------------------- .../spark/translation/EvaluationContext.java | 62 ++++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ca3b30d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 78a62aa..531a6ce 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -37,6 +37,9 @@ import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -76,12 +79,13 @@ public class EvaluationContext implements EvaluationResult { */ private class RDDHolder<T> { - private Iterable<T> values; + private Iterable<WindowedValue<T>> windowedValues; private Coder<T> coder; private JavaRDDLike<WindowedValue<T>, ?> rdd; RDDHolder(Iterable<T> values, Coder<T> coder) { - this.values = values; + this.windowedValues = + Iterables.transform(values, WindowingHelpers.<T>windowValueFunction()); this.coder = coder; } @@ -91,14 +95,6 @@ public class EvaluationContext implements EvaluationResult { JavaRDDLike<WindowedValue<T>, ?> getRDD() { if (rdd == null) { - Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values, - new Function<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(T t) { - // TODO: this is wrong if T is a TimestampedValue - return WindowedValue.valueInEmptyWindows(t); - } - }); WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = WindowedValue.getValueOnlyCoder(coder); rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) @@ -107,29 +103,31 @@ public class EvaluationContext implements EvaluationResult { return rdd; } - Iterable<T> getValues(PCollection<T> pcollection) { - if (values == null) { - coder = pcollection.getCoder(); - JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction()) - .map(CoderHelpers.toByteFunction(coder)); + Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) { + if (windowedValues == null) { + WindowFn<?, ?> windowFn = + pcollection.getWindowingStrategy().getWindowFn(); + Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder(); + final WindowedValue.WindowedValueCoder<T> windowedValueCoder; + if (windowFn instanceof GlobalWindows) { + windowedValueCoder = + WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder()); + } else { + windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder); + } + JavaRDDLike<byte[], ?> bytesRDD = + rdd.map(CoderHelpers.toByteFunction(windowedValueCoder)); List<byte[]> clientBytes = bytesRDD.collect(); - values = Iterables.transform(clientBytes, new Function<byte[], T>() { + windowedValues = Iterables.transform(clientBytes, + new Function<byte[], WindowedValue<T>>() { @Override - public T apply(byte[] bytes) { - return CoderHelpers.fromByteArray(bytes, coder); + public WindowedValue<T> apply(byte[] bytes) { + return CoderHelpers.fromByteArray(bytes, windowedValueCoder); } }); } - return values; - } - - Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { - return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(T t) { - return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place? - } - }); + return windowedValues; } } @@ -264,15 +262,15 @@ public class EvaluationContext implements EvaluationResult { @Override public <T> Iterable<T> get(PCollection<T> pcollection) { - @SuppressWarnings("unchecked") - RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - return rddHolder.getValues(pcollection); + @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); + Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection); + return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction()); } <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - return rddHolder.getWindowedValues(pcollection); + return rddHolder.getValues(pcollection); } @Override
