[BEAM-11] Spark runner directory structure and pom setup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41c4ca6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41c4ca6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41c4ca6a Branch: refs/heads/master Commit: 41c4ca6ae284692bf3abd35491b4ed638b32c283 Parents: 46412e5 Author: Sela <[email protected]> Authored: Sat Mar 12 00:37:55 2016 +0200 Committer: Sela <[email protected]> Committed: Tue Mar 15 20:38:26 2016 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 272 +++---- .../com/cloudera/dataflow/hadoop/HadoopIO.java | 202 ----- .../dataflow/hadoop/NullWritableCoder.java | 71 -- .../cloudera/dataflow/hadoop/WritableCoder.java | 120 --- .../com/cloudera/dataflow/io/ConsoleIO.java | 60 -- .../com/cloudera/dataflow/io/CreateStream.java | 66 -- .../java/com/cloudera/dataflow/io/KafkaIO.java | 128 --- .../dataflow/spark/BroadcastHelper.java | 121 --- .../com/cloudera/dataflow/spark/ByteArray.java | 52 -- .../cloudera/dataflow/spark/CoderHelpers.java | 185 ----- .../cloudera/dataflow/spark/DoFnFunction.java | 93 --- .../dataflow/spark/EvaluationContext.java | 283 ------- .../dataflow/spark/EvaluationResult.java | 62 -- .../dataflow/spark/MultiDoFnFunction.java | 115 --- .../dataflow/spark/ShardNameBuilder.java | 106 --- .../dataflow/spark/ShardNameTemplateAware.java | 28 - .../dataflow/spark/ShardNameTemplateHelper.java | 58 -- .../dataflow/spark/SparkContextFactory.java | 66 -- .../dataflow/spark/SparkPipelineEvaluator.java | 52 -- .../dataflow/spark/SparkPipelineOptions.java | 39 - .../spark/SparkPipelineOptionsFactory.java | 27 - .../spark/SparkPipelineOptionsRegistrar.java | 27 - .../dataflow/spark/SparkPipelineRunner.java | 252 ------ .../spark/SparkPipelineRunnerRegistrar.java | 27 - .../dataflow/spark/SparkPipelineTranslator.java | 27 - .../dataflow/spark/SparkProcessContext.java | 250 ------ 
.../dataflow/spark/SparkRuntimeContext.java | 212 ----- .../spark/TemplatedAvroKeyOutputFormat.java | 40 - .../TemplatedSequenceFileOutputFormat.java | 40 - .../spark/TemplatedTextOutputFormat.java | 40 - .../dataflow/spark/TransformEvaluator.java | 24 - .../dataflow/spark/TransformTranslator.java | 800 ------------------ .../dataflow/spark/WindowingHelpers.java | 59 -- .../spark/aggregators/AggAccumParam.java | 35 - .../spark/aggregators/NamedAggregators.java | 202 ----- .../SparkStreamingPipelineOptions.java | 40 - .../SparkStreamingPipelineOptionsFactory.java | 27 - .../SparkStreamingPipelineOptionsRegistrar.java | 28 - .../streaming/StreamingEvaluationContext.java | 226 ------ .../streaming/StreamingTransformTranslator.java | 414 ---------- .../StreamingWindowPipelineDetector.java | 100 --- .../apache/beam/runners/spark/DoFnFunction.java | 94 +++ .../beam/runners/spark/EvaluationContext.java | 284 +++++++ .../beam/runners/spark/EvaluationResult.java | 62 ++ .../beam/runners/spark/MultiDoFnFunction.java | 116 +++ .../beam/runners/spark/SparkContextFactory.java | 66 ++ .../runners/spark/SparkPipelineEvaluator.java | 52 ++ .../runners/spark/SparkPipelineOptions.java | 39 + .../spark/SparkPipelineOptionsFactory.java | 27 + .../spark/SparkPipelineOptionsRegistrar.java | 27 + .../beam/runners/spark/SparkPipelineRunner.java | 252 ++++++ .../spark/SparkPipelineRunnerRegistrar.java | 27 + .../runners/spark/SparkPipelineTranslator.java | 27 + .../beam/runners/spark/SparkProcessContext.java | 257 ++++++ .../beam/runners/spark/SparkRuntimeContext.java | 214 +++++ .../beam/runners/spark/TransformEvaluator.java | 24 + .../beam/runners/spark/TransformTranslator.java | 805 +++++++++++++++++++ .../beam/runners/spark/WindowingHelpers.java | 59 ++ .../spark/aggregators/AggAccumParam.java | 35 + .../spark/aggregators/NamedAggregators.java | 202 +++++ .../beam/runners/spark/coders/CoderHelpers.java | 186 +++++ .../runners/spark/coders/NullWritableCoder.java | 71 ++ 
.../runners/spark/coders/WritableCoder.java | 120 +++ .../apache/beam/runners/spark/io/ConsoleIO.java | 60 ++ .../beam/runners/spark/io/CreateStream.java | 66 ++ .../apache/beam/runners/spark/io/KafkaIO.java | 128 +++ .../beam/runners/spark/io/hadoop/HadoopIO.java | 200 +++++ .../spark/io/hadoop/ShardNameBuilder.java | 106 +++ .../spark/io/hadoop/ShardNameTemplateAware.java | 28 + .../io/hadoop/ShardNameTemplateHelper.java | 58 ++ .../io/hadoop/TemplatedAvroKeyOutputFormat.java | 40 + .../TemplatedSequenceFileOutputFormat.java | 40 + .../io/hadoop/TemplatedTextOutputFormat.java | 40 + .../SparkStreamingPipelineOptions.java | 40 + .../SparkStreamingPipelineOptionsFactory.java | 27 + .../SparkStreamingPipelineOptionsRegistrar.java | 28 + .../streaming/StreamingEvaluationContext.java | 226 ++++++ .../streaming/StreamingTransformTranslator.java | 415 ++++++++++ .../StreamingWindowPipelineDetector.java | 101 +++ .../runners/spark/util/BroadcastHelper.java | 122 +++ .../beam/runners/spark/util/ByteArray.java | 52 ++ ...ataflow.sdk.options.PipelineOptionsRegistrar | 4 +- ...dataflow.sdk.runners.PipelineRunnerRegistrar | 2 +- .../dataflow/hadoop/WritableCoderTest.java | 42 - .../dataflow/spark/AvroPipelineTest.java | 103 --- .../dataflow/spark/CombineGloballyTest.java | 87 -- .../dataflow/spark/CombinePerKeyTest.java | 69 -- .../com/cloudera/dataflow/spark/DeDupTest.java | 55 -- .../cloudera/dataflow/spark/DoFnOutputTest.java | 57 -- .../cloudera/dataflow/spark/EmptyInputTest.java | 64 -- .../spark/HadoopFileFormatPipelineTest.java | 105 --- .../spark/MultiOutputWordCountTest.java | 148 ---- .../cloudera/dataflow/spark/NumShardsTest.java | 89 -- .../dataflow/spark/SerializationTest.java | 183 ----- .../dataflow/spark/ShardNameBuilderTest.java | 82 -- .../dataflow/spark/SideEffectsTest.java | 77 -- .../dataflow/spark/SimpleWordCountTest.java | 117 --- .../spark/TestSparkPipelineOptionsFactory.java | 34 - .../com/cloudera/dataflow/spark/TfIdfTest.java | 60 -- 
.../dataflow/spark/TransformTranslatorTest.java | 95 --- .../dataflow/spark/WindowedWordCountTest.java | 63 -- .../spark/streaming/FlattenStreamingTest.java | 84 -- .../spark/streaming/KafkaStreamingTest.java | 133 --- .../streaming/SimpleStreamingWordCountTest.java | 73 -- .../utils/DataflowAssertStreaming.java | 39 - .../streaming/utils/EmbeddedKafkaCluster.java | 314 -------- .../beam/runners/spark/CombineGloballyTest.java | 88 ++ .../beam/runners/spark/CombinePerKeyTest.java | 65 ++ .../apache/beam/runners/spark/DeDupTest.java | 56 ++ .../beam/runners/spark/DoFnOutputTest.java | 58 ++ .../beam/runners/spark/EmptyInputTest.java | 65 ++ .../runners/spark/MultiOutputWordCountTest.java | 132 +++ .../beam/runners/spark/SerializationTest.java | 177 ++++ .../beam/runners/spark/SideEffectsTest.java | 76 ++ .../beam/runners/spark/SimpleWordCountTest.java | 111 +++ .../spark/TestSparkPipelineOptionsFactory.java | 34 + .../apache/beam/runners/spark/TfIdfTest.java | 61 ++ .../runners/spark/TransformTranslatorTest.java | 95 +++ .../runners/spark/WindowedWordCountTest.java | 64 ++ .../runners/spark/coders/WritableCoderTest.java | 42 + .../beam/runners/spark/io/AvroPipelineTest.java | 105 +++ .../beam/runners/spark/io/NumShardsTest.java | 93 +++ .../io/hadoop/HadoopFileFormatPipelineTest.java | 107 +++ .../spark/io/hadoop/ShardNameBuilderTest.java | 82 ++ .../spark/streaming/FlattenStreamingTest.java | 84 ++ .../spark/streaming/KafkaStreamingTest.java | 133 +++ .../streaming/SimpleStreamingWordCountTest.java | 73 ++ .../utils/DataflowAssertStreaming.java | 39 + .../streaming/utils/EmbeddedKafkaCluster.java | 314 ++++++++ 129 files changed, 7113 insertions(+), 7119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 
399e9e7..a060161 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -11,21 +11,136 @@ the specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <name>Dataflow on Spark</name> - <groupId>com.cloudera.dataflow.spark</groupId> - <artifactId>spark-dataflow</artifactId> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>runners</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>spark-runner</artifactId> <version>0.4.3-SNAPSHOT</version> + + <name>Spark Beam Runner</name> <packaging>jar</packaging> + <inceptionYear>2014</inceptionYear> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.7</java.version> <spark.version>1.5.2</spark.version> - <google-cloud-dataflow-version>1.3.0</google-cloud-dataflow-version> + <beam.version>1.5.0-SNAPSHOT</beam.version> </properties> + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + 
<groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.2.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> + <version>${beam.version}</version> + <exclusions> + <!-- Use Hadoop/Spark's backend logger --> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-examples-all</artifactId> + <version>${beam.version}</version> + <exclusions> + <!-- Use Hadoop/Spark's backend logger --> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>1.7.7</version> + <classifier>hadoop2</classifier> + <exclusions> + <!-- exclude old Jetty version of servlet API --> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + 
<version>1.3</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> <pluginManagement> <plugins> @@ -231,20 +346,20 @@ License. <goals> <goal>shade</goal> </goals> - <configuration> - <relocations> - <!-- relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) --> - <relocation> - <pattern>com.google.common</pattern> - <shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern> - </relocation> - </relocations> - <shadedArtifactAttached>true</shadedArtifactAttached> - <shadedClassifierName>spark-app</shadedClassifierName> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - </transformers> - </configuration> + <!--<configuration>--> + <!--<relocations>--> + <!--<!– relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) –>--> + <!--<relocation>--> + <!--<pattern>com.google.common</pattern>--> + <!--<shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern>--> + <!--</relocation>--> + <!--</relocations>--> + <!--<shadedArtifactAttached>true</shadedArtifactAttached>--> + <!--<shadedClassifierName>spark-app</shadedClassifierName>--> + <!--<transformers>--> + <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />--> + <!--</transformers>--> + <!--</configuration>--> </execution> </executions> </plugin> @@ -274,89 +389,6 @@ License. 
</plugins> </build> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka_2.10</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.2.1</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>18.0</version> - </dependency> - <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> - <version>${google-cloud-dataflow-version}</version> - <exclusions> - <!-- Use Hadoop/Spark's backend logger --> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-examples-all</artifactId> - <version>${google-cloud-dataflow-version}</version> - <exclusions> - <!-- Use Hadoop/Spark's backend logger --> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>1.7.7</version> - <classifier>hadoop2</classifier> - <exclusions> - <!-- exclude old Jetty version of servlet API --> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies 
--> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> - <scope>test</scope> - </dependency> - </dependencies> - <reporting> <plugins> <plugin> @@ -380,25 +412,12 @@ License. </reporting> - <url>http://github.com/cloudera/spark-dataflow</url> - <inceptionYear>2014</inceptionYear> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> <developers> <developer> <name>Cloudera, Inc.</name> </developer> </developers> - <issueManagement> - <system>GitHub</system> - <url>https://github.com/cloudera/spark-dataflow/issues</url> - </issueManagement> <scm> <connection>scm:git:https://github.com/cloudera/spark-dataflow.git</connection> <developerConnection>scm:git:https://github.com/cloudera/spark-dataflow.git</developerConnection> @@ -410,31 +429,6 @@ License. 
<maven>3.2.1</maven> </prerequisites> - <repositories> - <repository> - <id>cloudera.repo</id> - <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> - <name>Cloudera Repositories</name> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - </snapshots> - </repository> - </repositories> - - <distributionManagement> - <repository> - <id>cloudera.repo</id> - <url>https://repository.cloudera.com/artifactory/libs-release-local</url> - </repository> - <snapshotRepository> - <id>cloudera.snapshots.repo</id> - <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url> - </snapshotRepository> - </distributionManagement> - <profiles> <profile> <id>release-sign-artifacts</id> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java deleted file mode 100644 index c79f211..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ -package com.cloudera.dataflow.hadoop; - -import java.util.HashMap; -import java.util.Map; - -import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.common.base.Preconditions; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -import com.cloudera.dataflow.spark.ShardNameTemplateAware; - -public final class HadoopIO { - - private HadoopIO() { - } - - public static final class Read { - - private Read() { - } - - public static <K, V> Bound<K, V> from(String filepattern, - Class<? extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) { - return new Bound<>(filepattern, format, key, value); - } - - public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> { - - private final String filepattern; - private final Class<? extends FileInputFormat<K, V>> formatClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - - Bound(String filepattern, Class<? 
extends FileInputFormat<K, V>> format, Class<K> key, - Class<V> value) { - Preconditions.checkNotNull(filepattern, - "need to set the filepattern of an HadoopIO.Read transform"); - Preconditions.checkNotNull(format, - "need to set the format class of an HadoopIO.Read transform"); - Preconditions.checkNotNull(key, - "need to set the key class of an HadoopIO.Read transform"); - Preconditions.checkNotNull(value, - "need to set the value class of an HadoopIO.Read transform"); - this.filepattern = filepattern; - this.formatClass = format; - this.keyClass = key; - this.valueClass = value; - } - - public String getFilepattern() { - return filepattern; - } - - public Class<? extends FileInputFormat<K, V>> getFormatClass() { - return formatClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - @Override - public PCollection<KV<K, V>> apply(PInput input) { - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); - } - - } - - } - - public static final class Write { - - private Write() { - } - - public static <K, V> Bound<K, V> to(String filenamePrefix, - Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) { - return new Bound<>(filenamePrefix, format, key, value); - } - - public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { - - /** The filename to write to. */ - private final String filenamePrefix; - /** Suffix to use for each filename. */ - private final String filenameSuffix; - /** Requested number of shards. 0 for automatic. */ - private final int numShards; - /** Shard template string. */ - private final String shardTemplate; - private final Class<? 
extends FileOutputFormat<K, V>> formatClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - private final Map<String, String> configurationProperties; - - Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> format, - Class<K> key, - Class<V> value) { - this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value, - new HashMap<String, String>()); - } - - Bound(String filenamePrefix, String filenameSuffix, int numShards, - String shardTemplate, Class<? extends FileOutputFormat<K, V>> format, - Class<K> key, Class<V> value, Map<String, String> configurationProperties) { - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.formatClass = format; - this.keyClass = key; - this.valueClass = value; - this.configurationProperties = configurationProperties; - } - - public Bound<K, V> withoutSharding() { - return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass, - keyClass, valueClass, configurationProperties); - } - - public Bound<K, V> withConfigurationProperty(String key, String value) { - configurationProperties.put(key, value); - return this; - } - - public String getFilenamePrefix() { - return filenamePrefix; - } - - public String getShardTemplate() { - return shardTemplate; - } - - public int getNumShards() { - return numShards; - } - - public String getFilenameSuffix() { - return filenameSuffix; - } - - public Class<? 
extends FileOutputFormat<K, V>> getFormatClass() { - return formatClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - public Map<String, String> getConfigurationProperties() { - return configurationProperties; - } - - @Override - public PDone apply(PCollection<KV<K, V>> input) { - Preconditions.checkNotNull(filenamePrefix, - "need to set the filename prefix of an HadoopIO.Write transform"); - Preconditions.checkNotNull(formatClass, - "need to set the format class of an HadoopIO.Write transform"); - Preconditions.checkNotNull(keyClass, - "need to set the key class of an HadoopIO.Write transform"); - Preconditions.checkNotNull(valueClass, - "need to set the value class of an HadoopIO.Write transform"); - - Preconditions.checkArgument(ShardNameTemplateAware.class.isAssignableFrom(formatClass), - "Format class must implement " + ShardNameTemplateAware.class.getName()); - - return PDone.in(input.getPipeline()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java deleted file mode 100644 index 5e5d391..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.hadoop; - -import java.io.InputStream; -import java.io.OutputStream; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.google.cloud.dataflow.sdk.coders.Coder; -import org.apache.hadoop.io.NullWritable; - -public final class NullWritableCoder extends WritableCoder<NullWritable> { - private static final long serialVersionUID = 1L; - - @JsonCreator - public static NullWritableCoder of() { - return INSTANCE; - } - - private static final NullWritableCoder INSTANCE = new NullWritableCoder(); - - private NullWritableCoder() { - super(NullWritable.class); - } - - @Override - public void encode(NullWritable value, OutputStream outStream, Context context) { - // nothing to write - } - - @Override - public NullWritable decode(InputStream inStream, Context context) { - return NullWritable.get(); - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * Returns true since registerByteSizeObserver() runs in constant time. 
- */ - @Override - public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(NullWritable value, Context context) { - return 0; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - // NullWritableCoder is deterministic - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java deleted file mode 100644 index 324b203..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.hadoop; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.util.List; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.coders.StandardCoder; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -/** - * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}. - * - * <p> To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder - */ -public class WritableCoder<T extends Writable> extends StandardCoder<T> { - private static final long serialVersionUID = 0L; - - /** - * Returns a {@code WritableCoder} instance for the provided element class. 
- * @param <T> the element type - * @param clazz the element class - * @return a {@code WritableCoder} instance for the provided element class - */ - public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { - if (clazz.equals(NullWritable.class)) { - @SuppressWarnings("unchecked") - WritableCoder<T> result = (WritableCoder<T>) NullWritableCoder.of(); - return result; - } - return new WritableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static WritableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Writable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Writable"); - } - return of((Class<? extends Writable>) clazz); - } - - private final Class<T> type; - - public WritableCoder(Class<T> type) { - this.type = type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - value.write(new DataOutputStream(outStream)); - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - try { - T t = type.getConstructor().newInstance(); - t.readFields(new DataInputStream(inStream)); - return t; - } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) { - throw new CoderException("unable to deserialize record", e); - } catch (InvocationTargetException ite) { - throw new CoderException("unable to deserialize record", ite.getCause()); - } - } - - @Override - public List<Coder<?>> getCoderArguments() { - return null; - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - result.put("type", type.getName()); - return result; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - throw new NonDeterministicException(this, - "Hadoop Writable may be non-deterministic."); - } 
- -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java deleted file mode 100644 index bc19b39..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.dataflow.io; - -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; - -/** - * Print to console. 
- */ -public final class ConsoleIO { - - private ConsoleIO() { - } - - public static final class Write { - - private Write() { - } - - public static <T> Unbound<T> from() { - return new Unbound<>(10); - } - - public static <T> Unbound<T> from(int num) { - return new Unbound<>(num); - } - - public static class Unbound<T> extends PTransform<PCollection<T>, PDone> { - - private final int num; - - Unbound(int num) { - this.num = num; - } - - public int getNum() { - return num; - } - - @Override - public PDone apply(PCollection<T> input) { - return PDone.in(input.getPipeline()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java deleted file mode 100644 index 9a99278..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ -package com.cloudera.dataflow.io; - -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.common.base.Preconditions; - -/** - * Create an input stream from Queue. - * - * @param <T> stream type - */ -public final class CreateStream<T> { - - private CreateStream() { - } - - /** - * Define the input stream to create from queue. - * - * @param queuedValues defines the input stream - * @param <T> stream type - * @return the queue that defines the input stream - */ - public static <T> QueuedValues<T> fromQueue(Iterable<Iterable<T>> queuedValues) { - return new QueuedValues<>(queuedValues); - } - - public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> { - - private final Iterable<Iterable<T>> queuedValues; - - QueuedValues(Iterable<Iterable<T>> queuedValues) { - Preconditions.checkNotNull(queuedValues, - "need to set the queuedValues of an Create.QueuedValues transform"); - this.queuedValues = queuedValues; - } - - public Iterable<Iterable<T>> getQueuedValues() { - return queuedValues; - } - - @Override - public PCollection<T> apply(PInput input) { - // Spark streaming micro batches are bounded by default - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java deleted file mode 100644 index 154e6da..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java +++ /dev/null @@ 
-1,128 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.dataflow.io; - -import java.util.Map; -import java.util.Set; - -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.common.base.Preconditions; - -import kafka.serializer.Decoder; - -/** - * Read stream from Kafka. - */ -public final class KafkaIO { - - private KafkaIO() { - } - - public static final class Read { - - private Read() { - } - - /** - * Define the Kafka consumption. - * - * @param keyDecoder {@link Decoder} to decode the Kafka message key - * @param valueDecoder {@link Decoder} to decode the Kafka message value - * @param key Kafka message key Class - * @param value Kafka message value Class - * @param topics Kafka topics to subscribe - * @param kafkaParams map of Kafka parameters - * @param <K> Kafka message key Class type - * @param <V> Kafka message value Class type - * @return KafkaIO Unbound input - */ - public static <K, V> Unbound<K, V> from(Class<? extends Decoder<K>> keyDecoder, - Class<? 
extends Decoder<V>> valueDecoder, - Class<K> key, - Class<V> value, Set<String> topics, - Map<String, String> kafkaParams) { - return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams); - } - - public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> { - - private final Class<? extends Decoder<K>> keyDecoderClass; - private final Class<? extends Decoder<V>> valueDecoderClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - private final Set<String> topics; - private final Map<String, String> kafkaParams; - - Unbound(Class<? extends Decoder<K>> keyDecoder, - Class<? extends Decoder<V>> valueDecoder, Class<K> key, - Class<V> value, Set<String> topics, Map<String, String> kafkaParams) { - Preconditions.checkNotNull(keyDecoder, - "need to set the key decoder class of a KafkaIO.Read transform"); - Preconditions.checkNotNull(valueDecoder, - "need to set the value decoder class of a KafkaIO.Read transform"); - Preconditions.checkNotNull(key, - "need to set the key class of aKafkaIO.Read transform"); - Preconditions.checkNotNull(value, - "need to set the value class of a KafkaIO.Read transform"); - Preconditions.checkNotNull(topics, - "need to set the topics of a KafkaIO.Read transform"); - Preconditions.checkNotNull(kafkaParams, - "need to set the kafkaParams of a KafkaIO.Read transform"); - this.keyDecoderClass = keyDecoder; - this.valueDecoderClass = valueDecoder; - this.keyClass = key; - this.valueClass = value; - this.topics = topics; - this.kafkaParams = kafkaParams; - } - - public Class<? extends Decoder<K>> getKeyDecoderClass() { - return keyDecoderClass; - } - - public Class<? 
extends Decoder<V>> getValueDecoderClass() { - return valueDecoderClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - public Set<String> getTopics() { - return topics; - } - - public Map<String, String> getKafkaParams() { - return kafkaParams; - } - - @Override - public PCollection<KV<K, V>> apply(PInput input) { - // Spark streaming micro batches are bounded by default - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java deleted file mode 100644 index 8dca939..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.Serializable; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.broadcast.Broadcast; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class BroadcastHelper<T> implements Serializable { - - /** - * If the property {@code dataflow.spark.directBroadcast} is set to - * {@code true} then Spark serialization (Kryo) will be used to broadcast values - * in View objects. By default this property is not set, and values are coded using - * the appropriate {@link Coder}. - */ - public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast"; - - private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class); - - public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) { - if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) { - return new DirectBroadcastHelper<>(value); - } - return new CodedBroadcastHelper<>(value, coder); - } - - public abstract T getValue(); - - public abstract void broadcast(JavaSparkContext jsc); - - /** - * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that relies on the underlying - * Spark serialization (Kryo) to broadcast values. This is appropriate when - * broadcasting very large values, since no copy of the object is made. 
- * @param <T> - */ - static class DirectBroadcastHelper<T> extends BroadcastHelper<T> { - private Broadcast<T> bcast; - private transient T value; - - DirectBroadcastHelper(T value) { - this.value = value; - } - - @Override - public synchronized T getValue() { - if (value == null) { - value = bcast.getValue(); - } - return value; - } - - @Override - public void broadcast(JavaSparkContext jsc) { - this.bcast = jsc.broadcast(value); - } - } - - /** - * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that uses a - * {@link Coder} to encode values as byte arrays - * before broadcasting. - * @param <T> - */ - static class CodedBroadcastHelper<T> extends BroadcastHelper<T> { - private Broadcast<byte[]> bcast; - private final Coder<T> coder; - private transient T value; - - CodedBroadcastHelper(T value, Coder<T> coder) { - this.value = value; - this.coder = coder; - } - - @Override - public synchronized T getValue() { - if (value == null) { - value = deserialize(); - } - return value; - } - - @Override - public void broadcast(JavaSparkContext jsc) { - this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder)); - } - - private T deserialize() { - T val; - try { - val = coder.decode(new ByteArrayInputStream(bcast.value()), - new Coder.Context(true)); - } catch (IOException ioe) { - // this should not ever happen, log it if it does. - LOG.warn(ioe.getMessage()); - val = null; - } - return val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java deleted file mode 100644 index 06db572..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. 
All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.dataflow.spark; - -import java.io.Serializable; -import java.util.Arrays; - -import com.google.common.primitives.UnsignedBytes; - -class ByteArray implements Serializable, Comparable<ByteArray> { - - private final byte[] value; - - ByteArray(byte[] value) { - this.value = value; - } - - public byte[] getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) { - return false; - } - ByteArray byteArray = (ByteArray) o; - return Arrays.equals(value, byteArray.value); - } - - @Override - public int hashCode() { - return value != null ? Arrays.hashCode(value) : 0; - } - - @Override - public int compareTo(ByteArray other) { - return UnsignedBytes.lexicographicalComparator().compare(value, other.value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java deleted file mode 100644 index 0ae06c1..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. 
licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.common.collect.Iterables; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; - -/** - * Serialization utility class. - */ -public final class CoderHelpers { - private CoderHelpers() { - } - - /** - * Utility method for serializing an object using the specified coder. - * - * @param value Value to serialize. - * @param coder Coder to serialize with. - * @param <T> type of value that is serialized - * @return Byte array representing serialized object. - */ - static <T> byte[] toByteArray(T value, Coder<T> coder) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - coder.encode(value, baos, new Coder.Context(true)); - } catch (IOException e) { - throw new IllegalStateException("Error encoding value: " + value, e); - } - return baos.toByteArray(); - } - - /** - * Utility method for serializing an Iterable of values using the specified coder. - * - * @param values Values to serialize. - * @param coder Coder to serialize with. - * @param <T> type of value that is serialized - * @return List of bytes representing serialized objects.
- */ - static <T> List<byte[]> toByteArrays(Iterable<T> values, Coder<T> coder) { - List<byte[]> res = new LinkedList<>(); - for (T value : values) { - res.add(toByteArray(value, coder)); - } - return res; - } - - /** - * Utility method for deserializing a byte array using the specified coder. - * - * @param serialized bytearray to be deserialized. - * @param coder Coder to deserialize with. - * @param <T> Type of object to be returned. - * @return Deserialized object. - */ - static <T> T fromByteArray(byte[] serialized, Coder<T> coder) { - ByteArrayInputStream bais = new ByteArrayInputStream(serialized); - try { - return coder.decode(bais, new Coder.Context(true)); - } catch (IOException e) { - throw new IllegalStateException("Error decoding bytes for coder: " + coder, e); - } - } - - /** - * A function wrapper for converting an object to a bytearray. - * - * @param coder Coder to serialize with. - * @param <T> The type of the object being serialized. - * @return A function that accepts an object and returns its coder-serialized form. - */ - static <T> Function<T, byte[]> toByteFunction(final Coder<T> coder) { - return new Function<T, byte[]>() { - @Override - public byte[] call(T t) throws Exception { - return toByteArray(t, coder); - } - }; - } - - /** - * A function wrapper for converting a byte array to an object. - * - * @param coder Coder to deserialize with. - * @param <T> The type of the object being deserialized. - * @return A function that accepts a byte array and returns its corresponding object. - */ - static <T> Function<byte[], T> fromByteFunction(final Coder<T> coder) { - return new Function<byte[], T>() { - @Override - public T call(byte[] bytes) throws Exception { - return fromByteArray(bytes, coder); - } - }; - } - - /** - * A function wrapper for converting a key-value pair to a byte array pair. - * - * @param keyCoder Coder to serialize keys. - * @param valueCoder Coder to serialize values. - * @param <K> The type of the key being serialized. 
- * @param <V> The type of the value being serialized. - * @return A function that accepts a key-value pair and returns a pair of byte arrays. - */ - static <K, V> PairFunction<Tuple2<K, V>, ByteArray, byte[]> toByteFunction( - final Coder<K> keyCoder, final Coder<V> valueCoder) { - return new PairFunction<Tuple2<K, V>, ByteArray, byte[]>() { - @Override - public Tuple2<ByteArray, byte[]> call(Tuple2<K, V> kv) { - return new Tuple2<>(new ByteArray(toByteArray(kv._1(), keyCoder)), toByteArray(kv._2(), - valueCoder)); - } - }; - } - - /** - * A function wrapper for converting a byte array pair to a key-value pair. - * - * @param keyCoder Coder to deserialize keys. - * @param valueCoder Coder to deserialize values. - * @param <K> The type of the key being deserialized. - * @param <V> The type of the value being deserialized. - * @return A function that accepts a pair of byte arrays and returns a key-value pair. - */ - static <K, V> PairFunction<Tuple2<ByteArray, byte[]>, K, V> fromByteFunction( - final Coder<K> keyCoder, final Coder<V> valueCoder) { - return new PairFunction<Tuple2<ByteArray, byte[]>, K, V>() { - @Override - public Tuple2<K, V> call(Tuple2<ByteArray, byte[]> tuple) { - return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder), - fromByteArray(tuple._2(), valueCoder)); - } - }; - } - - /** - * A function wrapper for converting a byte array pair to a key-value pair, where - * values are {@link Iterable}. - * - * @param keyCoder Coder to deserialize keys. - * @param valueCoder Coder to deserialize values. - * @param <K> The type of the key being deserialized. - * @param <V> The type of the value being deserialized. - * @return A function that accepts a pair of byte arrays and returns a key-value pair. 
- */ - static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>> - fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) { - return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() { - @Override - public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> tuple) { - return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder), - Iterables.transform(tuple._2(), new com.google.common.base.Function<byte[], V>() { - @Override - public V apply(byte[] bytes) { - return fromByteArray(bytes, valueCoder); - } - })); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java deleted file mode 100644 index 2bcfec3..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import org.apache.spark.api.java.function.FlatMapFunction; - -/** - * Dataflow's Do functions correspond to Spark's FlatMap functions. - * - * @param <I> Input element type. - * @param <O> Output element type. - */ -public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>, - WindowedValue<O>> { - private final DoFn<I, O> mFunction; - private final SparkRuntimeContext mRuntimeContext; - private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; - - /** - * @param fn DoFunction to be wrapped. - * @param runtime Runtime to apply function in. - * @param sideInputs Side inputs used in DoFunction. - */ - public DoFnFunction(DoFn<I, O> fn, - SparkRuntimeContext runtime, - Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { - this.mFunction = fn; - this.mRuntimeContext = runtime; - this.mSideInputs = sideInputs; - } - - @Override - public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws - Exception { - ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs); - ctxt.setup(); - mFunction.startBundle(ctxt); - return ctxt.getOutputIterable(iter, mFunction); - } - - private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> { - - private final List<WindowedValue<O>> outputs = new LinkedList<>(); - - ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, - BroadcastHelper<?>> sideInputs) { - super(fn, runtimeContext, sideInputs); - } - - @Override - public synchronized void output(O o) { - outputs.add(windowedValue != null ? 
windowedValue.withValue(o) : - WindowedValue.valueInEmptyWindows(o)); - } - - @Override - public synchronized void output(WindowedValue<O> o) { - outputs.add(o); - } - - @Override - protected void clearOutput() { - outputs.clear(); - } - - @Override - protected Iterator<WindowedValue<O>> getOutputIterator() { - return outputs.iterator(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java deleted file mode 100644 index a6ac6c2..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.PValue; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaSparkContext; - - -/** - * Evaluation context allows us to define how pipeline instructions are evaluated. - */ -public class EvaluationContext implements EvaluationResult { - private final JavaSparkContext jsc; - private final Pipeline pipeline; - private final SparkRuntimeContext runtime; - private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>(); - private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>(); - private final Set<PValue> multireads = new LinkedHashSet<>(); - private final Map<PValue, Object> pobjects = new LinkedHashMap<>(); - private final Map<PValue, Iterable<?
extends WindowedValue<?>>> pview = new LinkedHashMap<>(); - protected AppliedPTransform<?, ?, ?> currentTransform; - - public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) { - this.jsc = jsc; - this.pipeline = pipeline; - this.runtime = new SparkRuntimeContext(jsc, pipeline); - } - - /** - * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are - * sometimes created from a collection of objects (using RDD parallelize) and then - * only used to create View objects; in which case they do not need to be - * converted to bytes since they are not transferred across the network until they are - * broadcast. - */ - private class RDDHolder<T> { - - private Iterable<T> values; - private Coder<T> coder; - private JavaRDDLike<WindowedValue<T>, ?> rdd; - - RDDHolder(Iterable<T> values, Coder<T> coder) { - this.values = values; - this.coder = coder; - } - - RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) { - this.rdd = rdd; - } - - JavaRDDLike<WindowedValue<T>, ?> getRDD() { - if (rdd == null) { - Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values, - new Function<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(T t) { - // TODO: this is wrong if T is a TimestampedValue - return WindowedValue.valueInEmptyWindows(t); - } - }); - WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = - WindowedValue.getValueOnlyCoder(coder); - rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) - .map(CoderHelpers.fromByteFunction(windowCoder)); - } - return rdd; - } - - Iterable<T> getValues(PCollection<T> pcollection) { - if (values == null) { - coder = pcollection.getCoder(); - JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction()) - .map(CoderHelpers.toByteFunction(coder)); - List<byte[]> clientBytes = bytesRDD.collect(); - values = Iterables.transform(clientBytes, new Function<byte[], T>() { - @Override - public T apply(byte[] bytes) { - return 
CoderHelpers.fromByteArray(bytes, coder); - } - }); - } - return values; - } - - Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { - return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(T t) { - return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place? - } - }); - } - } - - protected JavaSparkContext getSparkContext() { - return jsc; - } - - protected Pipeline getPipeline() { - return pipeline; - } - - protected SparkRuntimeContext getRuntimeContext() { - return runtime; - } - - protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { - this.currentTransform = transform; - } - - protected AppliedPTransform<?, ?, ?> getCurrentTransform() { - return currentTransform; - } - - protected <I extends PInput> I getInput(PTransform<I, ?> transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - @SuppressWarnings("unchecked") - I input = (I) currentTransform.getInput(); - return input; - } - - protected <O extends POutput> O getOutput(PTransform<?, O> transform) { - checkArgument(currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); - @SuppressWarnings("unchecked") - O output = (O) currentTransform.getOutput(); - return output; - } - - protected <T> void setOutputRDD(PTransform<?, ?> transform, - JavaRDDLike<WindowedValue<T>, ?> rdd) { - setRDD((PValue) getOutput(transform), rdd); - } - - protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, - Coder<T> coder) { - pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder)); - } - - void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) { - pview.put(view, value); - } - - protected boolean hasOutputRDD(PTransform<? 
extends PInput, ?> transform) { - PValue pvalue = (PValue) getOutput(transform); - return pcollections.containsKey(pvalue); - } - - protected JavaRDDLike<?, ?> getRDD(PValue pvalue) { - RDDHolder<?> rddHolder = pcollections.get(pvalue); - JavaRDDLike<?, ?> rdd = rddHolder.getRDD(); - leafRdds.remove(rddHolder); - if (multireads.contains(pvalue)) { - // Ensure the RDD is marked as cached - rdd.rdd().cache(); - } else { - multireads.add(pvalue); - } - return rdd; - } - - protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) { - try { - rdd.rdd().setName(pvalue.getName()); - } catch (IllegalStateException e) { - // name not set, ignore - } - RDDHolder<T> rddHolder = new RDDHolder<>(rdd); - pcollections.put(pvalue, rddHolder); - leafRdds.add(rddHolder); - } - - JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) { - return getRDD((PValue) getInput(transform)); - } - - - <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) { - return pview.get(view); - } - - /** - * Computes the outputs for all RDDs that are leaves in the DAG and do not have any - * actions (like saving to a file) registered on them (i.e. they are performed for side - * effects). 
- */ - protected void computeOutputs() { - for (RDDHolder<?> rddHolder : leafRdds) { - JavaRDDLike<?, ?> rdd = rddHolder.getRDD(); - rdd.rdd().cache(); // cache so that any subsequent get() is cheap - rdd.count(); // force the RDD to be computed - } - } - - @Override - public <T> T get(PValue value) { - if (pobjects.containsKey(value)) { - @SuppressWarnings("unchecked") - T result = (T) pobjects.get(value); - return result; - } - if (pcollections.containsKey(value)) { - JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD(); - @SuppressWarnings("unchecked") - T res = (T) Iterables.getOnlyElement(rdd.collect()); - pobjects.put(value, res); - return res; - } - throw new IllegalStateException("Cannot resolve un-known PObject: " + value); - } - - @Override - public <T> T getAggregatorValue(String named, Class<T> resultType) { - return runtime.getAggregatorValue(named, resultType); - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - return runtime.getAggregatorValues(aggregator); - } - - @Override - public <T> Iterable<T> get(PCollection<T> pcollection) { - @SuppressWarnings("unchecked") - RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - return rddHolder.getValues(pcollection); - } - - <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { - @SuppressWarnings("unchecked") - RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - return rddHolder.getWindowedValues(pcollection); - } - - @Override - public void close() { - SparkContextFactory.stopSparkContext(jsc); - } - - /** The runner is blocking. 
*/ - @Override - public State getState() { - return State.DONE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java deleted file mode 100644 index aad029a..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PValue; - -/** - * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between - * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or collections of Ts. - */ -public interface EvaluationResult extends PipelineResult { - /** - * Retrieves an iterable of results associated with the PCollection passed in. - * - * @param pcollection Collection we wish to translate. - * @param <T> Type of elements contained in collection. - * @return Natively types result associated with collection. 
- */ - <T> Iterable<T> get(PCollection<T> pcollection); - - /** - * Retrieve an object of Type T associated with the PValue passed in. - * - * @param pval PValue to retrieve associated data for. - * @param <T> Type of object to return. - * @return Native object. - */ - <T> T get(PValue pval); - - /** - * Retrieves the final value of the aggregator. - * - * @param aggName name of aggregator. - * @param resultType Class of final result of aggregation. - * @param <T> Type of final result of aggregation. - * @return Result of aggregation associated with specified name. - */ - <T> T getAggregatorValue(String aggName, Class<T> resultType); - - /** - * Releases any runtime resources, including distributed-execution contexts currently held by - * this EvaluationResult; once close() has been called, - * {@link EvaluationResult#get(PCollection)} might - * not work for subsequent calls. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java deleted file mode 100644 index d269788..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. 
See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.util.Iterator; -import java.util.Map; - -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Function; -import com.google.common.collect.Iterators; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Multimap; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.joda.time.Instant; -import scala.Tuple2; - -/** - * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the - * underlying data with multiple TupleTags. - * - * @param <I> Input type for DoFunction. - * @param <O> Output type for DoFunction. - */ -class MultiDoFnFunction<I, O> - implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> { - private final DoFn<I, O> mFunction; - private final SparkRuntimeContext mRuntimeContext; - private final TupleTag<O> mMainOutputTag; - private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; - - MultiDoFnFunction( - DoFn<I, O> fn, - SparkRuntimeContext runtimeContext, - TupleTag<O> mainOutputTag, - Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { - this.mFunction = fn; - this.mRuntimeContext = runtimeContext; - this.mMainOutputTag = mainOutputTag; - this.mSideInputs = sideInputs; - } - - @Override - public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> - call(Iterator<WindowedValue<I>> iter) throws Exception { - ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs); - mFunction.startBundle(ctxt); - ctxt.setup(); - return ctxt.getOutputIterable(iter, mFunction); - } - - private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> { - - private final Multimap<TupleTag<?>, 
WindowedValue<?>> outputs = LinkedListMultimap.create(); - - ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, - BroadcastHelper<?>> sideInputs) { - super(fn, runtimeContext, sideInputs); - } - - @Override - public synchronized void output(O o) { - outputs.put(mMainOutputTag, windowedValue.withValue(o)); - } - - @Override - public synchronized void output(WindowedValue<O> o) { - outputs.put(mMainOutputTag, o); - } - - @Override - public synchronized <T> void sideOutput(TupleTag<T> tag, T t) { - outputs.put(tag, windowedValue.withValue(t)); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) { - outputs.put(tupleTag, WindowedValue.of(t, instant, - windowedValue.getWindows(), windowedValue.getPane())); - } - - @Override - protected void clearOutput() { - outputs.clear(); - } - - @Override - protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() { - return Iterators.transform(outputs.entries().iterator(), - new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>, - Tuple2<TupleTag<?>, WindowedValue<?>>>() { - @Override - public Tuple2<TupleTag<?>, WindowedValue<?>> apply(Map.Entry<TupleTag<?>, - WindowedValue<?>> input) { - return new Tuple2<TupleTag<?>, WindowedValue<?>>(input.getKey(), input.getValue()); - } - }); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java deleted file mode 100644 index f53b6d9..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameBuilder.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. 
licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.Path; - -final class ShardNameBuilder { - - private ShardNameBuilder() { - } - - /** - * Replace occurrences of uppercase letters 'N' with the given {code}shardCount{code}, - * left-padded with zeros if necessary. - * @see com.google.cloud.dataflow.sdk.io.ShardNameTemplate - * @param template the string template containing uppercase letters 'N' - * @param shardCount the total number of shards - * @return a string template with 'N' replaced by the shard count - */ - public static String replaceShardCount(String template, int shardCount) { - return replaceShardPattern(template, "N+", shardCount); - } - - /** - * Replace occurrences of uppercase letters 'S' with the given {code}shardNumber{code}, - * left-padded with zeros if necessary. 
- * @see com.google.cloud.dataflow.sdk.io.ShardNameTemplate - * @param template the string template containing uppercase letters 'S' - * @param shardNumber the number of a particular shard - * @return a string template with 'S' replaced by the shard number - */ - public static String replaceShardNumber(String template, int shardNumber) { - return replaceShardPattern(template, "S+", shardNumber); - } - - private static String replaceShardPattern(String template, String pattern, int n) { - Pattern p = Pattern.compile(pattern); - Matcher m = p.matcher(template); - StringBuffer sb = new StringBuffer(); - while (m.find()) { - // replace pattern with a String format string: - // index 1, zero-padding flag (0), width length of matched pattern, decimal conversion - m.appendReplacement(sb, "%1\\$0" + m.group().length() + "d"); - } - m.appendTail(sb); - return String.format(sb.toString(), n); - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the output directory for the given prefix, template and suffix - */ - public static String getOutputDirectory(String pathPrefix, String template) { - String out = new Path(pathPrefix + template).getParent().toString(); - if (out.isEmpty()) { - return "./"; - } - return out; - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the prefix of the output filename for the given path prefix and template - */ - public static String getOutputFilePrefix(String pathPrefix, String template) { - String name = new Path(pathPrefix + template).getName(); - if (name.endsWith(template)) { - return name.substring(0, name.length() - template.length()); - } else { - return ""; - } - } - - /** - * @param pathPrefix a relative or absolute path - * @param template a template string - * @return the template for the output filename for the given path prefix and - * template - */ - public static String getOutputFileTemplate(String pathPrefix, 
String template) { - String name = new Path(pathPrefix + template).getName(); - if (name.endsWith(template)) { - return template; - } else { - return name; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java deleted file mode 100644 index bb9a7a5..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -/** - * A marker interface that implementations of - * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate - * that they produce shard names that adhere to the template in - * {@link com.cloudera.dataflow.hadoop.HadoopIO.Write}. - * - * Some common shard names are defined in - * {@link com.google.cloud.dataflow.sdk.io.ShardNameTemplate}. - */ -public interface ShardNameTemplateAware { -}
