Revert "Moves DoFnAdapters to runners-core"

This reverts commit 33ed3238e2b3899cff061be3056c5cc29fc60a04.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/954e57d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/954e57d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/954e57d7

Branch: refs/heads/master
Commit: 954e57d7696fd14f7d1015f4e40f025ef8538802
Parents: 4aa0ee1
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Dec 16 15:37:02 2016 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Dec 16 16:39:20 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/WindowBoundTranslator.java |   2 +-
 .../operators/ApexGroupByKeyOperator.java       |   2 +-
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apache/beam/runners/core/DoFnAdapters.java  | 508 -------------------
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   4 +-
 .../core/GroupAlsoByWindowsProperties.java      |   2 +-
 .../functions/FlinkDoFnFunction.java            |   2 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |   2 +-
 .../functions/FlinkProcessContextBase.java      |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../sdk/transforms/AggregatorRetriever.java     |  13 +-
 .../beam/sdk/transforms/DoFnAdapters.java       | 504 ++++++++++++++++++
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   2 +-
 .../apache/beam/sdk/transforms/NoOpOldDoFn.java |   2 +-
 14 files changed, 518 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index ef049e1..33b9269 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -22,8 +22,8 @@ import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 4af7ff0..48ac177 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator<K, V> implements 
Operator {
     }
 
     @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
         String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 4538fb5..a3d3a97 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -38,7 +38,6 @@ import 
org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import 
org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import 
org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
@@ -49,6 +48,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index fc5847c..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s 
directly (using {@link
- *     DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
-  /** Should not be instantiated. */
-  private DoFnAdapters() {}
-
-  /**
-   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns 
the class of the
-   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
-   */
-  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
-    } else {
-      return fn.getClass();
-    }
-  }
-
-  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT> 
toOldDoFn(DoFn<InputT, OutputT> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature((Class) 
fn.getClass());
-    if (signature.processElement().observesWindow()) {
-      return new WindowDoFnAdapter<>(fn);
-    } else {
-      return new SimpleDoFnAdapter<>(fn);
-    }
-  }
-
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext 
adaptProcessContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.ProcessContext c,
-      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
-    return fn.new ProcessContext() {
-      @Override
-      public InputT element() {
-        return c.element();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-
-      @Override
-      public Instant timestamp() {
-        return c.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return extra.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return c.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return extra.windowingInternals();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
-  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context 
adaptContext(
-      OldDoFn<InputT, OutputT> fn,
-      final DoFn<InputT, OutputT>.Context c) {
-    return fn.new Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public void output(OutputT output) {
-        c.output(output);
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        c.outputWithTimestamp(output, timestamp);
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        c.sideOutput(tag, output);
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-        c.sideOutputWithTimestamp(tag, output, timestamp);
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return c.createAggregator(name, combiner);
-      }
-    };
-  }
-
-  /**
-   * If the fn was created using {@link #toOldDoFn}, returns the original 
{@link DoFn}. Otherwise,
-   * returns {@code null}.
-   */
-  @Nullable
-  public static <InputT, OutputT> DoFn<InputT, OutputT> 
getDoFn(OldDoFn<InputT, OutputT> fn) {
-    if (fn instanceof SimpleDoFnAdapter) {
-      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} 
as an {@link
-   * OldDoFn}.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends 
OldDoFn<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private transient DoFnInvoker<InputT, OutputT> invoker;
-
-    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(AggregatorRetriever.getDelegatingAggregators(fn));
-      this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-
-    @Override
-    public void setup() throws Exception {
-      this.invoker.invokeSetup();
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      fn.prepareForProcessing();
-      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
-    }
-
-    @Override
-    public void teardown() throws Exception {
-      this.invoker.invokeTeardown();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> adapter = new 
ProcessContextAdapter<>(fn, c);
-      invoker.invokeProcessElement(adapter);
-    }
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    @Override
-    Collection<Aggregator<?, ?>> getAggregators() {
-      return fn.getAggregators();
-    }
-
-    @Override
-    public Duration getAllowedTimestampSkew() {
-      return fn.getAllowedTimestampSkew();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(fn);
-    }
-
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      this.invoker = DoFnInvokers.invokerFor(fn);
-    }
-  }
-
-  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an 
{@link OldDoFn}. */
-  private static class WindowDoFnAdapter<InputT, OutputT> extends 
SimpleDoFnAdapter<InputT, OutputT>
-      implements OldDoFn.RequiresWindowAccess {
-
-    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(fn);
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.Context} as a {@link 
DoFnInvoker.ArgumentProvider} inside a {@link
-   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the 
extra context is
-   * unavailable.
-   */
-  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, 
OutputT>.Context
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, 
OutputT>.Context context) {
-      fn.super();
-      this.context = context;
-      super.setupDelegateAggregators();
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The OldDoFn doesn't allow us to ask for these outside processElement, 
so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the window in processElement; elsewhere there is no 
defined window.");
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Can only get a ProcessContext in processElement");
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Timers are not supported for OldDoFn");
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The OldDoFn doesn't allow us to ask for these outside 
ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get WindowingInternals in processElement");
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this 
runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by 
this runner");
-    }
-  }
-
-  /**
-   * Wraps an {@link OldDoFn.ProcessContext} as a {@link 
DoFnInvoker.ArgumentProvider} method.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    private OldDoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext 
context) {
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public Context context(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Timers are not supported for 
OldDoFn");
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-
-    @Override
-    public DoFn.InputProvider<InputT> inputProvider() {
-      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<OutputT> outputReceiver() {
-      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
-    }
-
-    @Override
-    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
-      throw new UnsupportedOperationException("This is a non-splittable DoFn");
-    }
-
-    @Override
-    public State state(String stateId) {
-      throw new UnsupportedOperationException("State is not supported by this 
runner");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException("Timers are not supported by 
this runner");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 7d93200..1048fdc 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -322,7 +322,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT
     }
 
     @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       checkNotNull(combiner, "Combiner passed to createAggregatorInternal 
cannot be null");
       return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), 
stepContext, name, combiner);
@@ -504,7 +504,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT
     }
 
     @Override
-    public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, 
AggregatorOutputT>
+    protected <AggregatorInputT, AggregatorOutputT> 
Aggregator<AggregatorInputT, AggregatorOutputT>
         createAggregatorInternal(
             String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> 
combiner) {
       return context.createAggregatorInternal(name, combiner);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index ef01106..97b67c6 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties {
     }
 
     @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 2a4a68e..ed200d5 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index a97bd46..7f6a436 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -18,10 +18,10 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 53b9803..6afca38 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
   public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp);
 
   @Override
-  public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
     @SuppressWarnings("unchecked")
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 001e3b6..8704308 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -42,6 +41,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index b1d3ead..ce47e22 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -18,10 +18,9 @@
 package org.apache.beam.sdk.transforms;
 
 import java.util.Collection;
-import java.util.Map;
 
 /**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link 
DoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link 
OldDoFn DoFns}.
  */
 public final class AggregatorRetriever {
   private AggregatorRetriever() {
@@ -29,17 +28,9 @@ public final class AggregatorRetriever {
   }
 
   /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link 
DoFn}.
+   * Returns the {@link Aggregator Aggregators} created by the provided {@link 
OldDoFn}.
    */
   public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
     return fn.getAggregators();
   }
-
-  /**
-   * Returns the {@link DelegatingAggregator delegating aggregators} created 
by the provided {@link
-   * DoFn}.
-   */
-  public static Map<String, DelegatingAggregator<?, ?>> 
getDelegatingAggregators(DoFn<?, ?> fn) {
-    return fn.aggregators;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
new file mode 100644
index 0000000..e15b08b
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.IOException;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s 
directly (using {@link
+ *     DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+  /** Should not be instantiated. */
+  private DoFnAdapters() {}
+
+  /**
+   * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns 
the class of the
+   * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
+   */
+  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
+    if (fn instanceof SimpleDoFnAdapter) {
+      return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
+    } else {
+      return fn.getClass();
+    }
+  }
+
+  /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT> 
toOldDoFn(DoFn<InputT, OutputT> fn) {
+    DoFnSignature signature = DoFnSignatures.getSignature((Class) 
fn.getClass());
+    if (signature.processElement().observesWindow()) {
+      return new WindowDoFnAdapter<>(fn);
+    } else {
+      return new SimpleDoFnAdapter<>(fn);
+    }
+  }
+
+  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext 
adaptProcessContext(
+      OldDoFn<InputT, OutputT> fn,
+      final DoFn<InputT, OutputT>.ProcessContext c,
+      final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
+    return fn.new ProcessContext() {
+      @Override
+      public InputT element() {
+        return c.element();
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return c.sideInput(view);
+      }
+
+      @Override
+      public Instant timestamp() {
+        return c.timestamp();
+      }
+
+      @Override
+      public BoundedWindow window() {
+        return extra.window();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return c.pane();
+      }
+
+      @Override
+      public WindowingInternals<InputT, OutputT> windowingInternals() {
+        return extra.windowingInternals();
+      }
+
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public void output(OutputT output) {
+        c.output(output);
+      }
+
+      @Override
+      public void outputWithTimestamp(OutputT output, Instant timestamp) {
+        c.outputWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        c.sideOutput(tag, output);
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
+        c.sideOutputWithTimestamp(tag, output, timestamp);
+      }
+
+      @Override
+      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        return c.createAggregator(name, combiner);
+      }
+    };
+  }
+
+  /** Creates a {@link OldDoFn.ProcessContext} from a {@link 
DoFn.ProcessContext}. */
+  public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context 
adaptContext(
+      OldDoFn<InputT, OutputT> fn,
+      final DoFn<InputT, OutputT>.Context c) {
+    return fn.new Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return c.getPipelineOptions();
+      }
+
+      @Override
+      public void output(OutputT output) {
+        c.output(output);
+      }
+
+      @Override
+      public void outputWithTimestamp(OutputT output, Instant timestamp) {
+        c.outputWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        c.sideOutput(tag, output);
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
+        c.sideOutputWithTimestamp(tag, output, timestamp);
+      }
+
+      @Override
+      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
+          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        return c.createAggregator(name, combiner);
+      }
+    };
+  }
+
+  /**
+   * If the fn was created using {@link #toOldDoFn}, returns the original 
{@link DoFn}. Otherwise,
+   * returns {@code null}.
+   */
+  @Nullable
+  public static <InputT, OutputT> DoFn<InputT, OutputT> 
getDoFn(OldDoFn<InputT, OutputT> fn) {
+    if (fn instanceof SimpleDoFnAdapter) {
+      return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} 
as an {@link
+   * OldDoFn}.
+   */
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends 
OldDoFn<InputT, OutputT> {
+    private final DoFn<InputT, OutputT> fn;
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn.aggregators);
+      this.fn = fn;
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+
+    @Override
+    public void setup() throws Exception {
+      this.invoker.invokeSetup();
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      fn.prepareForProcessing();
+      invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+    }
+
+    @Override
+    public void teardown() throws Exception {
+      this.invoker.invokeTeardown();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      ProcessContextAdapter<InputT, OutputT> adapter = new 
ProcessContextAdapter<>(fn, c);
+      invoker.invokeProcessElement(adapter);
+    }
+
+    @Override
+    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+      return fn.getInputTypeDescriptor();
+    }
+
+    @Override
+    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+      return fn.getOutputTypeDescriptor();
+    }
+
+    @Override
+    Collection<Aggregator<?, ?>> getAggregators() {
+      return fn.getAggregators();
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(fn);
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      this.invoker = DoFnInvokers.invokerFor(fn);
+    }
+  }
+
+  /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an 
{@link OldDoFn}. */
+  private static class WindowDoFnAdapter<InputT, OutputT> extends 
SimpleDoFnAdapter<InputT, OutputT>
+      implements OldDoFn.RequiresWindowAccess {
+
+    WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+      super(fn);
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.Context} as a {@link 
DoFnInvoker.ArgumentProvider} inside a {@link
+   * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the 
extra context is
+   * unavailable.
+   */
+  private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, 
OutputT>.Context
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.Context context;
+
+    private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, 
OutputT>.Context context) {
+      fn.super();
+      this.context = context;
+      super.setupDelegateAggregators();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
+        String name,
+        CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public BoundedWindow window() {
+      // The OldDoFn doesn't allow us to ask for these outside processElement, 
so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get the window in processElement; elsewhere there is no 
defined window.");
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Can only get a ProcessContext in processElement");
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Timers are not supported for OldDoFn");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      // The OldDoFn doesn't allow us to ask for these outside 
ProcessElements, so this
+      // should be unreachable.
+      throw new UnsupportedOperationException(
+          "Can only get WindowingInternals in processElement");
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this 
runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by 
this runner");
+    }
+  }
+
+  /**
+   * Wraps an {@link OldDoFn.ProcessContext} as a {@link 
DoFnInvoker.ArgumentProvider} method.
+   */
+  private static class ProcessContextAdapter<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+    private ProcessContextAdapter(
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext 
context) {
+      fn.super();
+      this.context = context;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      return context.sideInput(view);
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.output(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      context.outputWithTimestamp(output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      context.sideOutput(tag, output);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      context.sideOutputWithTimestamp(tag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+
+    @Override
+    public InputT element() {
+      return context.element();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return context.timestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return context.pane();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return context.window();
+    }
+
+    @Override
+    public Context context(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("Timers are not supported for 
OldDoFn");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return context.windowingInternals();
+    }
+
+    @Override
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for 
testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only 
for testing");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+      throw new UnsupportedOperationException("This is a non-splittable DoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("State is not supported by this 
runner");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("Timers are not supported by 
this runner");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index d1bb42b..2d2c1fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -208,7 +208,7 @@ public abstract class OldDoFn<InputT, OutputT> implements 
Serializable, HasDispl
      *         context
      */
     @Experimental(Kind.AGGREGATOR)
-    public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, 
AggOutputT> combiner);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index 0db130d..504480b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -63,7 +63,7 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, 
OutputT> {
         Instant timestamp) {
     }
     @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
         createAggregatorInternal(String name, CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
       return null;
     }

Reply via email to