post-merge fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f013cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f013cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f013cb7

Branch: refs/heads/gearpump-runner
Commit: 8f013cb76fb85421da00eb8df0074dac0a8233fa
Parents: 9dc9be9
Author: manuzhang <[email protected]>
Authored: Wed Oct 26 11:22:56 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Oct 26 14:02:54 2016 +0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 586 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |  22 -
 .../beam/runners/direct/KeyedResourcePool.java  |  47 --
 .../runners/direct/LockedKeyedResourcePool.java |  95 ---
 .../direct/ParDoSingleEvaluatorFactory.java     |   2 +-
 .../direct/TransformEvaluatorRegistry.java      |  37 --
 .../direct/LockedKeyedResourcePoolTest.java     | 163 ------
 .../flink/examples/streaming/package-info.java  |  22 +
 .../apache/beam/runners/flink/package-info.java |  22 -
 runners/gearpump/pom.xml                        |   2 +-
 .../gearpump/GearpumpPipelineResult.java        |   6 +
 .../gearpump/GearpumpPipelineRunner.java        |   2 -
 .../translators/TransformTranslator.java        |  30 +
 .../translators/io/UnboundedSourceWrapper.java  |  45 ++
 .../translators/utils/GearpumpDoFnRunner.java   |  16 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |   1 -
 .../runners/spark/translation/DoFnFunction.java |   2 -
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  15 +-
 .../apache/beam/sdk/runners/PipelineRunner.java |   1 -
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 -
 .../org/apache/beam/sdk/transforms/DoFn.java    |  14 -
 .../org/apache/beam/sdk/transforms/ParDo.java   |  12 -
 .../beam/sdk/transforms/LatestFnTests.java      | 233 --------
 .../beam/sdk/io/kinesis/package-info.java       |  22 -
 .../beam/sdk/io/mongodb/package-info.java       |  22 -
 26 files changed, 703 insertions(+), 720 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
new file mode 100644
index 0000000..dec9905
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.InputProvider;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+/**
+ * Runs a {@link DoFn} by constructing the appropriate contexts and passing 
them in.
+ *
+ * @param <InputT> the type of the {@link DoFn} (main) input elements
+ * @param <OutputT> the type of the {@link DoFn} (main) output elements
+ */
+public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, 
OutputT> {
+
+  /** The {@link DoFn} being run. */
+  private final DoFn<InputT, OutputT> fn;
+
+  /** The {@link DoFnInvoker} being run. */
+  private final DoFnInvoker<InputT, OutputT> invoker;
+
+  /** The context used for running the {@link DoFn}. */
+  private final DoFnContext<InputT, OutputT> context;
+
+  private final OutputManager outputManager;
+
+  private final TupleTag<OutputT> mainOutputTag;
+
+  private final boolean observesWindow;
+
+  public SimpleDoFnRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = fn;
+    this.observesWindow =
+        
DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().observesWindow();
+    this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    this.outputManager = outputManager;
+    this.mainOutputTag = mainOutputTag;
+    this.context =
+        new DoFnContext<>(
+            options,
+            fn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy == null ? null : 
windowingStrategy.getWindowFn());
+  }
+
+  @Override
+  public void startBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      invoker.invokeStartBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> compressedElem) {
+    if (observesWindow) {
+      for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
+        invokeProcessElement(elem);
+      }
+    } else {
+      invokeProcessElement(compressedElem);
+    }
+  }
+
+  private void invokeProcessElement(WindowedValue<InputT> elem) {
+    final DoFn<InputT, OutputT>.ProcessContext processContext = 
createProcessContext(elem);
+
+    // Note that if the element must be exploded into all its windows, that 
has to be done outside
+    // of this runner.
+    final DoFn.ExtraContextFactory<InputT, OutputT> extraContextFactory =
+        createExtraContextFactory(elem);
+
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      invoker.invokeProcessElement(processContext, extraContextFactory);
+    } catch (Exception ex) {
+      throw wrapUserCodeException(ex);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      invoker.invokeFinishBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  /** Returns a new {@link DoFn.ProcessContext} for the given element. */
+  private DoFn<InputT, OutputT>.ProcessContext 
createProcessContext(WindowedValue<InputT> elem) {
+    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+  }
+
+  private DoFn.ExtraContextFactory<InputT, OutputT> createExtraContextFactory(
+      WindowedValue<InputT> elem) {
+    return new DoFnExtraContextFactory<InputT, OutputT>(elem.getWindows(), 
elem.getPane());
+  }
+
+  private RuntimeException wrapUserCodeException(Throwable t) {
+    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+  }
+
+  private boolean isSystemDoFn() {
+    return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@code DoFn.Context} used for running a 
{@link DoFn}.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, 
OutputT>.Context {
+    private static final int MAX_SIDE_OUTPUTS = 1000;
+
+    final PipelineOptions options;
+    final DoFn<InputT, OutputT> fn;
+    final SideInputReader sideInputReader;
+    final OutputManager outputManager;
+    final TupleTag<OutputT> mainOutputTag;
+    final StepContext stepContext;
+    final AggregatorFactory aggregatorFactory;
+    final WindowFn<?, ?> windowFn;
+
+    /**
+     * The set of known output tags, some of which may be undeclared, so we 
can throw an exception
+     * when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+     */
+    private Set<TupleTag<?>> outputTags;
+
+    public DoFnContext(
+        PipelineOptions options,
+        DoFn<InputT, OutputT> fn,
+        SideInputReader sideInputReader,
+        OutputManager outputManager,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> sideOutputTags,
+        StepContext stepContext,
+        AggregatorFactory aggregatorFactory,
+        WindowFn<?, ?> windowFn) {
+      fn.super();
+      this.options = options;
+      this.fn = fn;
+      this.sideInputReader = sideInputReader;
+      this.outputManager = outputManager;
+      this.mainOutputTag = mainOutputTag;
+      this.outputTags = Sets.newHashSet();
+
+      outputTags.add(mainOutputTag);
+      for (TupleTag<?> sideOutputTag : sideOutputTags) {
+        outputTags.add(sideOutputTag);
+      }
+
+      this.stepContext = stepContext;
+      this.aggregatorFactory = aggregatorFactory;
+      this.windowFn = windowFn;
+      super.setupDelegateAggregators();
+    }
+
+    
//////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+      final Instant inputTimestamp = timestamp;
+
+      if (timestamp == null) {
+        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      if (windows == null) {
+        try {
+          // The windowFn can never succeed at accessing the element, so its 
type does not
+          // matter here
+          @SuppressWarnings("unchecked")
+          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+          windows =
+              objectWindowFn.assignWindows(
+                  objectWindowFn.new AssignContext() {
+                    @Override
+                    public Object element() {
+                      throw new UnsupportedOperationException(
+                          "WindowFn attempted to access input element when 
none was available");
+                    }
+
+                    @Override
+                    public Instant timestamp() {
+                      if (inputTimestamp == null) {
+                        throw new UnsupportedOperationException(
+                            "WindowFn attempted to access input timestamp when 
none was available");
+                      }
+                      return inputTimestamp;
+                    }
+
+                    @Override
+                    public W window() {
+                      throw new UnsupportedOperationException(
+                          "WindowFn attempted to access input windows when 
none were available");
+                    }
+                  });
+        } catch (Exception e) {
+          throw UserCodeException.wrap(e);
+        }
+      }
+
+      return WindowedValue.of(output, timestamp, windows, pane);
+    }
+
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+      if (!sideInputReader.contains(view)) {
+        throw new IllegalArgumentException("calling sideInput() with unknown 
view");
+      }
+      BoundedWindow sideInputWindow =
+          
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      return sideInputReader.get(view, sideInputWindow);
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+      outputManager.output(mainOutputTag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteOutput(windowedElem);
+      }
+    }
+
+    private <T> void sideOutputWindowedValue(
+        TupleTag<T> tag,
+        T output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, 
windows, pane));
+    }
+
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> 
windowedElem) {
+      if (!outputTags.contains(tag)) {
+        // This tag wasn't declared nor was it seen before during this 
execution.
+        // Thus, this must be a new, undeclared and unconsumed output.
+        // To prevent likely user errors, enforce the limit on the number of 
side
+        // outputs.
+        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+          throw new IllegalArgumentException(
+              "the number of side outputs has exceeded a limit of " + 
MAX_SIDE_OUTPUTS);
+        }
+        outputTags.add(tag);
+      }
+
+      outputManager.output(tag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteSideOutput(tag, windowedElem);
+      }
+    }
+
+    // Following implementations of output, outputWithTimestamp, and sideOutput
+    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will 
be shadowed by
+    // ProcessContext's versions in DoFn.processElement.
+    @Override
+    public void output(OutputT output) {
+      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be 
null");
+      sideOutputWindowedValue(tag, output, timestamp, null, 
PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      checkNotNull(combiner, "Combiner passed to createAggregator cannot be 
null");
+      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), 
stepContext, name, combiner);
+    }
+  }
+
+  /**
+   * A concrete implementation of {@link DoFn.ProcessContext} used for running 
a {@link DoFn} over a
+   * single element.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private static class DoFnProcessContext<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext {
+
+    final DoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    final WindowedValue<InputT> windowedValue;
+
+    public DoFnProcessContext(
+        DoFn<InputT, OutputT> fn,
+        DoFnContext<InputT, OutputT> context,
+        WindowedValue<InputT> windowedValue) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+      BoundedWindow window;
+      if (!windowIter.hasNext()) {
+        if (context.windowFn instanceof GlobalWindows) {
+          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+          // without windows
+          window = GlobalWindow.INSTANCE;
+        } else {
+          throw new IllegalStateException(
+              "sideInput called when main input element is not in any 
windows");
+        }
+      } else {
+        window = windowIter.next();
+        if (windowIter.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple 
windows");
+        }
+      }
+      return context.sideInput(view, window);
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      context.outputWindowedValue(windowedValue.withValue(output));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(
+          output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be 
null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
+    }
+
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    private void checkTimestamp(Instant timestamp) {
+      if 
(timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew())))
 {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
+                    + "timestamp of the current input (%s) minus the allowed 
skew (%s). See the "
+                    + "DoFn#getAllowedTimestampSkew() Javadoc for details on 
changing the allowed "
+                    + "skew.",
+                timestamp,
+                windowedValue.getTimestamp(),
+                
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+      }
+    }
+
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT>
+        Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> 
combiner) {
+      return context.createAggregator(name, combiner);
+    }
+  }
+
+  private class DoFnExtraContextFactory<InputT, OutputT>
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+    /** The windows of the current element. */
+    private final Collection<? extends BoundedWindow> windows;
+
+    /** The pane of the current element. */
+    private final PaneInfo pane;
+
+    public DoFnExtraContextFactory(Collection<? extends BoundedWindow> 
windows, PaneInfo pane) {
+      this.windows = windows;
+      this.pane = pane;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(windows);
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("InputProvider parameters are 
not supported.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("OutputReceiver parameters are 
not supported.");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> 
restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters 
are not supported.");
+    }
+
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windows;
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return pane;
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> 
elemCoder)
+            throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) 
context.windowFn.windowCoder();
+
+          context.stepContext.writePCollectionViewData(
+              tag,
+              data,
+              IterableCoder.of(WindowedValue.getFullCoder(elemCoder, 
windowCoder)),
+              window(),
+              windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public void outputWindowedValue(
+            OutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {}
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f87f1c1..e02c8a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -465,26 +465,4 @@ public class DirectRunner
       return NanosOffsetClock.create();
     }
   }
-
-  /**
-   * A {@link Supplier} that creates a {@link ExecutorService} based on
-   * {@link Executors#newFixedThreadPool(int)}.
-   */
-  private static class FixedThreadPoolSupplier implements 
Supplier<ExecutorService> {
-    @Override
-    public ExecutorService get() {
-      return 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-    }
-  }
-
-
-  /**
-   * A {@link Supplier} that creates a {@link NanosOffsetClock}.
-   */
-  private static class NanosOffsetClockSupplier implements Supplier<Clock> {
-    @Override
-    public Clock get() {
-      return NanosOffsetClock.create();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
deleted file mode 100644
index b976b69..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import com.google.common.base.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A pool of resources associated with specific keys. Implementations enforce 
specific use patterns,
- * such as limiting the the number of outstanding elements available per key.
- */
-interface KeyedResourcePool<K, V> {
-  /**
-   * Tries to acquire a value for the provided key, loading it via the 
provided loader if necessary.
-   *
-   * <p>If the returned {@link Optional} contains a value, the caller obtains 
ownership of that
-   * value. The value should be released back to this {@link 
KeyedResourcePool} after the
-   * caller no longer has use of it using {@link #release(Object, Object)}.
-   *
-   * <p>The provided {@link Callable} <b>must not</b> return null; it may 
either return a non-null
-   * value or throw an exception.
-   */
-  Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException;
-
-  /**
-   * Release the provided value, relinquishing ownership of it. Future calls to
-   * {@link #tryAcquire(Object, Callable)} may return the released value.
-   */
-  void release(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
deleted file mode 100644
index 8b1e0b1..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A {@link KeyedResourcePool} which is limited to at most one outstanding 
instance at a time for
- * each key.
- */
-class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> {
-  /**
-   * A map from each key to an {@link Optional} of the associated value. At 
most one value is stored
-   * per key, and it is obtained by at most one thread at a time.
-   *
-   * <p>For each key in this map:
-   *
-   * <ul>
-   * <li>If there is no associated value, then no value has been stored yet.
-   * <li>If the value is {@code Optional.absent()} then the value is currently 
in use.
-   * <li>If the value is {@code Optional.present()} then the contained value 
is available for use.
-   * </ul>
-   */
-  public static <K, V> LockedKeyedResourcePool<K, V> create() {
-    return new LockedKeyedResourcePool<>();
-  }
-
-  private final ConcurrentMap<K, Optional<V>> cache;
-
-  private LockedKeyedResourcePool() {
-    cache = new ConcurrentHashMap<>();
-  }
-
-  @Override
-  public Optional<V> tryAcquire(K key, Callable<V> loader) throws 
ExecutionException {
-    Optional<V> value = cache.replace(key, Optional.<V>absent());
-    if (value == null) {
-      // No value already existed, so populate the cache with the value 
returned by the loader
-      cache.putIfAbsent(key, Optional.of(load(loader)));
-      // Some other thread may obtain the result after the putIfAbsent, so 
retry acquisition
-      value = cache.replace(key, Optional.<V>absent());
-    }
-    return value;
-  }
-
-  private V load(Callable<V> loader) throws ExecutionException {
-    try {
-      return loader.call();
-    } catch (Error t) {
-      throw new ExecutionError(t);
-    } catch (RuntimeException e) {
-      throw new UncheckedExecutionException(e);
-    } catch (Exception e) {
-      throw new ExecutionException(e);
-    }
-  }
-
-  @Override
-  public void release(K key, V value) {
-    Optional<V> replaced = cache.replace(key, Optional.of(value));
-    checkNotNull(replaced, "Tried to release before a value was acquired");
-    checkState(
-        !replaced.isPresent(),
-        "Released a value to a %s where there is already a value present for 
key %s (%s). "
-            + "At most one value may be present at a time.",
-        LockedKeyedResourcePool.class.getSimpleName(),
-        key,
-        replaced);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 9a08e8f..0584e41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -91,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
               stepContext,
               inputBundle,
               application,
-              (OldDoFn) fnLocal.get(),
+              fnLocal.get(),
               application.getTransform().getSideInputs(),
               mainOutputTag,
               Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 6485714..3dd44a7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -116,41 +116,4 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
       throw toThrow;
     }
   }
-
-  @Override
-  public void cleanup() throws Exception {
-    Collection<Exception> thrownInCleanup = new ArrayList<>();
-    for (TransformEvaluatorFactory factory : factories.values()) {
-      try {
-        factory.cleanup();
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        thrownInCleanup.add(e);
-      }
-    }
-    finished.set(true);
-    if (!thrownInCleanup.isEmpty()) {
-      LOG.error("Exceptions {} thrown while cleaning up evaluators", 
thrownInCleanup);
-      Exception toThrow = null;
-      for (Exception e : thrownInCleanup) {
-        if (toThrow == null) {
-          toThrow = e;
-        } else {
-          toThrow.addSuppressed(e);
-        }
-      }
-      throw toThrow;
-    }
-  }
-
-  /**
-   * A factory to create Transform Evaluator Registries.
-   */
-  public static class Factory {
-    public TransformEvaluatorRegistry create() {
-      return TransformEvaluatorRegistry.defaultRegistry();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
deleted file mode 100644
index e1e24a3..0000000
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ExecutionError;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link LockedKeyedResourcePool}.
- */
-@RunWith(JUnit4.class)
-public class LockedKeyedResourcePoolTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private LockedKeyedResourcePool<String, Integer> cache =
-      LockedKeyedResourcePool.create();
-
-  @Test
-  public void acquireReleaseAcquireLastLoadedOrReleased() throws 
ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(returned.get(), equalTo(3));
-
-    cache.release("foo", 4);
-    Optional<Integer> reacquired = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 5;
-      }
-    });
-    assertThat(reacquired.get(), equalTo(4));
-  }
-
-  @Test
-  public void acquireReleaseReleaseThrows() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(returned.get(), equalTo(3));
-
-    cache.release("foo", 4);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("already a value present");
-    thrown.expectMessage("At most one");
-    cache.release("foo", 4);
-  }
-
-  @Test
-  public void releaseBeforeAcquireThrows() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("before a value was acquired");
-    cache.release("bar", 3);
-  }
-
-  @Test
-  public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    Optional<Integer> secondReturned = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    assertThat(secondReturned.isPresent(), is(false));
-  }
-
-  @Test
-  public void acquireMultipleKeysSucceeds() throws ExecutionException {
-    Optional<Integer> returned = cache.tryAcquire("foo", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 3;
-      }
-    });
-    Optional<Integer> secondReturned = cache.tryAcquire("bar", new 
Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        return 4;
-      }
-    });
-
-    assertThat(returned.get(), equalTo(3));
-    assertThat(secondReturned.get(), equalTo(4));
-  }
-
-  @Test
-  public void acquireThrowsExceptionWrapped() throws ExecutionException {
-    final Exception cause = new Exception("checkedException");
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-
-  @Test
-  public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException 
{
-    final RuntimeException cause = new RuntimeException("UncheckedException");
-    thrown.expect(UncheckedExecutionException.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-
-  @Test
-  public void acquireThrowsErrorWrapped() throws ExecutionException {
-    final Error cause = new Error("Error");
-    thrown.expect(ExecutionError.class);
-    thrown.expectCause(equalTo(cause));
-    cache.tryAcquire("foo", new Callable<Integer>() {
-      @Override
-      public Integer call() throws Exception {
-        throw cause;
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
new file mode 100644
index 0000000..58f41b6
--- /dev/null
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Flink Beam runner example.
+ */
+package org.apache.beam.runners.flink.examples.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
deleted file mode 100644
index 57f1e59..0000000
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 6576ba6..6c104eb 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -225,7 +225,7 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
-      <type>test-jar</type>
+      <classifier>tests</classifier>
       <scope>test</scope>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 2011a4b..e7c621e 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 
 import org.joda.time.Duration;
@@ -60,4 +61,9 @@ public class GearpumpPipelineResult implements PipelineResult 
{
         new UnsupportedOperationException());
   }
 
+  @Override
+  public MetricResults metrics() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
index ad7bb3e..9e32227 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -53,8 +53,6 @@ import 
org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to Gearpump Stream DSL
  * and then executing them on a Gearpump cluster.
- * <p>>
- * This is based on DataflowPipelineRunner.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class GearpumpPipelineRunner extends 
PipelineRunner<GearpumpPipelineResult> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
new file mode 100644
index 0000000..c8587d3
--- /dev/null
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Translates {@link PTransform} to Gearpump functions.
+ */
+public interface TransformTranslator<T extends PTransform> extends 
Serializable {
+  void translate(T transform, TranslationContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
new file mode 100644
index 0000000..dfdecb2
--- /dev/null
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Wrapper over UnboundedSource for the Gearpump DataSource API.
+ */
+public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends 
UnboundedSource.CheckpointMark>
+    extends GearpumpSource<OutputT> {
+
+  private final UnboundedSource<OutputT, CheckpointMarkT> source;
+
+  public UnboundedSourceWrapper(UnboundedSource<OutputT, CheckpointMarkT> 
source,
+      PipelineOptions options) {
+    super(options);
+    this.source = source;
+  }
+
+  @Override
+  protected Source.Reader<OutputT> createReader(PipelineOptions options) 
throws IOException {
+    return source.createReader(options, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
index e205575..ec86a8d 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -18,7 +18,8 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
-import com.google.common.base.Preconditions;
+import static 
org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
@@ -59,6 +60,7 @@ import org.apache.beam.sdk.values.TupleTag;
 
 import org.joda.time.Instant;
 
+
 /**
  * a serializable {@link SimpleDoFnRunner}.
  */
@@ -330,20 +332,20 @@ public class GearpumpDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, O
 
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be 
null");
+      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
       sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      Preconditions.checkNotNull(tag, "TupleTag passed to 
sideOutputWithTimestamp cannot be null");
+      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be 
null");
       sideOutputWindowedValue(tag, output, timestamp, null, 
PaneInfo.NO_FIRING);
     }
 
     @Override
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
         String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Preconditions.checkNotNull(combiner,
+      checkNotNull(combiner,
           "Combiner passed to createAggregator cannot be null");
       throw new UnsupportedOperationException("aggregator not supported in 
Gearpump runner");
     }
@@ -386,7 +388,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, O
 
     @Override
     public <T> T sideInput(PCollectionView<T> view) {
-      Preconditions.checkNotNull(view, "View passed to sideInput cannot be 
null");
+      checkNotNull(view, "View passed to sideInput cannot be null");
       Iterator<? extends BoundedWindow> windowIter = windows().iterator();
       BoundedWindow window;
       if (!windowIter.hasNext()) {
@@ -435,13 +437,13 @@ public class GearpumpDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, O
 
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be 
null");
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
       context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp 
cannot be null");
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be 
null");
       context.sideOutputWindowedValue(
           tag, output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b84a1a8..b211c04 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -77,4 +77,3 @@ public class DoFnInfo<InputT, OutputT> implements 
Serializable {
     return outputMap;
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 69c450e..4dfbee6 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -45,8 +45,6 @@ public class DoFnFunction<InputT, OutputT>
     implements FlatMapFunction<Iterator<WindowedValue<InputT>>, 
WindowedValue<OutputT>> {
   private final Accumulator<NamedAggregators> accum;
   private final OldDoFn<InputT, OutputT> mFunction;
-  private static final Logger LOG = 
LoggerFactory.getLogger(DoFnFunction.class);
-
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, 
BroadcastHelper<?>>> mSideInputs;
   private final WindowFn<Object, ?> windowFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 0776786..2286832 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -134,7 +134,7 @@ public class Pipeline {
    */
   public static Pipeline create(PipelineOptions options) {
     Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), 
options);
-    LOG.info("Creating {}", pipeline);
+    LOG.debug("Creating {}", pipeline);
     return pipeline;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 78ea988..2dbcda7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -33,20 +34,8 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
+
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 1ec4103..ede1507 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -57,7 +57,6 @@ public abstract class PipelineRunner<ResultT extends 
PipelineResult> {
         .fromFactoryMethod("fromOptions")
         .withArg(PipelineOptions.class, options)
         .build();
-    System.out.println("runner: " + result.getClass().getName());
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 3202000..f1bf09d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -95,7 +95,6 @@ public class TestPipeline extends Pipeline {
   }
 
   public static TestPipeline fromOptions(PipelineOptions options) {
-    System.out.println(options);
     return new TestPipeline(PipelineRunner.fromOptions(options), options);
   }
 
@@ -134,7 +133,6 @@ public class TestPipeline extends Pipeline {
       @Nullable String beamTestPipelineOptions =
           System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
 
-      System.out.println("options " + beamTestPipelineOptions);
       PipelineOptions options =
           Strings.isNullOrEmpty(beamTestPipelineOptions)
               ? PipelineOptionsFactory.create()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index f2fa87c..018877f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -423,20 +423,6 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
 
   /////////////////////////////////////////////////////////////////////////////
 
-
-  /**
-   * Annotation for the method to use to prepare an instance for processing 
bundles of elements. The
-   * method annotated with this must satisfy the following constraints
-   * <ul>
-   *   <li>It must have zero arguments.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface Setup {
-  }
-
   /**
    * Annotation for declaring and dereferencing state cells.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index a7dc136..a3a306a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -670,18 +670,6 @@ public class ParDo {
 
     /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but which will invoke the given {@link DoFn}
-     * function, and which has its input and output types bound. Does
-     * not modify this transform. The resulting {@link PTransform} is
-     * sufficiently specified to be applied, but more properties can
-     * still be specified.
-     */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> 
fn) {
-      return of(adapt(fn), fn.getClass());
-    }
-
-    /**
-     * Returns a new {@link ParDo} {@link PTransform} that's like this
      * transform but that will invoke the given {@link OldDoFn}
      * function, and that has its input and output types bound. Does
      * not modify this transform. The resulting {@link PTransform} is

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
deleted file mode 100644
index 84b5b68..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.isOneOf;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link Latest.LatestFn}.
- * */
-@RunWith(JUnit4.class)
-public class LatestFnTests {
-  private static final Instant INSTANT = new Instant(100);
-  private static final long VALUE = 100 * INSTANT.getMillis();
-
-  private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
-  private static final TimestampedValue<Long> TV_MINUS_TEN =
-      TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
-  private static final TimestampedValue<Long> TV_PLUS_TEN =
-      TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
-  private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
-  private final Instant baseTimestamp = Instant.now();
-
-  @Test
-  public void testDefaultValue() {
-    assertThat(fn.defaultValue(), nullValue());
-  }
-
-  @Test
-  public void testCreateAccumulator() {
-    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
-  }
-
-  @Test
-  public void testAddInputInitialAdd() {
-    TimestampedValue<Long> input = TV;
-    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
-  }
-
-  @Test
-  public void testAddInputMinTimestamp() {
-    TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
-    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
-  }
-
-  @Test
-  public void testAddInputEarlierValue() {
-    assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
-  }
-
-  @Test
-  public void testAddInputLaterValue() {
-    assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
-  }
-
-  @Test
-  public void testAddInputSameTimestamp() {
-    TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
-    TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
-
-    assertThat("Latest for values with the same timestamp is chosen arbitrarily",
-        fn.addInput(accum, input), isOneOf(accum, input));
-  }
-
-  @Test
-  public void testAddInputNullAccumulator() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("accumulators");
-    fn.addInput(null, TV);
-  }
-
-  @Test
-  public void testAddInputNullInput() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("input");
-    fn.addInput(TV, null);
-  }
-
-  @Test
-  public void testAddInputNullValue() {
-    TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
-    assertEquals("Null values are allowed", input, fn.addInput(TV, input));
-  }
-
-  @Test
-  public void testMergeAccumulatorsMultipleValues() {
-    Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
-        TV,
-        TV_PLUS_TEN,
-        TV_MINUS_TEN
-    );
-
-    assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
-  }
-
-  @Test
-  public void testMergeAccumulatorsSingleValue() {
-    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsEmptyIterable() {
-    ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
-    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
-  }
-
-  @Test
-  public void testMergeAccumulatorsDefaultAccumulator() {
-    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
-    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsAllDefaultAccumulators() {
-    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
-    assertEquals(defaultAccum, fn.mergeAccumulators(
-        Lists.newArrayList(defaultAccum, defaultAccum)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsNullIterable() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("accumulators");
-    fn.mergeAccumulators(null);
-  }
-
-  @Test
-  public void testExtractOutput() {
-    assertEquals(TV.getValue(), fn.extractOutput(TV));
-  }
-
-  @Test
-  public void testExtractOutputDefaultAggregator() {
-    TimestampedValue<Long> accum = fn.createAccumulator();
-    assertThat(fn.extractOutput(accum), nullValue());
-  }
-
-  @Test
-  public void testExtractOutputNullValue() {
-    TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
-    assertEquals(null, fn.extractOutput(accum));
-  }
-
-  @Test
-  public void testAggregator() throws Exception {
-    LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
-    DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
-    for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
-      harness.processTimestampedElement(element);
-    }
-
-    assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
-    assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
-    assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
-  }
-
-  @Test
-  public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
-    Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
-
-    CoderRegistry registry = new CoderRegistry();
-    TimestampedValue.TimestampedValueCoder<Long> inputCoder =
-        TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
-
-    assertThat("Default output coder should handle null values",
-        fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
-    assertThat("Default accumulator coder should handle null values",
-        fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
-  }
-
-  static class LatestAggregatorsFn<T> extends DoFn<T, T> {
-    private final T specialValue;
-    LatestAggregatorsFn(T specialValue) {
-      this.specialValue = specialValue;
-    }
-
-    Aggregator<TimestampedValue<T>, T> allValuesAgg =
-        createAggregator("allValues", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> specialValueAgg =
-        createAggregator("oneValue", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> noValuesAgg =
-        createAggregator("noValues", new Latest.LatestFn<T>());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
-      allValuesAgg.addValue(val);
-      if (Objects.equals(c.element(), specialValue)) {
-        specialValueAgg.addValue(val);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
deleted file mode 100644
index 44dbf4a..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from Amazon Kinesis.
- */
-package org.apache.beam.sdk.io.kinesis;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
deleted file mode 100644
index fd08b58..0000000
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing from MongoDB.
- */
-package org.apache.beam.sdk.io.mongodb;

Reply via email to