[BEAM-11] Spark runner directory structure and pom setup.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41c4ca6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41c4ca6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41c4ca6a

Branch: refs/heads/master
Commit: 41c4ca6ae284692bf3abd35491b4ed638b32c283
Parents: 46412e5
Author: Sela <[email protected]>
Authored: Sat Mar 12 00:37:55 2016 +0200
Committer: Sela <[email protected]>
Committed: Tue Mar 15 20:38:26 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 272 +++----
 .../com/cloudera/dataflow/hadoop/HadoopIO.java  | 202 -----
 .../dataflow/hadoop/NullWritableCoder.java      |  71 --
 .../cloudera/dataflow/hadoop/WritableCoder.java | 120 ---
 .../com/cloudera/dataflow/io/ConsoleIO.java     |  60 --
 .../com/cloudera/dataflow/io/CreateStream.java  |  66 --
 .../java/com/cloudera/dataflow/io/KafkaIO.java  | 128 ---
 .../dataflow/spark/BroadcastHelper.java         | 121 ---
 .../com/cloudera/dataflow/spark/ByteArray.java  |  52 --
 .../cloudera/dataflow/spark/CoderHelpers.java   | 185 -----
 .../cloudera/dataflow/spark/DoFnFunction.java   |  93 ---
 .../dataflow/spark/EvaluationContext.java       | 283 -------
 .../dataflow/spark/EvaluationResult.java        |  62 --
 .../dataflow/spark/MultiDoFnFunction.java       | 115 ---
 .../dataflow/spark/ShardNameBuilder.java        | 106 ---
 .../dataflow/spark/ShardNameTemplateAware.java  |  28 -
 .../dataflow/spark/ShardNameTemplateHelper.java |  58 --
 .../dataflow/spark/SparkContextFactory.java     |  66 --
 .../dataflow/spark/SparkPipelineEvaluator.java  |  52 --
 .../dataflow/spark/SparkPipelineOptions.java    |  39 -
 .../spark/SparkPipelineOptionsFactory.java      |  27 -
 .../spark/SparkPipelineOptionsRegistrar.java    |  27 -
 .../dataflow/spark/SparkPipelineRunner.java     | 252 ------
 .../spark/SparkPipelineRunnerRegistrar.java     |  27 -
 .../dataflow/spark/SparkPipelineTranslator.java |  27 -
 .../dataflow/spark/SparkProcessContext.java     | 250 ------
 .../dataflow/spark/SparkRuntimeContext.java     | 212 -----
 .../spark/TemplatedAvroKeyOutputFormat.java     |  40 -
 .../TemplatedSequenceFileOutputFormat.java      |  40 -
 .../spark/TemplatedTextOutputFormat.java        |  40 -
 .../dataflow/spark/TransformEvaluator.java      |  24 -
 .../dataflow/spark/TransformTranslator.java     | 800 ------------------
 .../dataflow/spark/WindowingHelpers.java        |  59 --
 .../spark/aggregators/AggAccumParam.java        |  35 -
 .../spark/aggregators/NamedAggregators.java     | 202 -----
 .../SparkStreamingPipelineOptions.java          |  40 -
 .../SparkStreamingPipelineOptionsFactory.java   |  27 -
 .../SparkStreamingPipelineOptionsRegistrar.java |  28 -
 .../streaming/StreamingEvaluationContext.java   | 226 ------
 .../streaming/StreamingTransformTranslator.java | 414 ----------
 .../StreamingWindowPipelineDetector.java        | 100 ---
 .../apache/beam/runners/spark/DoFnFunction.java |  94 +++
 .../beam/runners/spark/EvaluationContext.java   | 284 +++++++
 .../beam/runners/spark/EvaluationResult.java    |  62 ++
 .../beam/runners/spark/MultiDoFnFunction.java   | 116 +++
 .../beam/runners/spark/SparkContextFactory.java |  66 ++
 .../runners/spark/SparkPipelineEvaluator.java   |  52 ++
 .../runners/spark/SparkPipelineOptions.java     |  39 +
 .../spark/SparkPipelineOptionsFactory.java      |  27 +
 .../spark/SparkPipelineOptionsRegistrar.java    |  27 +
 .../beam/runners/spark/SparkPipelineRunner.java | 252 ++++++
 .../spark/SparkPipelineRunnerRegistrar.java     |  27 +
 .../runners/spark/SparkPipelineTranslator.java  |  27 +
 .../beam/runners/spark/SparkProcessContext.java | 257 ++++++
 .../beam/runners/spark/SparkRuntimeContext.java | 214 +++++
 .../beam/runners/spark/TransformEvaluator.java  |  24 +
 .../beam/runners/spark/TransformTranslator.java | 805 +++++++++++++++++++
 .../beam/runners/spark/WindowingHelpers.java    |  59 ++
 .../spark/aggregators/AggAccumParam.java        |  35 +
 .../spark/aggregators/NamedAggregators.java     | 202 +++++
 .../beam/runners/spark/coders/CoderHelpers.java | 186 +++++
 .../runners/spark/coders/NullWritableCoder.java |  71 ++
 .../runners/spark/coders/WritableCoder.java     | 120 +++
 .../apache/beam/runners/spark/io/ConsoleIO.java |  60 ++
 .../beam/runners/spark/io/CreateStream.java     |  66 ++
 .../apache/beam/runners/spark/io/KafkaIO.java   | 128 +++
 .../beam/runners/spark/io/hadoop/HadoopIO.java  | 200 +++++
 .../spark/io/hadoop/ShardNameBuilder.java       | 106 +++
 .../spark/io/hadoop/ShardNameTemplateAware.java |  28 +
 .../io/hadoop/ShardNameTemplateHelper.java      |  58 ++
 .../io/hadoop/TemplatedAvroKeyOutputFormat.java |  40 +
 .../TemplatedSequenceFileOutputFormat.java      |  40 +
 .../io/hadoop/TemplatedTextOutputFormat.java    |  40 +
 .../SparkStreamingPipelineOptions.java          |  40 +
 .../SparkStreamingPipelineOptionsFactory.java   |  27 +
 .../SparkStreamingPipelineOptionsRegistrar.java |  28 +
 .../streaming/StreamingEvaluationContext.java   | 226 ++++++
 .../streaming/StreamingTransformTranslator.java | 415 ++++++++++
 .../StreamingWindowPipelineDetector.java        | 101 +++
 .../runners/spark/util/BroadcastHelper.java     | 122 +++
 .../beam/runners/spark/util/ByteArray.java      |  52 ++
 ...ataflow.sdk.options.PipelineOptionsRegistrar |   4 +-
 ...dataflow.sdk.runners.PipelineRunnerRegistrar |   2 +-
 .../dataflow/hadoop/WritableCoderTest.java      |  42 -
 .../dataflow/spark/AvroPipelineTest.java        | 103 ---
 .../dataflow/spark/CombineGloballyTest.java     |  87 --
 .../dataflow/spark/CombinePerKeyTest.java       |  69 --
 .../com/cloudera/dataflow/spark/DeDupTest.java  |  55 --
 .../cloudera/dataflow/spark/DoFnOutputTest.java |  57 --
 .../cloudera/dataflow/spark/EmptyInputTest.java |  64 --
 .../spark/HadoopFileFormatPipelineTest.java     | 105 ---
 .../spark/MultiOutputWordCountTest.java         | 148 ----
 .../cloudera/dataflow/spark/NumShardsTest.java  |  89 --
 .../dataflow/spark/SerializationTest.java       | 183 -----
 .../dataflow/spark/ShardNameBuilderTest.java    |  82 --
 .../dataflow/spark/SideEffectsTest.java         |  77 --
 .../dataflow/spark/SimpleWordCountTest.java     | 117 ---
 .../spark/TestSparkPipelineOptionsFactory.java  |  34 -
 .../com/cloudera/dataflow/spark/TfIdfTest.java  |  60 --
 .../dataflow/spark/TransformTranslatorTest.java |  95 ---
 .../dataflow/spark/WindowedWordCountTest.java   |  63 --
 .../spark/streaming/FlattenStreamingTest.java   |  84 --
 .../spark/streaming/KafkaStreamingTest.java     | 133 ---
 .../streaming/SimpleStreamingWordCountTest.java |  73 --
 .../utils/DataflowAssertStreaming.java          |  39 -
 .../streaming/utils/EmbeddedKafkaCluster.java   | 314 --------
 .../beam/runners/spark/CombineGloballyTest.java |  88 ++
 .../beam/runners/spark/CombinePerKeyTest.java   |  65 ++
 .../apache/beam/runners/spark/DeDupTest.java    |  56 ++
 .../beam/runners/spark/DoFnOutputTest.java      |  58 ++
 .../beam/runners/spark/EmptyInputTest.java      |  65 ++
 .../runners/spark/MultiOutputWordCountTest.java | 132 +++
 .../beam/runners/spark/SerializationTest.java   | 177 ++++
 .../beam/runners/spark/SideEffectsTest.java     |  76 ++
 .../beam/runners/spark/SimpleWordCountTest.java | 111 +++
 .../spark/TestSparkPipelineOptionsFactory.java  |  34 +
 .../apache/beam/runners/spark/TfIdfTest.java    |  61 ++
 .../runners/spark/TransformTranslatorTest.java  |  95 +++
 .../runners/spark/WindowedWordCountTest.java    |  64 ++
 .../runners/spark/coders/WritableCoderTest.java |  42 +
 .../beam/runners/spark/io/AvroPipelineTest.java | 105 +++
 .../beam/runners/spark/io/NumShardsTest.java    |  93 +++
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 107 +++
 .../spark/io/hadoop/ShardNameBuilderTest.java   |  82 ++
 .../spark/streaming/FlattenStreamingTest.java   |  84 ++
 .../spark/streaming/KafkaStreamingTest.java     | 133 +++
 .../streaming/SimpleStreamingWordCountTest.java |  73 ++
 .../utils/DataflowAssertStreaming.java          |  39 +
 .../streaming/utils/EmbeddedKafkaCluster.java   | 314 ++++++++
 129 files changed, 7113 insertions(+), 7119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 399e9e7..a060161 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -11,21 +11,136 @@ the specific language governing permissions and limitations under the
 License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
     <modelVersion>4.0.0</modelVersion>
-    <name>Dataflow on Spark</name>
-    <groupId>com.cloudera.dataflow.spark</groupId>
-    <artifactId>spark-dataflow</artifactId>
+
+    <parent>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>runners</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>spark-runner</artifactId>
     <version>0.4.3-SNAPSHOT</version>
+
+    <name>Spark Beam Runner</name>
     <packaging>jar</packaging>
 
+    <inceptionYear>2014</inceptionYear>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <java.version>1.7</java.version>
         <spark.version>1.5.2</spark.version>
-        <google-cloud-dataflow-version>1.3.0</google-cloud-dataflow-version>
+        <beam.version>1.5.0-SNAPSHOT</beam.version>
     </properties>
 
+    <repositories>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            
<url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud.dataflow</groupId>
+            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+            <version>${beam.version}</version>
+            <exclusions>
+                <!-- Use Hadoop/Spark's backend logger -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-jdk14</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud.dataflow</groupId>
+            <artifactId>google-cloud-dataflow-java-examples-all</artifactId>
+            <version>${beam.version}</version>
+            <exclusions>
+                <!-- Use Hadoop/Spark's backend logger -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-jdk14</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+            <version>1.7.7</version>
+            <classifier>hadoop2</classifier>
+            <exclusions>
+                <!-- exclude old Jetty version of servlet API -->
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
     <build>
         <pluginManagement>
             <plugins>
@@ -231,20 +346,20 @@ License.
                             <goals>
                                 <goal>shade</goal>
                             </goals>
-                            <configuration>
-                                <relocations>
-                                    <!-- relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) -->
-                                    <relocation>
-                                        <pattern>com.google.common</pattern>
-                                        
<shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern>
-                                    </relocation>
-                                </relocations>
-                                
<shadedArtifactAttached>true</shadedArtifactAttached>
-                                
<shadedClassifierName>spark-app</shadedClassifierName>
-                                <transformers>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                                </transformers>
-                            </configuration>
+                            <!--<configuration>-->
+                                <!--<relocations>-->
+                                    <!--&lt;!&ndash; relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) &ndash;&gt;-->
+                                    <!--<relocation>-->
+                                        
<!--<pattern>com.google.common</pattern>-->
+                                        
<!--<shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern>-->
+                                    <!--</relocation>-->
+                                <!--</relocations>-->
+                                
<!--<shadedArtifactAttached>true</shadedArtifactAttached>-->
+                                
<!--<shadedClassifierName>spark-app</shadedClassifierName>-->
+                                <!--<transformers>-->
+                                    <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />-->
+                                <!--</transformers>-->
+                            <!--</configuration>-->
                         </execution>
                     </executions>
                 </plugin>
@@ -274,89 +389,6 @@ License.
         </plugins>
     </build>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_2.10</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka_2.10</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.2.1</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>18.0</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.cloud.dataflow</groupId>
-            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
-            <version>${google-cloud-dataflow-version}</version>
-            <exclusions>
-                <!-- Use Hadoop/Spark's backend logger -->
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-jdk14</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.google.cloud.dataflow</groupId>
-            <artifactId>google-cloud-dataflow-java-examples-all</artifactId>
-            <version>${google-cloud-dataflow-version}</version>
-            <exclusions>
-                <!-- Use Hadoop/Spark's backend logger -->
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-jdk14</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.7</version>
-            <classifier>hadoop2</classifier>
-            <exclusions>
-                <!-- exclude old Jetty version of servlet API -->
-                <exclusion>
-                    <groupId>org.mortbay.jetty</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- test dependencies -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
     <reporting>
         <plugins>
             <plugin>
@@ -380,25 +412,12 @@ License.
     </reporting>
 
 
-    <url>http://github.com/cloudera/spark-dataflow</url>
-    <inceptionYear>2014</inceptionYear>
-    <licenses>
-        <license>
-            <name>The Apache Software License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-            <distribution>repo</distribution>
-        </license>
-    </licenses>
     <developers>
         <developer>
             <name>Cloudera, Inc.</name>
         </developer>
     </developers>
 
-    <issueManagement>
-        <system>GitHub</system>
-        <url>https://github.com/cloudera/spark-dataflow/issues</url>
-    </issueManagement>
     <scm>
         
<connection>scm:git:https://github.com/cloudera/spark-dataflow.git</connection>
         
<developerConnection>scm:git:https://github.com/cloudera/spark-dataflow.git</developerConnection>
@@ -410,31 +429,6 @@ License.
         <maven>3.2.1</maven>
     </prerequisites>
 
-    <repositories>
-        <repository>
-            <id>cloudera.repo</id>
-            
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-            <name>Cloudera Repositories</name>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
-    <distributionManagement>
-        <repository>
-            <id>cloudera.repo</id>
-            
<url>https://repository.cloudera.com/artifactory/libs-release-local</url>
-        </repository>
-        <snapshotRepository>
-            <id>cloudera.snapshots.repo</id>
-            
<url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
-        </snapshotRepository>
-    </distributionManagement>
-
     <profiles>
         <profile>
             <id>release-sign-artifacts</id>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
deleted file mode 100644
index c79f211..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.hadoop;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import com.cloudera.dataflow.spark.ShardNameTemplateAware;
-
-public final class HadoopIO {
-
-  private HadoopIO() {
-  }
-
-  public static final class Read {
-
-    private Read() {
-    }
-
-    public static <K, V> Bound<K, V> from(String filepattern,
-        Class<? extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) {
-      return new Bound<>(filepattern, format, key, value);
-    }
-
-    public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
-
-      private final String filepattern;
-      private final Class<? extends FileInputFormat<K, V>> formatClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-
-      Bound(String filepattern, Class<? extends FileInputFormat<K, V>> format, 
Class<K> key,
-          Class<V> value) {
-        Preconditions.checkNotNull(filepattern,
-                                   "need to set the filepattern of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(format,
-                                   "need to set the format class of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(key,
-                                   "need to set the key class of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(value,
-                                   "need to set the value class of an HadoopIO.Read transform");
-        this.filepattern = filepattern;
-        this.formatClass = format;
-        this.keyClass = key;
-        this.valueClass = value;
-      }
-
-      public String getFilepattern() {
-        return filepattern;
-      }
-
-      public Class<? extends FileInputFormat<K, V>> getFormatClass() {
-        return formatClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      @Override
-      public PCollection<KV<K, V>> apply(PInput input) {
-        return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-            WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
-      }
-
-    }
-
-  }
-
-  public static final class Write {
-
-    private Write() {
-    }
-
-    public static <K, V> Bound<K, V> to(String filenamePrefix,
-        Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> 
value) {
-      return new Bound<>(filenamePrefix, format, key, value);
-    }
-
-    public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, 
PDone> {
-
-      /** The filename to write to. */
-      private final String filenamePrefix;
-      /** Suffix to use for each filename. */
-      private final String filenameSuffix;
-      /** Requested number of shards.  0 for automatic. */
-      private final int numShards;
-      /** Shard template string. */
-      private final String shardTemplate;
-      private final Class<? extends FileOutputFormat<K, V>> formatClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-      private final Map<String, String> configurationProperties;
-
-      Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> 
format,
-          Class<K> key,
-          Class<V> value) {
-        this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, 
key, value,
-            new HashMap<String, String>());
-      }
-
-      Bound(String filenamePrefix, String filenameSuffix, int numShards,
-          String shardTemplate, Class<? extends FileOutputFormat<K, V>> format,
-          Class<K> key, Class<V> value, Map<String, String> 
configurationProperties) {
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.formatClass = format;
-        this.keyClass = key;
-        this.valueClass = value;
-        this.configurationProperties = configurationProperties;
-      }
-
-      public Bound<K, V> withoutSharding() {
-        return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass,
-            keyClass, valueClass, configurationProperties);
-      }
-
-      public Bound<K, V> withConfigurationProperty(String key, String value) {
-        configurationProperties.put(key, value);
-        return this;
-      }
-
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
-
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
-
-      public int getNumShards() {
-        return numShards;
-      }
-
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
-
-      public Class<? extends FileOutputFormat<K, V>> getFormatClass() {
-        return formatClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      public Map<String, String> getConfigurationProperties() {
-        return configurationProperties;
-      }
-
-      @Override
-      public PDone apply(PCollection<KV<K, V>> input) {
-        Preconditions.checkNotNull(filenamePrefix,
-            "need to set the filename prefix of an HadoopIO.Write transform");
-        Preconditions.checkNotNull(formatClass,
-            "need to set the format class of an HadoopIO.Write transform");
-        Preconditions.checkNotNull(keyClass,
-            "need to set the key class of an HadoopIO.Write transform");
-        Preconditions.checkNotNull(valueClass,
-            "need to set the value class of an HadoopIO.Write transform");
-
-        Preconditions.checkArgument(ShardNameTemplateAware.class.isAssignableFrom(formatClass),
-            "Format class must implement " + ShardNameTemplateAware.class.getName());
-
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java
deleted file mode 100644
index 5e5d391..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.hadoop;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.hadoop.io.NullWritable;
-
-public final class NullWritableCoder extends WritableCoder<NullWritable> {
-  private static final long serialVersionUID = 1L;
-
-  @JsonCreator
-  public static NullWritableCoder of() {
-    return INSTANCE;
-  }
-
-  private static final NullWritableCoder INSTANCE = new NullWritableCoder();
-
-  private NullWritableCoder() {
-    super(NullWritable.class);
-  }
-
-  @Override
-  public void encode(NullWritable value, OutputStream outStream, Context 
context) {
-    // nothing to write
-  }
-
-  @Override
-  public NullWritable decode(InputStream inStream, Context context) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * Returns true since registerByteSizeObserver() runs in constant time.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context 
context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(NullWritable value, Context 
context) {
-    return 0;
-  }
-
-  @Override
-  public void verifyDeterministic() throws Coder.NonDeterministicException {
-    // NullWritableCoder is deterministic
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java 
b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
deleted file mode 100644
index 324b203..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.hadoop;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
- *
- * <p> To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder
- */
-public class WritableCoder<T extends Writable> extends StandardCoder<T> {
-  private static final long serialVersionUID = 0L;
-
-  /**
-   * Returns a {@code WritableCoder} instance for the provided element class.
-   * @param <T> the element type
-   * @param clazz the element class
-   * @return a {@code WritableCoder} instance for the provided element class
-   */
-  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
-    if (clazz.equals(NullWritable.class)) {
-      @SuppressWarnings("unchecked")
-      WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of();
-      return result;
-    }
-    return new WritableCoder<>(clazz);
-  }
-
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static WritableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Writable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Writable");
-    }
-    return of((Class<? extends Writable>) clazz);
-  }
-
-  private final Class<T> type;
-
-  public WritableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    value.write(new DataOutputStream(outStream));
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    try {
-      T t = type.getConstructor().newInstance();
-      t.readFields(new DataInputStream(inStream));
-      return t;
-    } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
-      throw new CoderException("unable to deserialize record", e);
-    } catch (InvocationTargetException ite) {
-      throw new CoderException("unable to deserialize record", ite.getCause());
-    }
-  }
-
-  @Override
-  public List<Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    result.put("type", type.getName());
-    return result;
-  }
-
-  @Override
-  public void verifyDeterministic() throws Coder.NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Hadoop Writable may be non-deterministic.");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
deleted file mode 100644
index bc19b39..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-/**
- * Print to console.
- */
-public final class ConsoleIO {
-
-  private ConsoleIO() {
-  }
-
-  public static final class Write {
-
-    private Write() {
-    }
-
-    public static <T> Unbound<T> from() {
-      return new Unbound<>(10);
-    }
-
-    public static <T> Unbound<T> from(int num) {
-      return new Unbound<>(num);
-    }
-
-    public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
-
-      private final int num;
-
-      Unbound(int num) {
-        this.num = num;
-      }
-
-      public int getNum() {
-        return num;
-      }
-
-      @Override
-      public PDone apply(PCollection<T> input) {
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
deleted file mode 100644
index 9a99278..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.base.Preconditions;
-
-/**
- * Create an input stream from Queue.
- *
- * @param <T> stream type
- */
-public final class CreateStream<T> {
-
-  private CreateStream() {
-  }
-
-  /**
-   * Define the input stream to create from queue.
-   *
-   * @param queuedValues  defines the input stream
-   * @param <T>           stream type
-   * @return the queue that defines the input stream
-   */
-  public static <T> QueuedValues<T> fromQueue(Iterable<Iterable<T>> queuedValues) {
-    return new QueuedValues<>(queuedValues);
-  }
-
-  public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
-
-    private final Iterable<Iterable<T>> queuedValues;
-
-    QueuedValues(Iterable<Iterable<T>> queuedValues) {
-      Preconditions.checkNotNull(queuedValues,
-              "need to set the queuedValues of an Create.QueuedValues transform");
-      this.queuedValues = queuedValues;
-    }
-
-    public Iterable<Iterable<T>> getQueuedValues() {
-      return queuedValues;
-    }
-
-    @Override
-    public PCollection<T> apply(PInput input) {
-      // Spark streaming micro batches are bounded by default
-      return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
deleted file mode 100644
index 154e6da..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.io;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.base.Preconditions;
-
-import kafka.serializer.Decoder;
-
-/**
- * Read stream from Kafka.
- */
-public final class KafkaIO {
-
-  private KafkaIO() {
-  }
-
-  public static final class Read {
-
-    private Read() {
-    }
-
-    /**
-     * Define the Kafka consumption.
-     *
-     * @param keyDecoder    {@link Decoder} to decode the Kafka message key
-     * @param valueDecoder  {@link Decoder} to decode the Kafka message value
-     * @param key           Kafka message key Class
-     * @param value         Kafka message value Class
-     * @param topics        Kafka topics to subscribe
-     * @param kafkaParams   map of Kafka parameters
-     * @param <K>           Kafka message key Class type
-     * @param <V>           Kafka message value Class type
-     * @return KafkaIO Unbound input
-     */
-    public static <K, V> Unbound<K, V> from(Class<? extends Decoder<K>> keyDecoder,
-                                            Class<? extends Decoder<V>> valueDecoder,
-                                            Class<K> key,
-                                            Class<V> value, Set<String> topics,
-                                            Map<String, String> kafkaParams) {
-      return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
-    }
-
-    public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
-
-      private final Class<? extends Decoder<K>> keyDecoderClass;
-      private final Class<? extends Decoder<V>> valueDecoderClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-      private final Set<String> topics;
-      private final Map<String, String> kafkaParams;
-
-      Unbound(Class<? extends Decoder<K>> keyDecoder,
-              Class<? extends Decoder<V>> valueDecoder, Class<K> key,
-              Class<V> value, Set<String> topics, Map<String, String> kafkaParams) {
-        Preconditions.checkNotNull(keyDecoder,
-            "need to set the key decoder class of a KafkaIO.Read transform");
-        Preconditions.checkNotNull(valueDecoder,
-            "need to set the value decoder class of a KafkaIO.Read transform");
-        Preconditions.checkNotNull(key,
-            "need to set the key class of aKafkaIO.Read transform");
-        Preconditions.checkNotNull(value,
-            "need to set the value class of a KafkaIO.Read transform");
-        Preconditions.checkNotNull(topics,
-            "need to set the topics of a KafkaIO.Read transform");
-        Preconditions.checkNotNull(kafkaParams,
-            "need to set the kafkaParams of a KafkaIO.Read transform");
-        this.keyDecoderClass = keyDecoder;
-        this.valueDecoderClass = valueDecoder;
-        this.keyClass = key;
-        this.valueClass = value;
-        this.topics = topics;
-        this.kafkaParams = kafkaParams;
-      }
-
-      public Class<? extends Decoder<K>> getKeyDecoderClass() {
-        return keyDecoderClass;
-      }
-
-      public Class<? extends Decoder<V>> getValueDecoderClass() {
-        return valueDecoderClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      public Set<String> getTopics() {
-        return topics;
-      }
-
-      public Map<String, String> getKafkaParams() {
-        return kafkaParams;
-      }
-
-      @Override
-      public PCollection<KV<K, V>> apply(PInput input) {
-        // Spark streaming micro batches are bounded by default
-        return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-            WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
deleted file mode 100644
index 8dca939..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class BroadcastHelper<T> implements Serializable {
-
-  /**
-   * If the property {@code dataflow.spark.directBroadcast} is set to
-   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
-   * in View objects. By default this property is not set, and values are coded using
-   * the appropriate {@link Coder}.
-   */
-  public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
-
-  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
-
-  public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) {
-    if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) {
-      return new DirectBroadcastHelper<>(value);
-    }
-    return new CodedBroadcastHelper<>(value, coder);
-  }
-
-  public abstract T getValue();
-
-  public abstract void broadcast(JavaSparkContext jsc);
-
-  /**
-   * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that relies on the underlying
-   * Spark serialization (Kryo) to broadcast values. This is appropriate when
-   * broadcasting very large values, since no copy of the object is made.
-   * @param <T>
-   */
-  static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
-    private Broadcast<T> bcast;
-    private transient T value;
-
-    DirectBroadcastHelper(T value) {
-      this.value = value;
-    }
-
-    @Override
-    public synchronized T getValue() {
-      if (value == null) {
-        value = bcast.getValue();
-      }
-      return value;
-    }
-
-    @Override
-    public void broadcast(JavaSparkContext jsc) {
-      this.bcast = jsc.broadcast(value);
-    }
-  }
-
-  /**
-   * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that uses a
-   * {@link Coder} to encode values as byte arrays
-   * before broadcasting.
-   * @param <T>
-   */
-  static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
-    private Broadcast<byte[]> bcast;
-    private final Coder<T> coder;
-    private transient T value;
-
-    CodedBroadcastHelper(T value, Coder<T> coder) {
-      this.value = value;
-      this.coder = coder;
-    }
-
-    @Override
-    public synchronized T getValue() {
-      if (value == null) {
-        value = deserialize();
-      }
-      return value;
-    }
-
-    @Override
-    public void broadcast(JavaSparkContext jsc) {
-      this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
-    }
-
-    private T deserialize() {
-      T val;
-      try {
-        val = coder.decode(new ByteArrayInputStream(bcast.value()),
-            new Coder.Context(true));
-      } catch (IOException ioe) {
-        // this should not ever happen, log it if it does.
-        LOG.warn(ioe.getMessage());
-        val = null;
-      }
-      return val;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
deleted file mode 100644
index 06db572..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.dataflow.spark;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-import com.google.common.primitives.UnsignedBytes;
-
-class ByteArray implements Serializable, Comparable<ByteArray> {
-
-  private final byte[] value;
-
-  ByteArray(byte[] value) {
-    this.value = value;
-  }
-
-  public byte[] getValue() {
-    return value;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ByteArray byteArray = (ByteArray) o;
-    return Arrays.equals(value, byteArray.value);
-  }
-
-  @Override
-  public int hashCode() {
-    return value != null ? Arrays.hashCode(value) : 0;
-  }
-
-  @Override
-  public int compareTo(ByteArray other) {
-    return UnsignedBytes.lexicographicalComparator().compare(value, other.value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
deleted file mode 100644
index 0ae06c1..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.common.collect.Iterables;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-/**
- * Serialization utility class.
- */
-public final class CoderHelpers {
-  private CoderHelpers() {
-  }
-
-  /**
-   * Utility method for serializing an object using the specified coder.
-   *
-   * @param value Value to serialize.
-   * @param coder Coder to serialize with.
-   * @param <T> type of value that is serialized
-   * @return Byte array representing serialized object.
-   */
-  static <T> byte[] toByteArray(T value, Coder<T> coder) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      coder.encode(value, baos, new Coder.Context(true));
-    } catch (IOException e) {
-      throw new IllegalStateException("Error encoding value: " + value, e);
-    }
-    return baos.toByteArray();
-  }
-
-  /**
-   * Utility method for serializing a Iterable of values using the specified coder.
-   *
-   * @param values Values to serialize.
-   * @param coder  Coder to serialize with.
-   * @param <T> type of value that is serialized
-   * @return List of bytes representing serialized objects.
-   */
-  static <T> List<byte[]> toByteArrays(Iterable<T> values, Coder<T> coder) {
-    List<byte[]> res = new LinkedList<>();
-    for (T value : values) {
-      res.add(toByteArray(value, coder));
-    }
-    return res;
-  }
-
-  /**
-   * Utility method for deserializing a byte array using the specified coder.
-   *
-   * @param serialized bytearray to be deserialized.
-   * @param coder      Coder to deserialize with.
-   * @param <T>        Type of object to be returned.
-   * @return Deserialized object.
-   */
-  static <T> T fromByteArray(byte[] serialized, Coder<T> coder) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
-    try {
-      return coder.decode(bais, new Coder.Context(true));
-    } catch (IOException e) {
-      throw new IllegalStateException("Error decoding bytes for coder: " + coder, e);
-    }
-  }
-
-  /**
-   * A function wrapper for converting an object to a bytearray.
-   *
-   * @param coder Coder to serialize with.
-   * @param <T>   The type of the object being serialized.
-   * @return A function that accepts an object and returns its coder-serialized form.
-   */
-  static <T> Function<T, byte[]> toByteFunction(final Coder<T> coder) {
-    return new Function<T, byte[]>() {
-      @Override
-      public byte[] call(T t) throws Exception {
-        return toByteArray(t, coder);
-      }
-    };
-  }
-
-  /**
-   * A function wrapper for converting a byte array to an object.
-   *
-   * @param coder Coder to deserialize with.
-   * @param <T>   The type of the object being deserialized.
-   * @return A function that accepts a byte array and returns its corresponding object.
-   */
-  static <T> Function<byte[], T> fromByteFunction(final Coder<T> coder) {
-    return new Function<byte[], T>() {
-      @Override
-      public T call(byte[] bytes) throws Exception {
-        return fromByteArray(bytes, coder);
-      }
-    };
-  }
-
-  /**
-   * A function wrapper for converting a key-value pair to a byte array pair.
-   *
-   * @param keyCoder Coder to serialize keys.
-   * @param valueCoder Coder to serialize values.
-   * @param <K>   The type of the key being serialized.
-   * @param <V>   The type of the value being serialized.
-   * @return A function that accepts a key-value pair and returns a pair of byte arrays.
-   */
-  static <K, V> PairFunction<Tuple2<K, V>, ByteArray, byte[]> toByteFunction(
-      final Coder<K> keyCoder, final Coder<V> valueCoder) {
-    return new PairFunction<Tuple2<K, V>, ByteArray, byte[]>() {
-      @Override
-      public Tuple2<ByteArray, byte[]> call(Tuple2<K, V> kv) {
-        return new Tuple2<>(new ByteArray(toByteArray(kv._1(), keyCoder)), toByteArray(kv._2(),
-            valueCoder));
-      }
-    };
-  }
-
-  /**
-   * A function wrapper for converting a byte array pair to a key-value pair.
-   *
-   * @param keyCoder Coder to deserialize keys.
-   * @param valueCoder Coder to deserialize values.
-   * @param <K>   The type of the key being deserialized.
-   * @param <V>   The type of the value being deserialized.
-   * @return A function that accepts a pair of byte arrays and returns a key-value pair.
-   */
-  static <K, V> PairFunction<Tuple2<ByteArray, byte[]>, K, V> fromByteFunction(
-      final Coder<K> keyCoder, final Coder<V> valueCoder) {
-    return new PairFunction<Tuple2<ByteArray, byte[]>, K, V>() {
-      @Override
-      public Tuple2<K, V> call(Tuple2<ByteArray, byte[]> tuple) {
-        return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
-            fromByteArray(tuple._2(), valueCoder));
-      }
-    };
-  }
-
-  /**
-   * A function wrapper for converting a byte array pair to a key-value pair, where
-   * values are {@link Iterable}.
-   *
-   * @param keyCoder Coder to deserialize keys.
-   * @param valueCoder Coder to deserialize values.
-   * @param <K>   The type of the key being deserialized.
-   * @param <V>   The type of the value being deserialized.
-   * @return A function that accepts a pair of byte arrays and returns a key-value pair.
-   */
-  static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
-      fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
-    return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
-      @Override
-      public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) {
-        return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
-          Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() {
-            @Override
-            public V apply(byte[] bytes) {
-              return fromByteArray(bytes, valueCoder);
-            }
-          }));
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
deleted file mode 100644
index 2bcfec3..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.spark.api.java.function.FlatMapFunction;
-
-/**
- * Dataflow's Do functions correspond to Spark's FlatMap functions.
- *
- * @param <I> Input element type.
- * @param <O> Output element type.
- */
-public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
-    WindowedValue<O>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  /**
-   * @param fn         DoFunction to be wrapped.
-   * @param runtime    Runtime to apply function in.
-   * @param sideInputs Side inputs used in DoFunction.
-   */
-  public DoFnFunction(DoFn<I, O> fn,
-               SparkRuntimeContext runtime,
-               Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtime;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
-      Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    ctxt.setup();
-    mFunction.startBundle(ctxt);
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
-
-    private final List<WindowedValue<O>> outputs = new LinkedList<>();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
-          WindowedValue.valueInEmptyWindows(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.add(o);
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<WindowedValue<O>> getOutputIterator() {
-      return outputs.iterator();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
deleted file mode 100644
index a6ac6c2..0000000
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-
-
-/**
- * Evaluation context allows us to define how pipeline instructions are evaluated.
- */
-public class EvaluationContext implements EvaluationResult {
-  private final JavaSparkContext jsc;
-  private final Pipeline pipeline;
-  private final SparkRuntimeContext runtime;
-  private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
-  private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
-  private final Set<PValue> multireads = new LinkedHashSet<>();
-  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new 
LinkedHashMap<>();
-  protected AppliedPTransform<?, ?, ?> currentTransform;
-
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.jsc = jsc;
-    this.pipeline = pipeline;
-    this.runtime = new SparkRuntimeContext(jsc, pipeline);
-  }
-
-  /**
-   * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
-   * sometimes created from a collection of objects (using RDD parallelize) and then
-   * only used to create View objects; in which case they do not need to be
-   * converted to bytes since they are not transferred across the network until they are
-   * broadcast.
-   */
-  private class RDDHolder<T> {
-
-    private Iterable<T> values;
-    private Coder<T> coder;
-    private JavaRDDLike<WindowedValue<T>, ?> rdd;
-
-    RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
-      this.coder = coder;
-    }
-
-    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
-      this.rdd = rdd;
-    }
-
-    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
-      if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(T t) {
-             // TODO: this is wrong if T is a TimestampedValue
-              return WindowedValue.valueInEmptyWindows(t);
-            }
-        });
-        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-            WindowedValue.getValueOnlyCoder(coder);
-        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, 
windowCoder))
-            .map(CoderHelpers.fromByteFunction(windowCoder));
-      }
-      return rdd;
-    }
-
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = 
rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
-        List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
-          @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
-          }
-        });
-      }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, 
WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right 
place?
-        }
-      });
-    }
-  }
-
-  protected JavaSparkContext getSparkContext() {
-    return jsc;
-  }
-
-  protected Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  protected SparkRuntimeContext getRuntimeContext() {
-    return runtime;
-  }
-
-  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-    this.currentTransform = transform;
-  }
-
-  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
-  }
-
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() 
== transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    I input = (I) currentTransform.getInput();
-    return input;
-  }
-
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    checkArgument(currentTransform != null && currentTransform.getTransform() 
== transform,
-        "can only be called with current transform");
-    @SuppressWarnings("unchecked")
-    O output = (O) currentTransform.getOutput();
-    return output;
-  }
-
-  protected  <T> void setOutputRDD(PTransform<?, ?> transform,
-      JavaRDDLike<WindowedValue<T>, ?> rdd) {
-    setRDD((PValue) getOutput(transform), rdd);
-  }
-
-  protected  <T> void setOutputRDDFromValues(PTransform<?, ?> transform, 
Iterable<T> values,
-      Coder<T> coder) {
-    pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, 
coder));
-  }
-
-  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
-    pview.put(view, value);
-  }
-
-  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
-    PValue pvalue = (PValue) getOutput(transform);
-    return pcollections.containsKey(pvalue);
-  }
-
-  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
-    RDDHolder<?> rddHolder = pcollections.get(pvalue);
-    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-    leafRdds.remove(rddHolder);
-    if (multireads.contains(pvalue)) {
-      // Ensure the RDD is marked as cached
-      rdd.rdd().cache();
-    } else {
-      multireads.add(pvalue);
-    }
-    return rdd;
-  }
-
-  protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> 
rdd) {
-    try {
-      rdd.rdd().setName(pvalue.getName());
-    } catch (IllegalStateException e) {
-      // name not set, ignore
-    }
-    RDDHolder<T> rddHolder = new RDDHolder<>(rdd);
-    pcollections.put(pvalue, rddHolder);
-    leafRdds.add(rddHolder);
-  }
-
-  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
-    return getRDD((PValue) getInput(transform));
-  }
-
-
-  <T> Iterable<? extends WindowedValue<?>> 
getPCollectionView(PCollectionView<T> view) {
-    return pview.get(view);
-  }
-
-  /**
-   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
-   * actions (like saving to a file) registered on them (i.e. they are performed for side
-   * effects).
-   */
-  protected void computeOutputs() {
-    for (RDDHolder<?> rddHolder : leafRdds) {
-      JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
-      rdd.rdd().cache(); // cache so that any subsequent get() is cheap
-      rdd.count(); // force the RDD to be computed
-    }
-  }
-
-  @Override
-  public <T> T get(PValue value) {
-    if (pobjects.containsKey(value)) {
-      @SuppressWarnings("unchecked")
-      T result = (T) pobjects.get(value);
-      return result;
-    }
-    if (pcollections.containsKey(value)) {
-      JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
-      @SuppressWarnings("unchecked")
-      T res = (T) Iterables.getOnlyElement(rdd.collect());
-      pobjects.put(value, res);
-      return res;
-    }
-    throw new IllegalStateException("Cannot resolve un-known PObject: " + 
value);
-  }
-
-  @Override
-  public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(named, resultType);
-  }
-
-  @Override
-  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> 
aggregator)
-      throws AggregatorRetrievalException {
-    return runtime.getAggregatorValues(aggregator);
-  }
-
-  @Override
-  public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
-  }
-
-  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) 
{
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
-  }
-
-  @Override
-  public void close() {
-    SparkContextFactory.stopSparkContext(jsc);
-  }
-
-  /** The runner is blocking. */
-  @Override
-  public State getState() {
-    return State.DONE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
deleted file mode 100644
index aad029a..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-/**
- * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between
- * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or collections of Ts.
- */
-public interface EvaluationResult extends PipelineResult {
-  /**
-   * Retrieves an iterable of results associated with the PCollection passed 
in.
-   *
-   * @param pcollection Collection we wish to translate.
-   * @param <T>         Type of elements contained in collection.
-   * @return Natively typed result associated with collection.
-   */
-  <T> Iterable<T> get(PCollection<T> pcollection);
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param pval PValue to retrieve associated data for.
-   * @param <T>  Type of object to return.
-   * @return Native object.
-   */
-  <T> T get(PValue pval);
-
-  /**
-   * Retrieves the final value of the aggregator.
-   *
-   * @param aggName    name of aggregator.
-   * @param resultType Class of final result of aggregation.
-   * @param <T>        Type of final result of aggregation.
-   * @return Result of aggregation associated with specified name.
-   */
-  <T> T getAggregatorValue(String aggName, Class<T> resultType);
-
-  /**
-   * Releases any runtime resources, including distributed-execution contexts 
currently held by
-   * this EvaluationResult; once close() has been called,
-   * {@link EvaluationResult#get(PCollection)} might
-   * not work for subsequent calls.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
deleted file mode 100644
index d269788..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.joda.time.Instant;
-import scala.Tuple2;
-
-/**
- * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by 
enriching the
- * underlying data with multiple TupleTags.
- *
- * @param <I> Input type for DoFunction.
- * @param <O> Output type for DoFunction.
- */
-class MultiDoFnFunction<I, O>
-    implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, 
WindowedValue<?>> {
-  private final DoFn<I, O> mFunction;
-  private final SparkRuntimeContext mRuntimeContext;
-  private final TupleTag<O> mMainOutputTag;
-  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
-
-  MultiDoFnFunction(
-      DoFn<I, O> fn,
-      SparkRuntimeContext runtimeContext,
-      TupleTag<O> mainOutputTag,
-      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
-    this.mFunction = fn;
-    this.mRuntimeContext = runtimeContext;
-    this.mMainOutputTag = mainOutputTag;
-    this.mSideInputs = sideInputs;
-  }
-
-  @Override
-  public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
-      call(Iterator<WindowedValue<I>> iter) throws Exception {
-    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
-    mFunction.startBundle(ctxt);
-    ctxt.setup();
-    return ctxt.getOutputIterable(iter, mFunction);
-  }
-
-  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, 
WindowedValue<?>>> {
-
-    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = 
LinkedListMultimap.create();
-
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, 
Map<TupleTag<?>,
-        BroadcastHelper<?>> sideInputs) {
-      super(fn, runtimeContext, sideInputs);
-    }
-
-    @Override
-    public synchronized void output(O o) {
-      outputs.put(mMainOutputTag, windowedValue.withValue(o));
-    }
-
-    @Override
-    public synchronized void output(WindowedValue<O> o) {
-      outputs.put(mMainOutputTag, o);
-    }
-
-    @Override
-    public synchronized <T> void sideOutput(TupleTag<T> tag, T t) {
-      outputs.put(tag, windowedValue.withValue(t));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant 
instant) {
-      outputs.put(tupleTag, WindowedValue.of(t, instant,
-          windowedValue.getWindows(), windowedValue.getPane()));
-    }
-
-    @Override
-    protected void clearOutput() {
-      outputs.clear();
-    }
-
-    @Override
-    protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> 
getOutputIterator() {
-      return Iterators.transform(outputs.entries().iterator(),
-          new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>,
-              Tuple2<TupleTag<?>, WindowedValue<?>>>() {
-        @Override
-        public Tuple2<TupleTag<?>, WindowedValue<?>> 
apply(Map.Entry<TupleTag<?>,
-            WindowedValue<?>> input) {
-          return new Tuple2<TupleTag<?>, WindowedValue<?>>(input.getKey(), 
input.getValue());
-        }
-      });
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java
deleted file mode 100644
index f53b6d9..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.Path;
-
-final class ShardNameBuilder {
-
-  private ShardNameBuilder() {
-  }
-
-  /**
-   * Replace occurrences of uppercase letters 'N' with the given {@code shardCount},
-   * left-padded with zeros if necessary.
-   * @see com.google.cloud.dataflow.sdk.io.ShardNameTemplate
-   * @param template the string template containing uppercase letters 'N'
-   * @param shardCount the total number of shards
-   * @return a string template with 'N' replaced by the shard count
-   */
-  public static String replaceShardCount(String template, int shardCount) {
-    return replaceShardPattern(template, "N+", shardCount);
-  }
-
-  /**
-   * Replace occurrences of uppercase letters 'S' with the given {@code shardNumber},
-   * left-padded with zeros if necessary.
-   * @see com.google.cloud.dataflow.sdk.io.ShardNameTemplate
-   * @param template the string template containing uppercase letters 'S'
-   * @param shardNumber the number of a particular shard
-   * @return a string template with 'S' replaced by the shard number
-   */
-  public static String replaceShardNumber(String template, int shardNumber) {
-    return replaceShardPattern(template, "S+", shardNumber);
-  }
-
-  private static String replaceShardPattern(String template, String pattern, 
int n) {
-    Pattern p = Pattern.compile(pattern);
-    Matcher m = p.matcher(template);
-    StringBuffer sb = new StringBuffer();
-    while (m.find()) {
-      // replace pattern with a String format string:
-      // index 1, zero-padding flag (0), width length of matched pattern, 
decimal conversion
-      m.appendReplacement(sb, "%1\\$0" + m.group().length() + "d");
-    }
-    m.appendTail(sb);
-    return String.format(sb.toString(), n);
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the output directory for the given path prefix and template
-   */
-  public static String getOutputDirectory(String pathPrefix, String template) {
-    String out = new Path(pathPrefix + template).getParent().toString();
-    if (out.isEmpty()) {
-      return "./";
-    }
-    return out;
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the prefix of the output filename for the given path prefix and template
-   */
-  public static String getOutputFilePrefix(String pathPrefix, String template) 
{
-    String name = new Path(pathPrefix + template).getName();
-    if (name.endsWith(template)) {
-      return name.substring(0, name.length() - template.length());
-    } else {
-      return "";
-    }
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the template for the output filename for the given path prefix and
-   * template
-   */
-  public static String getOutputFileTemplate(String pathPrefix, String 
template) {
-    String name = new Path(pathPrefix + template).getName();
-    if (name.endsWith(template)) {
-      return template;
-    } else {
-      return name;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java
 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java
deleted file mode 100644
index bb9a7a5..0000000
--- 
a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-/**
- * A marker interface that implementations of
- * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate
- * that they produce shard names that adhere to the template in
- * {@link com.cloudera.dataflow.hadoop.HadoopIO.Write}.
- *
- * Some common shard names are defined in
- * {@link com.google.cloud.dataflow.sdk.io.ShardNameTemplate}.
- */
-public interface ShardNameTemplateAware {
-}

Reply via email to