Moves PerKeyCombineFnRunners to Flink runner

Flink is its only user. This removes the only remaining
mentions of OldDoFn in the SDK that are not OldDoFn itself.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e382c401
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e382c401
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e382c401

Branch: refs/heads/master
Commit: e382c40187754ad4f3c20565675cb3f131528070
Parents: 2b26ec8
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Jan 12 13:17:11 2017 -0800
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Jan 13 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PerKeyCombineFnRunner.java     |  25 --
 .../runners/core/PerKeyCombineFnRunners.java    | 262 -------------------
 .../runners/flink/PerKeyCombineFnRunners.java   | 239 +++++++++++++++++
 .../FlinkMergingNonShuffleReduceFunction.java   |   2 +-
 .../FlinkMergingPartialReduceFunction.java      |   2 +-
 .../functions/FlinkMergingReduceFunction.java   |   2 +-
 .../functions/FlinkPartialReduceFunction.java   |   2 +-
 .../functions/FlinkReduceFunction.java          |   2 +-
 .../beam/sdk/util/CombineContextFactory.java    |  18 --
 9 files changed, 244 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
index a927ecd..4550273 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java
@@ -75,31 +75,6 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, 
OutputT> extends Seria
    */
   OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c);
 
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator 
in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code 
OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and 
extract output
-   * in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code 
OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, 
?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a 
{@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code 
OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, 
?>.ProcessContext c);
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
deleted file mode 100644
index 34d711b..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.SideInputReader;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} 
implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, 
AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) 
perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", 
perKeyCombineFn.getClass()));
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link 
KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext 
c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFn.apply(key, inputs);
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, 
?>.ProcessContext c) {
-      AccumT accum = keyedCombineFn.createAccumulator(key);
-      for (InputT input : inputs) {
-        accum = keyedCombineFn.addInput(key, accum, input);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, 
PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, 
PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions 
options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFn.compact(key, accumulator);
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link 
KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, 
OutputT>
-      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> 
keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> 
keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
-      return keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext 
c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, 
CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, 
?>.ProcessContext c) {
-      return keyedCombineFnWithContext.apply(key, inputs,
-          CombineContextFactory.createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, 
?>.ProcessContext c) {
-      CombineWithContext.Context combineContext = 
CombineContextFactory.createFromProcessContext(c);
-      AccumT accum = keyedCombineFnWithContext.createAccumulator(key, 
combineContext);
-      for (InputT input : inputs) {
-        accum = keyedCombineFnWithContext.addInput(key, accum, input, 
combineContext);
-      }
-      return accum;
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, PipelineOptions options, 
SideInputReader sideInputReader,
-        Collection<? extends BoundedWindow> windows) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT addInput(K key, AccumT accumulator, InputT input, 
PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, input,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, 
PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions 
options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-
-    @Override
-    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
-        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
-      return keyedCombineFnWithContext.compact(key, accumulator,
-        CombineContextFactory.createFromComponents(
-          options, sideInputReader, Iterables.getOnlyElement(windows)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
new file mode 100644
index 0000000..f672578
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PerKeyCombineFnRunners.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Static utility methods that provide {@link PerKeyCombineFnRunner} 
implementations
+ * for different keyed combine functions.
+ */
+public class PerKeyCombineFnRunners {
+  /**
+   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
+   */
+  public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, 
AccumT, OutputT>
+  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
+    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
+      return new KeyedCombineFnWithContextRunner<>(
+          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) 
perKeyCombineFn);
+    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
+      return new KeyedCombineFnRunner<>(
+          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unknown type of CombineFn: %s", 
perKeyCombineFn.getClass()));
+    }
+  }
+
+  /** Returns a {@code CombineWithContext.Context} that wraps a {@code 
OldDoFn.ProcessContext}. */
+  private static CombineWithContext.Context createFromProcessContext(
+      final OldDoFn<?, ?>.ProcessContext c) {
+    return new CombineWithContext.Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+    };
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link 
KeyedCombineFn}.
+   *
+   * <p>It forwards function calls to the {@link KeyedCombineFn}.
+   */
+  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
+
+    private KeyedCombineFnRunner(
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
+      this.keyedCombineFn = keyedCombineFn;
+    }
+
+    @Override
+    public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFn;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext 
c) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFn.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFn.createAccumulator(key);
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, 
PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFn.addInput(key, accumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, 
PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFn.mergeAccumulators(key, accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions 
options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFn.extractOutput(key, accumulator);
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFn.compact(key, accumulator);
+    }
+  }
+
+  /**
+   * An implementation of {@link PerKeyCombineFnRunner} with {@link 
KeyedCombineFnWithContext}.
+   *
+   * <p>It forwards function calls to the {@link KeyedCombineFnWithContext}.
+   */
+  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, 
OutputT>
+      implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
+    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> 
keyedCombineFnWithContext;
+
+    private KeyedCombineFnWithContextRunner(
+        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> 
keyedCombineFnWithContext) {
+      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
+    }
+
+    @Override
+    public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
+      return keyedCombineFnWithContext;
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT addInput(
+        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext 
c) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, value,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(
+        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
+      return keyedCombineFnWithContext.mergeAccumulators(
+          key, accumulators, createFromProcessContext(c));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, 
?>.ProcessContext c) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+          createFromProcessContext(c));
+    }
+
+    @Override
+    public String toString() {
+      return keyedCombineFnWithContext.toString();
+    }
+
+    @Override
+    public AccumT createAccumulator(K key, PipelineOptions options, 
SideInputReader sideInputReader,
+        Collection<? extends BoundedWindow> windows) {
+      return keyedCombineFnWithContext.createAccumulator(key,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT addInput(K key, AccumT accumulator, InputT input, 
PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFnWithContext.addInput(key, accumulator, input,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, 
PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions 
options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFnWithContext.extractOutput(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+
+    @Override
+    public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
+        SideInputReader sideInputReader, Collection<? extends BoundedWindow> 
windows) {
+      return keyedCombineFnWithContext.compact(key, accumulator,
+        CombineContextFactory.createFromComponents(
+          options, sideInputReader, Iterables.getOnlyElement(windows)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 041d0e8..6412e63 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index fef7921..1456eea 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 59163e9..2f56fac 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 8b6ec3a..627cfa6 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -25,7 +25,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index fb5c90c..de0d416 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -27,7 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
+import org.apache.beam.runners.flink.PerKeyCombineFnRunners;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/e382c401/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index 149d276..a983057 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -49,23 +48,6 @@ public class CombineContextFactory {
   }
 
   /**
-   * Returns a {@code Combine.Context} that wraps a {@code 
OldDoFn.ProcessContext}.
-   */
-  public static Context createFromProcessContext(final OldDoFn<?, 
?>.ProcessContext c) {
-    return new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-    };
-  }
-
-  /**
    * Returns a {@code Combine.Context} that wraps a {@link StateContext}.
    */
   public static Context createFromStateContext(final StateContext<?> c) {

Reply via email to