http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java
deleted file mode 100644
index 56980a1..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardNumber;
-
-public final class ShardNameTemplateHelper {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
-
-  public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
-  public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
-  public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";
-
-  private ShardNameTemplateHelper() {
-  }
-
-  public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
-      TaskAttemptContext context) throws IOException {
-    FileOutputCommitter committer =
-        (FileOutputCommitter) format.getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getOutputFile(context));
-  }
-
-  private static String getOutputFile(TaskAttemptContext context) {
-    TaskID taskId = context.getTaskAttemptID().getTaskID();
-    int partition = taskId.getId();
-
-    String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
-    String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
-    String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);
-    return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
deleted file mode 100644
index d3e8c9b..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
-
-final class SparkContextFactory {
-
-  /**
-   * If the property {@code dataflow.spark.test.reuseSparkContext} is set to
-   * {@code true} then the Spark context will be reused for dataflow pipelines.
-   * This property should only be enabled for tests.
-   */
-  static final String TEST_REUSE_SPARK_CONTEXT =
-      "dataflow.spark.test.reuseSparkContext";
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-
-  private SparkContextFactory() {
-  }
-
-  static synchronized JavaSparkContext getSparkContext(String master, String appName) {
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      if (sparkContext == null) {
-        sparkContext = createSparkContext(master, appName);
-        sparkMaster = master;
-      } else if (!master.equals(sparkMaster)) {
-        throw new IllegalArgumentException(String.format("Cannot reuse spark context " +
-                "with different spark master URL. Existing: %s, requested: %s.",
-            sparkMaster, master));
-      }
-      return sparkContext;
-    } else {
-      return createSparkContext(master, appName);
-    }
-  }
-
-  static synchronized void stopSparkContext(JavaSparkContext context) {
-    if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
-      context.stop();
-    }
-  }
-
-  private static JavaSparkContext createSparkContext(String master, String appName) {
-    SparkConf conf = new SparkConf();
-    conf.setMaster(master);
-    conf.setAppName(appName);
-    conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-    return new JavaSparkContext(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
deleted file mode 100644
index 6762180..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-/**
- * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark.
- */
-public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator {
-
-  private final EvaluationContext ctxt;
-
-  public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) {
-    super(translator);
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
-      node) {
-    @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-    @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
-        (TransformEvaluator<PT>) translator.translate(transformClass);
-    LOG.info("Evaluating {}", transform);
-    AppliedPTransform<PInput, POutput, PT> appliedTransform =
-        AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
-    ctxt.setCurrentTransform(appliedTransform);
-    evaluator.evaluate(transform, ctxt);
-    ctxt.setCurrentTransform(null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
deleted file mode 100644
index e96162e..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-
-public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
-                                              ApplicationNameOptions {
-  @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
-  @Default.String("local[1]")
-  String getSparkMaster();
-
-  void setSparkMaster(String master);
-
-  @Override
-  @Default.Boolean(false)
-  boolean isStreaming();
-
-  @Override
-  @Default.String("spark dataflow pipeline job")
-  String getAppName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java
deleted file mode 100644
index 89cd030..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-
-public final class SparkPipelineOptionsFactory {
-  private SparkPipelineOptionsFactory() {
-  }
-
-  public static SparkPipelineOptions create() {
-    return PipelineOptionsFactory.as(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java
deleted file mode 100644
index 21fe693..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-    return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
deleted file mode 100644
index a9c2d86..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import org.apache.spark.SparkException;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions;
-import com.cloudera.dataflow.spark.streaming.StreamingEvaluationContext;
-import com.cloudera.dataflow.spark.streaming.StreamingTransformTranslator;
-import com.cloudera.dataflow.spark.streaming.StreamingWindowPipelineDetector;
-
-/**
- * The SparkPipelineRunner translate operations defined on a pipeline to a representation
- * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a dataflow pipeline with the default options of a single threaded spark instance in local mode,
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = SparkPipelineRunner.create().run(p);
- * }
- *
- * To create a pipeline runner to run against a different spark cluster, with a custom master url
- * we would do the following:
- *
- * {@code
- * Pipeline p = [logic for pipeline creation]
- * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = SparkPipelineRunner.create(options).run(p);
- * }
- *
- * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions}
- */
-public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
-  /**
-   * Options used in this pipeline runner.
-   */
-  private final SparkPipelineOptions mOptions;
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with default options. In particular, against a
-   * spark instance running in local mode.
-   *
-   * @return A pipeline runner with default options.
-   */
-  public static SparkPipelineRunner create() {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    return new SparkPipelineRunner(options);
-  }
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with specified options.
-   *
-   * @param options The SparkPipelineOptions to use when executing the job.
-   * @return A pipeline runner that will execute with specified options.
-   */
-  public static SparkPipelineRunner create(SparkPipelineOptions options) {
-    return new SparkPipelineRunner(options);
-  }
-
-  /**
-   * Creates and returns a new SparkPipelineRunner with specified options.
-   *
-   * @param options The PipelineOptions to use when executing the job.
-   * @return A pipeline runner that will execute with specified options.
-   */
-  public static SparkPipelineRunner fromOptions(PipelineOptions options) {
-    SparkPipelineOptions sparkOptions =
-        PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
-    return new SparkPipelineRunner(sparkOptions);
-  }
-
-  /**
-   * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
-   * thread.
-   */
-  private SparkPipelineRunner(SparkPipelineOptions options) {
-    mOptions = options;
-  }
-
-
-  @Override
-  public EvaluationResult run(Pipeline pipeline) {
-    try {
-      // validate streaming configuration
-      if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) {
-        throw new RuntimeException("A streaming job must be configured with " +
-            SparkStreamingPipelineOptions.class.getSimpleName() + ", found " +
-            mOptions.getClass().getSimpleName());
-      }
-      LOG.info("Executing pipeline using the SparkPipelineRunner.");
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
-              .getSparkMaster(), mOptions.getAppName());
-
-      if (mOptions.isStreaming()) {
-        SparkPipelineTranslator translator =
-                new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
-        // if streaming - fixed window should be defined on all UNBOUNDED inputs
-        StreamingWindowPipelineDetector streamingWindowPipelineDetector =
-            new StreamingWindowPipelineDetector(translator);
-        pipeline.traverseTopologically(streamingWindowPipelineDetector);
-        if (!streamingWindowPipelineDetector.isWindowing()) {
-          throw new IllegalStateException("Spark streaming pipeline must be windowed!");
-        }
-
-        Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
-        LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
-        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
-
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
-        ctxt.computeOutputs();
-
-        LOG.info("Streaming pipeline construction complete. Starting execution..");
-        ((StreamingEvaluationContext) ctxt).getStreamingContext().start();
-
-        return ctxt;
-      } else {
-        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
-        SparkPipelineTranslator translator = new TransformTranslator.Translator();
-        pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
-        ctxt.computeOutputs();
-
-        LOG.info("Pipeline execution complete.");
-
-        return ctxt;
-      }
-    } catch (Exception e) {
-      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
-      // won't let you catch something that is not declared, so we can't catch
-      // SparkException here. Instead we do an instanceof check.
-      // Then we find the cause by seeing if it's a user exception (wrapped by our
-      // SparkProcessException), or just use the SparkException cause.
-      if (e instanceof SparkException && e.getCause() != null) {
-        if (e.getCause() instanceof SparkProcessContext.SparkProcessException &&
-                e.getCause().getCause() != null) {
-          throw new RuntimeException(e.getCause().getCause());
-        } else {
-          throw new RuntimeException(e.getCause());
-        }
-      }
-      // otherwise just wrap in a RuntimeException
-      throw new RuntimeException(e);
-    }
-  }
-
-  private EvaluationContext
-      createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-      Duration batchDuration) {
-    SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions;
-    JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
-  }
-
-  public abstract static class Evaluator implements Pipeline.PipelineVisitor {
-    protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
-
-    protected final SparkPipelineTranslator translator;
-
-    protected Evaluator(SparkPipelineTranslator translator) {
-      this.translator = translator;
-    }
-
-    // Set upon entering a composite node which can be directly mapped to a single
-    // TransformEvaluator.
-    private TransformTreeNode currentTranslatedCompositeNode;
-
-    /**
-     * If true, we're currently inside a subtree of a composite node which directly maps to a
-     * single
-     * TransformEvaluator; children nodes are ignored, and upon post-visiting the translated
-     * composite node, the associated TransformEvaluator will be visited.
-     */
-    private boolean inTranslatedCompositeNode() {
-      return currentTranslatedCompositeNode != null;
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-      if (!inTranslatedCompositeNode() && node.getTransform() != null) {
-        @SuppressWarnings("unchecked")
-        Class<PTransform<?, ?>> transformClass =
-            (Class<PTransform<?, ?>>) node.getTransform().getClass();
-        if (translator.hasTranslation(transformClass)) {
-          LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
-          LOG.debug("Composite transform class: '{}'", transformClass);
-          currentTranslatedCompositeNode = node;
-        }
-      }
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-      // NB: We depend on enterCompositeTransform and leaveCompositeTransform providing 'node'
-      // objects for which Object.equals() returns true iff they are the same logical node
-      // within the tree.
-      if (inTranslatedCompositeNode() && node.equals(currentTranslatedCompositeNode)) {
-        LOG.info("Post-visiting directly-translatable composite transform: '{}'",
-                node.getFullName());
-        doVisitTransform(node);
-        currentTranslatedCompositeNode = null;
-      }
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      if (inTranslatedCompositeNode()) {
-        LOG.info("Skipping '{}'; already in composite transform.", node.getFullName());
-        return;
-      }
-      doVisitTransform(node);
-    }
-
-    protected abstract <PT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformTreeNode node);
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java
deleted file mode 100644
index 5bdd322..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
-import com.google.common.collect.ImmutableList;
-
-public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar {
-  @Override
-  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
deleted file mode 100644
index d90363f..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-/**
- * Translator to support translation between Dataflow transformations and Spark transformations.
- */
-public interface SparkPipelineTranslator {
-
-  boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
-
-  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
deleted file mode 100644
index 73cec25..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
-
-  private final DoFn<I, O> fn;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  protected WindowedValue<I> windowedValue;
-
-  SparkProcessContext(DoFn<I, O> fn,
-      SparkRuntimeContext runtime,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    fn.super();
-    this.fn = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  void setup() {
-    setupDelegateAggregators();
-  }
-
-  @Override
-  public PipelineOptions getPipelineOptions() {
-    return mRuntimeContext.getPipelineOptions();
-  }
-
-  @Override
-  public <T> T sideInput(PCollectionView<T> view) {
-    @SuppressWarnings("unchecked")
-    BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper =
-        (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal());
-    Iterable<WindowedValue<?>> contents = broadcastHelper.getValue();
-    return view.fromIterableInternal(contents);
-  }
-
-  @Override
-  public abstract void output(O output);
-
-  public abstract void output(WindowedValue<O> output);
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
-    String message = "sideOutput is an unsupported operation for doFunctions, use a " +
-        "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant 
instant) {
-    String message =
-        "sideOutputWithTimestamp is an unsupported operation for doFunctions, 
use a " +
-            "MultiDoFunction instead.";
-    LOG.warn(message);
-    throw new UnsupportedOperationException(message);
-  }
-
-  @Override
-  public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
-      String named,
-      Combine.CombineFn<AI, ?, AO> combineFn) {
-    return mRuntimeContext.createAggregator(named, combineFn);
-  }
-
-  @Override
-  public I element() {
-    return windowedValue.getValue();
-  }
-
-  @Override
-  public void outputWithTimestamp(O output, Instant timestamp) {
-    output(WindowedValue.of(output, timestamp,
-        windowedValue.getWindows(), windowedValue.getPane()));
-  }
-
-  @Override
-  public Instant timestamp() {
-    return windowedValue.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-      throw new UnsupportedOperationException(
-          "window() is only available in the context of a DoFn marked as 
RequiresWindow.");
-    }
-    return Iterables.getOnlyElement(windowedValue.getWindows());
-  }
-
-  @Override
-  public PaneInfo pane() {
-    return windowedValue.getPane();
-  }
-
-  @Override
-  public WindowingInternals<I, O> windowingInternals() {
-    return new WindowingInternals<I, O>() {
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return windowedValue.getWindows();
-      }
-
-      @Override
-      public void outputWindowedValue(O output, Instant timestamp, Collection<?
-          extends BoundedWindow> windows, PaneInfo paneInfo) {
-        output(WindowedValue.of(output, timestamp, windows, paneInfo));
-      }
-
-      @Override
-      public StateInternals stateInternals() {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#stateInternals() is not yet supported.");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#timerInternals() is not yet supported.");
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return windowedValue.getPane();
-      }
-
-      @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag,
-          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws 
IOException {
-        throw new UnsupportedOperationException(
-            "WindowingInternals#writePCollectionViewData() is not yet 
supported.");
-      }
-    };
-  }
-
-  protected abstract void clearOutput();
-  protected abstract Iterator<V> getOutputIterator();
-
-  protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> 
iter,
-      final DoFn<I, O> doFn) {
-    return new Iterable<V>() {
-      @Override
-      public Iterator<V> iterator() {
-        return new ProcCtxtIterator(iter, doFn);
-      }
-    };
-  }
-
-  private class ProcCtxtIterator extends AbstractIterator<V> {
-
-    private final Iterator<WindowedValue<I>> inputIterator;
-    private final DoFn<I, O> doFn;
-    private Iterator<V> outputIterator;
-    private boolean calledFinish;
-
-    ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
-      this.inputIterator = iterator;
-      this.doFn = doFn;
-      this.outputIterator = getOutputIterator();
-    }
-
-    @Override
-    protected V computeNext() {
-      // Process each element from the (input) iterator, which produces, zero, 
one or more
-      // output elements (of type V) in the output iterator. Note that the 
output
-      // collection (and iterator) is reset between each call to 
processElement, so the
-      // collection only holds the output values for each call to 
processElement, rather
-      // than for the whole partition (which would use too much memory).
-      while (true) {
-        if (outputIterator.hasNext()) {
-          return outputIterator.next();
-        } else if (inputIterator.hasNext()) {
-          clearOutput();
-          windowedValue = inputIterator.next();
-          try {
-            doFn.processElement(SparkProcessContext.this);
-          } catch (Exception e) {
-            throw new SparkProcessException(e);
-          }
-          outputIterator = getOutputIterator();
-        } else {
-          // no more input to consume, but finishBundle can produce more output
-          if (!calledFinish) {
-            clearOutput();
-            try {
-              calledFinish = true;
-              doFn.finishBundle(SparkProcessContext.this);
-            } catch (Exception e) {
-              throw new SparkProcessException(e);
-            }
-            outputIterator = getOutputIterator();
-            continue; // try to consume outputIterator from start of loop
-          }
-          return endOfData();
-        }
-      }
-    }
-  }
-
-  static class SparkProcessException extends RuntimeException {
-    SparkProcessException(Throwable t) {
-      super(t);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
deleted file mode 100644
index ec590a9..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import com.cloudera.dataflow.spark.aggregators.AggAccumParam;
-import com.cloudera.dataflow.spark.aggregators.NamedAggregators;
-
-/**
- * The SparkRuntimeContext allows us to define useful features on the client 
side before our
- * data flow program is launched.
- */
-public class SparkRuntimeContext implements Serializable {
-  /**
-   * An accumulator that is a map from names to aggregators.
-   */
-  private final Accumulator<NamedAggregators> accum;
-
-  private final String serializedPipelineOptions;
-
-  /**
-   * Map fo names to dataflow aggregators.
-   */
-  private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
-  private transient CoderRegistry coderRegistry;
-
-  SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam());
-    this.serializedPipelineOptions = 
serializePipelineOptions(pipeline.getOptions());
-  }
-
-  private static String serializePipelineOptions(PipelineOptions 
pipelineOptions) {
-    try {
-      return new ObjectMapper().writeValueAsString(pipelineOptions);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline 
options.", e);
-    }
-  }
-
-  private static PipelineOptions deserializePipelineOptions(String 
serializedPipelineOptions) {
-    try {
-      return new ObjectMapper().readValue(serializedPipelineOptions, 
PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to deserialize the pipeline 
options.", e);
-    }
-  }
-
-  /**
-   * Retrieves corresponding value of an aggregator.
-   *
-   * @param aggregatorName Name of the aggregator to retrieve the value of.
-   * @param typeClass      Type class of value to be retrieved.
-   * @param <T>            Type of object to be returned.
-   * @return The value of the aggregator.
-   */
-  public <T> T getAggregatorValue(String aggregatorName, Class<T> typeClass) {
-    return accum.value().getValue(aggregatorName, typeClass);
-  }
-
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> 
aggregator) {
-    @SuppressWarnings("unchecked")
-    Class<T> aggValueClass = (Class<T>) 
aggregator.getCombineFn().getOutputType().getRawType();
-    final T aggregatorValue = getAggregatorValue(aggregator.getName(), 
aggValueClass);
-    return new AggregatorValues<T>() {
-      @Override
-      public Collection<T> getValues() {
-        return ImmutableList.of(aggregatorValue);
-      }
-
-      @Override
-      public Map<String, T> getValuesAtSteps() {
-        throw new UnsupportedOperationException("getValuesAtSteps is not 
supported.");
-      }
-    };
-  }
-
-  public synchronized PipelineOptions getPipelineOptions() {
-    return deserializePipelineOptions(serializedPipelineOptions);
-  }
-
-  /**
-   * Creates and aggregator and associates it with the specified name.
-   *
-   * @param named     Name of aggregator.
-   * @param combineFn Combine function used in aggregation.
-   * @param <IN>      Type of inputs to aggregator.
-   * @param <INTER>   Intermediate data type
-   * @param <OUT>     Type of aggregator outputs.
-   * @return Specified aggregator
-   */
-  public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
-      String named,
-      Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
-    @SuppressWarnings("unchecked")
-    Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) 
aggregators.get(named);
-    if (aggregator == null) {
-      @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
-          new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<IN, INTER, OUT>) combineFn,
-              (Coder<IN>) getCoder(combineFn),
-              this);
-      accum.add(new NamedAggregators(named, state));
-      aggregator = new SparkAggregator<>(named, state);
-      aggregators.put(named, aggregator);
-    }
-    return aggregator;
-  }
-
-  public CoderRegistry getCoderRegistry() {
-    if (coderRegistry == null) {
-      coderRegistry = new CoderRegistry();
-      coderRegistry.registerStandardCoders();
-    }
-    return coderRegistry;
-  }
-
-  private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
-    try {
-      if (combiner.getClass() == Sum.SumIntegerFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Sum.SumLongFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Min.MinLongFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
-      } else if (combiner.getClass() == Max.MaxLongFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
-      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
-        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
-      } else {
-        throw new IllegalArgumentException("unsupported combiner in 
Aggregator: "
-            + combiner.getClass().getName());
-      }
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalStateException("Could not determine default coder for 
combiner", e);
-    }
-  }
-
-  /**
-   * Initialize spark aggregators exactly once.
-   *
-   * @param <IN> Type of element fed in to aggregator.
-   */
-  private static class SparkAggregator<IN, OUT> implements Aggregator<IN, 
OUT>, Serializable {
-    private final String name;
-    private final NamedAggregators.State<IN, ?, OUT> state;
-
-    SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) {
-      this.name = name;
-      this.state = state;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public void addValue(IN elem) {
-      state.update(elem);
-    }
-
-    @Override
-    public Combine.CombineFn<IN, ?, OUT> getCombineFn() {
-      return state.getCombineFn();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java
deleted file mode 100644
index ef24137..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) 
throws IOException {
-    Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-    return path.getFileSystem(context.getConfiguration()).create(path);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java
deleted file mode 100644
index 3ab07b5..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-public class TemplatedSequenceFileOutputFormat<K, V> extends 
SequenceFileOutputFormat<K, V>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  public Path getDefaultWorkFile(TaskAttemptContext context,
-      String extension) throws IOException {
-    // note that the passed-in extension is ignored since it comes from the 
template
-    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java
deleted file mode 100644
index a8e218d..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  public Path getDefaultWorkFile(TaskAttemptContext context,
-      String extension) throws IOException {
-    // note that the passed-in extension is ignored since it comes from the 
template
-    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
deleted file mode 100644
index 52842d5..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.Serializable;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-public interface TransformEvaluator<PT extends PTransform<?, ?>> extends 
Serializable {
-  void evaluate(PT transform, EvaluationContext context);
-}

Reply via email to