Moves PerKeyCombineFnRunners to Flink runner Flink is its only user. This removes the only remaining mentions of OldDoFn in the SDK that are not OldDoFn itself.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e382c401 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e382c401 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e382c401 Branch: refs/heads/master Commit: e382c40187754ad4f3c20565675cb3f131528070 Parents: 2b26ec8 Author: Eugene Kirpichov <[email protected]> Authored: Thu Jan 12 13:17:11 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jan 13 14:34:23 2017 -0800 ---------------------------------------------------------------------- .../runners/core/PerKeyCombineFnRunner.java | 25 -- .../runners/core/PerKeyCombineFnRunners.java | 262 ------------------- .../runners/flink/PerKeyCombineFnRunners.java | 239 +++++++++++++++++ .../FlinkMergingNonShuffleReduceFunction.java | 2 +- .../FlinkMergingPartialReduceFunction.java | 2 +- .../functions/FlinkMergingReduceFunction.java | 2 +- .../functions/FlinkPartialReduceFunction.java | 2 +- .../functions/FlinkReduceFunction.java | 2 +- .../beam/sdk/util/CombineContextFactory.java | 18 -- 9 files changed, 244 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java index a927ecd..4550273 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java @@ -75,31 +75,6 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Seria */ OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); - /** - * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output - * in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c); - ///////////////////////////////////////////////////////////////////////////// /** http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java deleted file mode 100644 index 34d711b..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.SideInputReader; - -/** - * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations - * for different keyed combine functions. - */ -public class PerKeyCombineFnRunners { - /** - * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. - */ - public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> - create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { - if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { - return new KeyedCombineFnWithContextRunner<>( - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else if (perKeyCombineFn instanceof KeyedCombineFn) { - return new KeyedCombineFnRunner<>( - (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else { - throw new IllegalStateException( - String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFn}. - */ - private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - - private KeyedCombineFnRunner( - KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } - - @Override - public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() { - return keyedCombineFn; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.compact(key, accumulator); - } - - @Override - public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.apply(key, inputs); - } - - @Override - public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { - AccumT accum = keyedCombineFn.createAccumulator(key); - for (InputT input : inputs) { - accum = keyedCombineFn.addInput(key, accum, input); - } - return accum; - } - - @Override - public String toString() { - return keyedCombineFn.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.compact(key, accumulator); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. - */ - private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; - - private KeyedCombineFnWithContextRunner( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { - this.keyedCombineFnWithContext = keyedCombineFnWithContext; - } - - @Override - public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() { - return keyedCombineFnWithContext; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.createAccumulator(key, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.addInput(key, accumulator, value, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.mergeAccumulators( - key, accumulators, CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.compact(key, accumulator, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.apply(key, inputs, - CombineContextFactory.createFromProcessContext(c)); - } - - @Override - public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { - CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c); - AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext); - for (InputT input : inputs) { - accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext); - } - return accum; - } - - @Override - public String toString() { - return keyedCombineFnWithContext.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader, - Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.createAccumulator(key, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.addInput(key, accumulator, input, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.mergeAccumulators(key, accumulators, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.compact(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java new file mode 100644 index 0000000..f672578 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CombineContextFactory; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations + * for different keyed combine functions. + */ +public class PerKeyCombineFnRunners { + /** + * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. + */ + public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> + create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { + if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { + return new KeyedCombineFnWithContextRunner<>( + (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else if (perKeyCombineFn instanceof KeyedCombineFn) { + return new KeyedCombineFnRunner<>( + (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); + } else { + throw new IllegalStateException( + String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); + } + } + + /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ + private static CombineWithContext.Context createFromProcessContext( + final OldDoFn<?, ?>.ProcessContext c) { + return new CombineWithContext.Context() { + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return c.sideInput(view); + } + }; + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFn}. + */ + private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> + implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; + + private KeyedCombineFnRunner( + KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + } + + @Override + public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() { + return keyedCombineFn; + } + + @Override + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.createAccumulator(key); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.addInput(key, accumulator, input); + } + + @Override + public AccumT mergeAccumulators( + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.mergeAccumulators(key, accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFn.extractOutput(key, accumulator); + } + + @Override + public String toString() { + return keyedCombineFn.toString(); + } + + @Override + public AccumT createAccumulator(K key, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.createAccumulator(key); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.addInput(key, accumulator, input); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.mergeAccumulators(key, accumulators); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.extractOutput(key, accumulator); + } + + @Override + public AccumT compact(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFn.compact(key, accumulator); + } + } + + /** + * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. + * + * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. + */ + private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> + implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { + private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; + + private KeyedCombineFnWithContextRunner( + KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { + this.keyedCombineFnWithContext = keyedCombineFnWithContext; + } + + @Override + public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() { + return keyedCombineFnWithContext; + } + + @Override + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.createAccumulator(key, + createFromProcessContext(c)); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.addInput(key, accumulator, value, + createFromProcessContext(c)); + } + + @Override + public AccumT mergeAccumulators( + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.mergeAccumulators( + key, accumulators, createFromProcessContext(c)); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { + return keyedCombineFnWithContext.extractOutput(key, accumulator, + createFromProcessContext(c)); + } + + @Override + public String toString() { + return keyedCombineFnWithContext.toString(); + } + + @Override + public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.createAccumulator(key, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.addInput(key, accumulator, input, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.mergeAccumulators(key, accumulators, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.extractOutput(key, accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT compact(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return keyedCombineFnWithContext.compact(key, accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 041d0e8..6412e63 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java index fef7921..1456eea 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java index 59163e9..2f56fac 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 8b6ec3a..627cfa6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -25,7 +25,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index fb5c90c..de0d416 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index 149d276..a983057 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.values.PCollectionView; @@ -49,23 +48,6 @@ public class CombineContextFactory { } /** - * Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. - */ - public static Context createFromProcessContext(final OldDoFn<?, ?>.ProcessContext c) { - return new Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return c.sideInput(view); - } - }; - } - - /** * Returns a {@code Combine.Context} that wraps a {@link StateContext}. */ public static Context createFromStateContext(final StateContext<?> c) {
