Repository: incubator-beam Updated Branches: refs/heads/master 6ba288d67 -> c199f0854
[BEAM-80] Enable combiner lifting for combine with contexts Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06b18fde Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06b18fde Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06b18fde Branch: refs/heads/master Commit: 06b18fde6ec2d092a9733e5bfcfa63de3cf00833 Parents: b2b5f42 Author: Pei He <[email protected]> Authored: Thu Mar 10 14:17:36 2016 -0800 Committer: Pei He <[email protected]> Committed: Thu Mar 10 14:17:36 2016 -0800 ---------------------------------------------------------------------- .../sdk/runners/DataflowPipelineTranslator.java | 3 +++ .../cloud/dataflow/sdk/transforms/Combine.java | 18 +++--------------- .../cloud/dataflow/sdk/util/PropertyNames.java | 1 + 3 files changed, 7 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index d0cc4e5..0feae95 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -952,6 +952,9 @@ public class DataflowPipelineTranslator { context.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(windowingStrategy))); + context.addInput( + PropertyNames.IS_MERGING_WINDOW_FN, + !windowingStrategy.getWindowFn().isNonMerging()); } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java index cc0347a..b8d20e3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java @@ -1690,21 +1690,9 @@ public class Combine { @Override public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) { - if (fn instanceof RequiresContextInternal) { - return input - .apply(GroupByKey.<K, InputT>create(fewKeys)) - .apply(ParDo.of(new DoFn<KV<K, Iterable<InputT>>, KV<K, Iterable<InputT>>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - })) - .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs)); - } else { - return input - .apply(GroupByKey.<K, InputT>create(fewKeys)) - .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs)); - } + return input + .apply(GroupByKey.<K, InputT>create(fewKeys)) + .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06b18fde/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java index 5611fab..ec65189 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java @@ -65,6 +65,7 @@ public class PropertyNames { public static final String INPUTS = "inputs"; public static final String INPUT_CODER = "input_coder"; public static final String IS_GENERATED = "is_generated"; + public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn"; public static final String IS_PAIR_LIKE = "is_pair_like"; public static final String IS_STREAM_LIKE = "is_stream_like"; public static final String IS_WRAPPER = "is_wrapper";
