// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java new file mode 100644 index 0000000..fd6f5da --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java @@ -0,0 +1,58 @@
/*
 * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */

package org.apache.beam.runners.spark.io.hadoop;

import java.io.IOException;

import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that build the name of a task's output file from a prefix, a shard-name
 * template and a suffix, all of which are read from the task's configuration.
 */
public final class ShardNameTemplateHelper {

  private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);

  /** Configuration key under which the output file prefix is looked up. */
  public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
  /** Configuration key under which the shard-name template is looked up. */
  public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
  /** Configuration key under which the output file suffix is looked up. */
  public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";

  private ShardNameTemplateHelper() {
    // static utility holder; not meant to be instantiated
  }

  /**
   * Returns the path of the file the given task attempt should write to: the templated file
   * name resolved against the work path of the format's output committer.
   *
   * @param format the output format whose committer supplies the work path; the committer is
   *     cast to {@link FileOutputCommitter}
   * @param context the task attempt the file is being named for
   * @return the templated file located under the committer's work path
   * @throws IOException if obtaining the committer or its work path fails
   */
  public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
      TaskAttemptContext context) throws IOException {
    final FileOutputCommitter outputCommitter =
        (FileOutputCommitter) format.getOutputCommitter(context);
    final Path workDir = outputCommitter.getWorkPath();
    return new Path(workDir, getOutputFile(context));
  }

  /**
   * Builds {@code prefix + shardName + suffix}, where the shard name is the configured template
   * with the task's id substituted in by {@code replaceShardNumber}.
   */
  private static String getOutputFile(TaskAttemptContext context) {
    final TaskID task = context.getTaskAttemptID().getTaskID();
    final int shard = task.getId();

    final String prefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
    final String template = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
    final String suffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);

    final StringBuilder fileName = new StringBuilder();
    fileName.append(prefix);
    fileName.append(replaceShardNumber(template, shard));
    fileName.append(suffix);
    return fileName.toString();
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java new file mode 100644 index 0000000..4feaff6 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java @@ -0,0 +1,40 @@
/*
 * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */

package org.apache.beam.runners.spark.io.hadoop;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * An {@link AvroKeyOutputFormat} whose output file is named by
 * {@link ShardNameTemplateHelper} instead of by the parent class.
 *
 * @param <T> type parameter passed through to {@link AvroKeyOutputFormat}
 */
public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
    implements ShardNameTemplateAware {

  /**
   * Deliberately does nothing, so that an already existing output does not cause a failure.
   */
  @Override
  public void checkOutputSpecs(JobContext job) {
    // don't fail if the output already exists
  }

  /**
   * Creates the stream for this task's output at the templated work file.
   *
   * @param context the task attempt being written
   * @return a stream created on the file system that owns the templated path
   * @throws IOException if the path cannot be resolved or the file cannot be created
   */
  @Override
  protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
    final Path shardFile = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
    final OutputStream shardStream =
        shardFile.getFileSystem(context.getConfiguration()).create(shardFile);
    return shardStream;
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java new file mode 100644 index 0000000..922b906 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied.
// See the License for + * the specific language governing permissions and limitations under the + * License. + */

package org.apache.beam.runners.spark.io.hadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * A {@link SequenceFileOutputFormat} whose work file is named by
 * {@link ShardNameTemplateHelper} instead of by the parent class.
 *
 * @param <K> key type passed through to {@link SequenceFileOutputFormat}
 * @param <V> value type passed through to {@link SequenceFileOutputFormat}
 */
public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
    implements ShardNameTemplateAware {

  /**
   * Deliberately does nothing, so that an already existing output does not cause a failure.
   */
  @Override
  public void checkOutputSpecs(JobContext job) {
    // don't fail if the output already exists
  }

  /**
   * Returns the templated work file for the given task attempt.
   *
   * @param context the task attempt being written
   * @param extension not used; the file name comes entirely from the template
   * @return the path computed by {@link ShardNameTemplateHelper#getDefaultWorkFile}
   * @throws IOException if the helper fails to resolve the work path
   */
  @Override
  public Path getDefaultWorkFile(TaskAttemptContext context,
      String extension) throws IOException {
    // note that the passed-in extension is ignored since it comes from the template
    final Path templatedFile = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
    return templatedFile;
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java new file mode 100644 index 0000000..1e53dce --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License.
// You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */

package org.apache.beam.runners.spark.io.hadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * A {@link TextOutputFormat} whose work file is named by {@link ShardNameTemplateHelper}
 * instead of by the parent class.
 *
 * @param <K> key type passed through to {@link TextOutputFormat}
 * @param <V> value type passed through to {@link TextOutputFormat}
 */
public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
    implements ShardNameTemplateAware {

  /**
   * Deliberately does nothing, so that an already existing output does not cause a failure.
   */
  @Override
  public void checkOutputSpecs(JobContext job) {
    // don't fail if the output already exists
  }

  /**
   * Returns the templated work file for the given task attempt.
   *
   * @param context the task attempt being written
   * @param extension not used; the file name comes entirely from the template
   * @return the path computed by {@link ShardNameTemplateHelper#getDefaultWorkFile}
   * @throws IOException if the helper fails to resolve the work path
   */
  @Override
  public Path getDefaultWorkFile(TaskAttemptContext context,
      String extension) throws IOException {
    // note that the passed-in extension is ignored since it comes from the template
    final Path templatedFile = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
    return templatedFile;
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java new file mode 100644 index 0000000..17edba3 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License").
You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; + +import org.apache.beam.runners.spark.SparkPipelineOptions; + +/** + * Options used to configure Spark streaming. + */ +public interface SparkStreamingPipelineOptions extends SparkPipelineOptions { + @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " + + "execution is stopped") + @Default.Long(-1) + Long getTimeout(); + + void setTimeout(Long batchInterval); + + @Override + @Default.Boolean(true) + boolean isStreaming(); + + @Override + @Default.String("spark streaming dataflow pipeline job") + String getAppName(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java new file mode 100644 index 0000000..822feb4 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). 
You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; + +public final class SparkStreamingPipelineOptionsFactory { + + private SparkStreamingPipelineOptionsFactory() { + } + + public static SparkStreamingPipelineOptions create() { + return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java new file mode 100644 index 0000000..2c5414d --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; +import com.google.common.collect.ImmutableList; + +public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions + .class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java new file mode 100644 index 0000000..9d1d786 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
// + */
package org.apache.beam.runners.spark.streaming;


import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;

import org.apache.beam.runners.spark.EvaluationContext;
import org.apache.beam.runners.spark.SparkRuntimeContext;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


/**
 * Streaming evaluation context helps to handle streaming.
 *
 * <p>On top of {@link EvaluationContext} it keeps one DStream holder per pipeline value and the
 * {@link JavaStreamingContext} the streams belong to.
 */
public class StreamingEvaluationContext extends EvaluationContext {

  private final JavaStreamingContext jssc;
  /** Upper bound for the wait in {@link #close()}; only honoured when positive. */
  private final long timeout;
  /** Holder registered for each pipeline value, in registration order. */
  private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
  /**
   * Holders added by {@link #setStream} that have not yet been handed out by
   * {@link #getStream(PValue)}; {@link #computeOutputs()} forces an action on each of them.
   */
  private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();

  /** Reported by {@link #getState()}; becomes {@code DONE} after a normal {@link #close()}. */
  private State state = State.RUNNING;

  /**
   * @param jsc Spark context handed to the parent context
   * @param pipeline pipeline handed to the parent context
   * @param jssc streaming context used to build queue streams and awaited/stopped on close
   * @param timeout wait limit used by {@link #close()}; a non-positive value waits indefinitely
   */
  public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
      JavaStreamingContext jssc, long timeout) {
    super(jsc, pipeline);
    this.jssc = jssc;
    this.timeout = timeout;
  }

  /**
   * DStream holder. Can also create a DStream from a supplied queue of values, but mainly for
   * testing.
   *
   * <p>Deliberately a non-static inner class: building the queue stream uses the enclosing
   * context's streaming context, current transform and RDD registry.
   */
  private class DStreamHolder<T> {

    private Iterable<Iterable<T>> values;
    private Coder<T> coder;
    private JavaDStream<WindowedValue<T>> dStream;

    /** Holder whose DStream is built lazily from {@code values} on first access. */
    DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
      this.values = values;
      this.coder = coder;
    }

    /** Holder around an already existing DStream. */
    DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
      this.dStream = dStream;
    }

    /**
     * Returns the held DStream, creating it from the queued values on the first call when the
     * holder was built from values.
     */
    @SuppressWarnings("unchecked")
    JavaDStream<WindowedValue<T>> getDStream() {
      if (dStream == null) {
        // create the DStream from values
        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
        for (Iterable<T> v : values) {
          // each inner iterable becomes one RDD: register it as the current transform's output,
          // then read that output back and enqueue it
          setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
          rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
        }
        // create dstream from queue, one at a time, no defaults
        // mainly for unit test so no reason to have this configurable
        dStream = jssc.queueStream(rddQueue, true);
      }
      return dStream;
    }
  }

  /**
   * Registers, for the transform's output, a holder that will build its DStream from the given
   * batches of values. The holder is not added to the leaf streams.
   */
  <T> void setDStreamFromQueue(
      PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
    pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
  }

  /**
   * Registers the given DStream as the transform's output and marks it as a leaf stream until
   * it is retrieved through {@link #getStream(PValue)}.
   */
  <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
    PValue pvalue = (PValue) getOutput(transform);
    DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
    pstreams.put(pvalue, dStreamHolder);
    leafStreams.add(dStreamHolder);
  }

  /** Returns whether a stream has been registered for the transform's input. */
  boolean hasStream(PTransform<?, ?> transform) {
    PValue pvalue = (PValue) getInput(transform);
    return pstreams.containsKey(pvalue);
  }

  /** Returns the stream registered for the transform's input. */
  JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
    return getStream((PValue) getInput(transform));
  }

  /**
   * Returns the stream registered for the given value and removes its holder from the leaf
   * streams, since it now has a consumer.
   */
  JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
    DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
    JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
    leafStreams.remove(dStreamHolder);
    return dStream;
  }

  // used to set the RDD from the DStream in the RDDHolder for transformation
  <T> void setInputRDD(
      PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
    setRDD((PValue) getInput(transform), rdd);
  }

  // used to get the RDD transformation output and use it as the DStream transformation output
  JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
    return getRDD((PValue) getOutput(transform));
  }

  /** Returns the streaming context this evaluation context was created with. */
  public JavaStreamingContext getStreamingContext() {
    return jssc;
  }

  /** Forces an action on every stream that is still a leaf. */
  @Override
  protected void computeOutputs() {
    for (DStreamHolder<?> streamHolder : leafStreams) {
      computeOutput(streamHolder);
    }
  }

  /** Attaches an output operation that caches and counts every RDD of the held stream. */
  private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
    streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
      @Override
      public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
        rdd.rdd().cache();
        rdd.count();
        return null;
      }
    }); // force a DStream action
  }

  /**
   * Waits for the streaming execution to terminate (for at most {@code timeout} when it is
   * positive, otherwise without limit), then stops the streaming context and closes the parent
   * context.
   *
   * <p>The stop and the parent close run in {@code finally} blocks so that they still happen
   * when waiting fails; in that case the original exception propagates and the state is left
   * unchanged. The state becomes {@code DONE} only after the wait returned normally and the
   * streaming context was stopped.
   */
  @Override
  public void close() {
    boolean terminated = false;
    try {
      if (timeout > 0) {
        jssc.awaitTerminationOrTimeout(timeout);
      } else {
        jssc.awaitTermination();
      }
      terminated = true;
    } finally {
      try {
        //TODO: stop gracefully ?
        jssc.stop(false, false);
        if (terminated) {
          state = State.DONE;
        }
      } finally {
        super.close();
      }
    }
  }

  @Override
  public State getState() {
    return state;
  }

  //---------------- override in order to expose in package
  @Override
  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
    return super.getInput(transform);
  }
  @Override
  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
    return super.getOutput(transform);
  }

  @Override
  protected JavaSparkContext getSparkContext() {
    return super.getSparkContext();
  }

  @Override
  protected SparkRuntimeContext getRuntimeContext() {
    return super.getRuntimeContext();
  }

  @Override
  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
    super.setCurrentTransform(transform);
  }

  @Override
  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
    return super.getCurrentTransform();
  }

  @Override
  protected <T> void setOutputRDD(PTransform<?, ?> transform,
      JavaRDDLike<WindowedValue<T>, ?> rdd) {
    super.setOutputRDD(transform, rdd);
  }

  @Override
  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
      Coder<T> coder) {
    super.setOutputRDDFromValues(transform, values, coder);
  }

  @Override
  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
    return super.hasOutputRDD(transform);
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java new file mode 100644 index 0000000..c78c7fa --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java @@ -0,0 +1,415 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License.
+ */ +package org.apache.beam.runners.spark.streaming; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.api.client.util.Lists; +import com.google.api.client.util.Maps; +import com.google.api.client.util.Sets; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.io.AvroIO; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import com.google.cloud.dataflow.sdk.values.PDone; + +import com.google.common.reflect.TypeToken; +import kafka.serializer.Decoder; + +import org.apache.beam.runners.spark.DoFnFunction; +import org.apache.beam.runners.spark.EvaluationContext; +import org.apache.beam.runners.spark.SparkPipelineTranslator; +import org.apache.beam.runners.spark.TransformEvaluator; +import org.apache.beam.runners.spark.TransformTranslator; +import org.apache.beam.runners.spark.WindowingHelpers; +import org.apache.beam.runners.spark.io.ConsoleIO; +import 
org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.io.KafkaIO; +import org.apache.beam.runners.spark.io.hadoop.HadoopIO; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; +import org.apache.spark.streaming.api.java.JavaPairInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +import scala.Tuple2; + + +/** + * Supports translation between a DataFlow transform, and Spark's operations on DStreams. + */ +public final class StreamingTransformTranslator { + + private StreamingTransformTranslator() { + } + + private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() { + return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() { + @Override + public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") + JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = + (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) + ((StreamingEvaluationContext) context).getStream(transform); + dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum()); + } + }; + } + + private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() { + return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() { + @Override + public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + JavaStreamingContext jssc = sec.getStreamingContext(); + Class<K> keyClazz = transform.getKeyClass(); + Class<V> valueClazz = transform.getValueClass(); + Class<? 
extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass(); + Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass(); + Map<String, String> kafkaParams = transform.getKafkaParams(); + Set<String> topics = transform.getTopics(); + JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz, + valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics); + JavaDStream<WindowedValue<KV<K, V>>> inputStream = + inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() { + @Override + public KV<K, V> call(Tuple2<K, V> t2) throws Exception { + return KV.of(t2._1(), t2._2()); + } + }).map(WindowingHelpers.<KV<K, V>>windowFunction()); + sec.setStream(transform, inputStream); + } + }; + } + + private static <T> TransformEvaluator<Create.Values<T>> create() { + return new TransformEvaluator<Create.Values<T>>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(Create.Values<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + Iterable<T> elems = transform.getElements(); + Coder<T> coder = sec.getOutput(transform).getCoder(); + if (coder != VoidCoder.of()) { + // actual create + sec.setOutputRDDFromValues(transform, elems, coder); + } else { + // fake create as an input + // creates a stream with a single batch containing a single null element + // to invoke following transformations once + // to support DataflowAssert + sec.setDStreamFromQueue(transform, + Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)), + (Coder<Void>) coder); + } + } + }; + } + + private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() { + return new TransformEvaluator<CreateStream.QueuedValues<T>>() { + @Override + public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + 
Iterable<Iterable<T>> values = transform.getQueuedValues(); + Coder<T> coder = sec.getOutput(transform).getCoder(); + sec.setDStreamFromQueue(transform, values, coder); + } + }; + } + + private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() { + return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + PCollectionList<T> pcs = sec.getInput(transform); + JavaDStream<WindowedValue<T>> first = + (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0)); + List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1); + for (int i = 1; i < pcs.size(); i++) { + rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i))); + } + JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest); + sec.setStream(transform, dstream); + } + }; + } + + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform( + final SparkPipelineTranslator rddTranslator) { + return new TransformEvaluator<PT>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(PT transform, EvaluationContext context) { + TransformEvaluator<PT> rddEvaluator = + rddTranslator.translate((Class<PT>) transform.getClass()); + + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + if (sec.hasStream(transform)) { + JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = + (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) + sec.getStream(transform); + + sec.setStream(transform, dStream + .transform(new RDDTransform<>(sec, rddEvaluator, transform))); + } else { + // if the transformation requires direct access to RDD (not in stream) + // this is used for "fake" transformations like with 
DataflowAssert + rddEvaluator.evaluate(transform, context); + } + } + }; + } + + /** + * RDD transform function If the transformation function doesn't have an input, create a fake one + * as an empty RDD. + * + * @param <PT> PTransform type + */ + private static final class RDDTransform<PT extends PTransform<?, ?>> + implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> { + + private final StreamingEvaluationContext context; + private final AppliedPTransform<?, ?, ?> appliedPTransform; + private final TransformEvaluator<PT> rddEvaluator; + private final PT transform; + + + private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator, + PT transform) { + this.context = context; + this.appliedPTransform = context.getCurrentTransform(); + this.rddEvaluator = rddEvaluator; + this.transform = transform; + } + + @Override + @SuppressWarnings("unchecked") + public JavaRDD<WindowedValue<Object>> + call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { + AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); + context.setCurrentTransform(appliedPTransform); + context.setInputRDD(transform, rdd); + rddEvaluator.evaluate(transform, context); + if (!context.hasOutputRDD(transform)) { + // fake RDD as output + context.setOutputRDD(transform, + context.getSparkContext().<WindowedValue<Object>>emptyRDD()); + } + JavaRDD<WindowedValue<Object>> outRDD = + (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform); + context.setCurrentTransform(existingAPT); + return outRDD; + } + } + + @SuppressWarnings("unchecked") + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD( + final SparkPipelineTranslator rddTranslator) { + return new TransformEvaluator<PT>() { + @Override + public void evaluate(PT transform, EvaluationContext context) { + TransformEvaluator<PT> rddEvaluator = + rddTranslator.translate((Class<PT>) transform.getClass()); + + StreamingEvaluationContext sec 
= (StreamingEvaluationContext) context; + if (sec.hasStream(transform)) { + JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = + (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) + sec.getStream(transform); + + dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform)); + } else { + rddEvaluator.evaluate(transform, context); + } + } + }; + } + + /** + * RDD output function. + * + * @param <PT> PTransform type + */ + private static final class RDDOutputOperator<PT extends PTransform<?, ?>> + implements Function<JavaRDD<WindowedValue<Object>>, Void> { + + private final StreamingEvaluationContext context; + private final AppliedPTransform<?, ?, ?> appliedPTransform; + private final TransformEvaluator<PT> rddEvaluator; + private final PT transform; + + + private RDDOutputOperator(StreamingEvaluationContext context, + TransformEvaluator<PT> rddEvaluator, PT transform) { + this.context = context; + this.appliedPTransform = context.getCurrentTransform(); + this.rddEvaluator = rddEvaluator; + this.transform = transform; + } + + @Override + @SuppressWarnings("unchecked") + public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { + AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); + context.setCurrentTransform(appliedPTransform); + context.setInputRDD(transform, rdd); + rddEvaluator.evaluate(transform, context); + context.setCurrentTransform(existingAPT); + return null; + } + } + + private static final TransformTranslator.FieldGetter WINDOW_FG = + new TransformTranslator.FieldGetter(Window.Bound.class); + + private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() { + return new TransformEvaluator<Window.Bound<T>>() { + @Override + public void evaluate(Window.Bound<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + //--- first we apply windowing to the stream + 
WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); + @SuppressWarnings("unchecked") + JavaDStream<WindowedValue<T>> dStream = + (JavaDStream<WindowedValue<T>>) sec.getStream(transform); + if (windowFn instanceof FixedWindows) { + Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize() + .getMillis()); + sec.setStream(transform, dStream.window(windowDuration)); + } else if (windowFn instanceof SlidingWindows) { + Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize() + .getMillis()); + Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod() + .getMillis()); + sec.setStream(transform, dStream.window(windowDuration, slideDuration)); + } + //--- then we apply windowing to the elements + DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); + DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, + ((StreamingEvaluationContext)context).getRuntimeContext(), null); + @SuppressWarnings("unchecked") + JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = + (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) + sec.getStream(transform); + sec.setStream(transform, dstream.mapPartitions(dofn)); + } + }; + } + + private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps + .newHashMap(); + + static { + EVALUATORS.put(ConsoleIO.Write.Unbound.class, print()); + EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue()); + EVALUATORS.put(Create.Values.class, create()); + EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka()); + EVALUATORS.put(Window.Bound.class, window()); + EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl()); + } + + private static final Set<Class<? 
extends PTransform>> UNSUPPORTED_EVALUATORS = Sets + .newHashSet(); + + static { + //TODO - add support for the following + UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class); + UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class); + UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class); + } + + @SuppressWarnings("unchecked") + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> + getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) { + TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz); + if (transform == null) { + if (UNSUPPORTED_EVALUATORS.contains(clazz)) { + throw new UnsupportedOperationException("Dataflow transformation " + clazz + .getCanonicalName() + + " is currently unsupported by the Spark streaming pipeline"); + } + // DStream transformations will transform an RDD into another RDD + // Actions will create output + // In Dataflow it depends on the PTransform's Input and Output class + Class<?> pTOutputClazz = getPTransformOutputClazz(clazz); + if (PDone.class.equals(pTOutputClazz)) { + return foreachRDD(rddTranslator); + } else { + return rddTransform(rddTranslator); + } + } + return transform; + } + + private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) { + Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments(); + return TypeToken.of(clazz).resolveType(types[1]).getRawType(); + } + + /** + * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator. 
+ * rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation + */ + public static class Translator implements SparkPipelineTranslator { + + private final SparkPipelineTranslator rddTranslator; + + public Translator(SparkPipelineTranslator rddTranslator) { + this.rddTranslator = rddTranslator; + } + + @Override + public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) { + // streaming includes rdd transformations as well + return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz); + } + + @Override + public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) { + return getTransformEvaluator(clazz, rddTranslator); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java new file mode 100644 index 0000000..6844011 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ + +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; + +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkPipelineTranslator; +import org.apache.beam.runners.spark.TransformTranslator; + +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; + + +/** + * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing. + */ +public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator { + + // Currently, Spark streaming recommends batches no smaller then 500 msec + private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500); + + private boolean windowing; + private Duration batchDuration; + + public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) { + super(translator); + } + + private static final TransformTranslator.FieldGetter WINDOW_FG = + new TransformTranslator.FieldGetter(Window.Bound.class); + + // Use the smallest window (fixed or sliding) as Spark streaming's batch duration + @Override + protected <PT extends PTransform<? 
super PInput, POutput>> void + doVisitTransform(TransformTreeNode node) { + @SuppressWarnings("unchecked") + PT transform = (PT) node.getTransform(); + @SuppressWarnings("unchecked") + Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass(); + if (transformClass.isAssignableFrom(Window.Bound.class)) { + WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform); + if (windowFn instanceof FixedWindows) { + setBatchDuration(((FixedWindows) windowFn).getSize()); + } else if (windowFn instanceof SlidingWindows) { + if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) { + throw new UnsupportedOperationException("Spark does not support window offsets"); + } + // Sliding window size might as well set the batch duration. Applying the transformation + // will add the "slide" + setBatchDuration(((SlidingWindows) windowFn).getSize()); + } else if (!(windowFn instanceof GlobalWindows)) { + throw new IllegalStateException("Windowing function not supported: " + windowFn); + } + } + } + + private void setBatchDuration(org.joda.time.Duration duration) { + Long durationMillis = duration.getMillis(); + // validate window size + if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) { + throw new IllegalArgumentException("Windowing of size " + durationMillis + + "msec is not supported!"); + } + // choose the smallest duration to be Spark's batch duration, larger ones will be handled + // as window functions over the batched-stream + if (!windowing || this.batchDuration.milliseconds() > durationMillis) { + this.batchDuration = Durations.milliseconds(durationMillis); + } + windowing = true; + } + + public boolean isWindowing() { + return windowing; + } + + public Duration getBatchDuration() { + return batchDuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java new file mode 100644 index 0000000..af831c6 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package org.apache.beam.runners.spark.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BroadcastHelper<T> implements Serializable { + + /** + * If the property {@code dataflow.spark.directBroadcast} is set to + * {@code true} then Spark serialization (Kryo) will be used to broadcast values + * in View objects. By default this property is not set, and values are coded using + * the appropriate {@link Coder}. 
+ */ + public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast"; + + private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class); + + public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) { + if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) { + return new DirectBroadcastHelper<>(value); + } + return new CodedBroadcastHelper<>(value, coder); + } + + public abstract T getValue(); + + public abstract void broadcast(JavaSparkContext jsc); + + /** + * A {@link BroadcastHelper} that relies on the underlying + * Spark serialization (Kryo) to broadcast values. This is appropriate when + * broadcasting very large values, since no copy of the object is made. + * @param <T> + */ + static class DirectBroadcastHelper<T> extends BroadcastHelper<T> { + private Broadcast<T> bcast; + private transient T value; + + DirectBroadcastHelper(T value) { + this.value = value; + } + + @Override + public synchronized T getValue() { + if (value == null) { + value = bcast.getValue(); + } + return value; + } + + @Override + public void broadcast(JavaSparkContext jsc) { + this.bcast = jsc.broadcast(value); + } + } + + /** + * A {@link BroadcastHelper} that uses a + * {@link Coder} to encode values as byte arrays + * before broadcasting. 
+ * @param <T> + */ + static class CodedBroadcastHelper<T> extends BroadcastHelper<T> { + private Broadcast<byte[]> bcast; + private final Coder<T> coder; + private transient T value; + + CodedBroadcastHelper(T value, Coder<T> coder) { + this.value = value; + this.coder = coder; + } + + @Override + public synchronized T getValue() { + if (value == null) { + value = deserialize(); + } + return value; + } + + @Override + public void broadcast(JavaSparkContext jsc) { + this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder)); + } + + private T deserialize() { + T val; + try { + val = coder.decode(new ByteArrayInputStream(bcast.value()), + new Coder.Context(true)); + } catch (IOException ioe) { + // this should not ever happen, log it if it does. + LOG.warn(ioe.getMessage()); + val = null; + } + return val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java new file mode 100644 index 0000000..7679b9c --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ +package org.apache.beam.runners.spark.util; + +import java.io.Serializable; +import java.util.Arrays; + +import com.google.common.primitives.UnsignedBytes; + +public class ByteArray implements Serializable, Comparable<ByteArray> { + + private final byte[] value; + + public ByteArray(byte[] value) { + this.value = value; + } + + public byte[] getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + ByteArray byteArray = (ByteArray) o; + return Arrays.equals(value, byteArray.value); + } + + @Override + public int hashCode() { + return value != null ? Arrays.hashCode(value) : 0; + } + + @Override + public int compareTo(ByteArray other) { + return UnsignedBytes.lexicographicalComparator().compare(value, other.value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar index 5733a86..98387a6 100644 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar +++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar -com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file +org.apache.beam.runners.spark.SparkPipelineOptionsRegistrar +org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar index 26e0b3a..972b1a3 100644 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar +++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -com.cloudera.dataflow.spark.SparkPipelineRunnerRegistrar \ No newline at end of file +org.apache.beam.runners.spark.SparkPipelineRunnerRegistrar \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java deleted file mode 100644 index 29a73b6..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. 
All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.hadoop; - -import com.google.cloud.dataflow.sdk.testing.CoderProperties; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.junit.Test; - -/** - * Tests for WritableCoder. - */ -public class WritableCoderTest { - - @Test - public void testIntWritableEncoding() throws Exception { - IntWritable value = new IntWritable(42); - WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } - - @Test - public void testNullWritableEncoding() throws Exception { - WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java deleted file mode 100644 index ea4cc38..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. 
licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.Lists; -import com.google.common.io.Resources; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.List; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.assertEquals; - -public class AvroPipelineTest { - - private File inputFile; - private File outputDir; - - @Rule - public final TemporaryFolder tmpDir = new TemporaryFolder(); - - @Before - public void setUp() throws IOException { - inputFile = tmpDir.newFile("test.avro"); - outputDir = tmpDir.newFolder("out"); - outputDir.delete(); - } - - @Test - public void testGeneric() throws Exception { - Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream()); - GenericRecord savedRecord = new GenericData.Record(schema); - savedRecord.put("name", 
"John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), schema); - - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<GenericRecord> input = p.apply( - AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); - input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); - EvaluationResult res = SparkPipelineRunner.create().run(p); - res.close(); - - List<GenericRecord> records = readGenericFile(); - assertEquals(Lists.newArrayList(savedRecord), records); - } - - private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { - FileOutputStream outputStream = new FileOutputStream(this.inputFile); - GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema); - - try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter)) { - dataFileWriter.create(schema, outputStream); - for (GenericRecord record : genericRecords) { - dataFileWriter.append(record); - } - } - outputStream.close(); - } - - private List<GenericRecord> readGenericFile() throws IOException { - List<GenericRecord> records = Lists.newArrayList(); - GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(); - try (DataFileReader<GenericRecord> dataFileReader = - new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) { - for (GenericRecord record : dataFileReader) { - records.add(record); - } - } - return records; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java 
b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java deleted file mode 100644 index 667e949..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.Iterables; -import java.util.Arrays; -import java.util.List; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class CombineGloballyTest { - - private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - assertEquals("hi there,hi,hi sue bob,hi sue,,bob 
hi", Iterables.getOnlyElement(res.get(output))); - res.close(); - } - - public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> { - - @Override - public StringBuilder createAccumulator() { - // return null to differentiate from an empty string - return null; - } - - @Override - public StringBuilder addInput(StringBuilder accumulator, String input) { - return combine(accumulator, input); - } - - @Override - public StringBuilder mergeAccumulators(Iterable<StringBuilder> accumulators) { - StringBuilder sb = new StringBuilder(); - for (StringBuilder accum : accumulators) { - if (accum != null) { - sb.append(accum); - } - } - return sb; - } - - @Override - public String extractOutput(StringBuilder accumulator) { - return accumulator.toString(); - } - - private static StringBuilder combine(StringBuilder accum, String datum) { - if (accum == null) { - return new StringBuilder(datum); - } else { - accum.append(",").append(datum); - return accum; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java deleted file mode 100644 index f9d5b46..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.coders.VarLongCoder; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.ImmutableList; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class CombinePerKeyTest { - - private static final List<String> WORDS = - ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"); - @Test - public void testRun() { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); - EvaluationResult res = SparkPipelineRunner.create().run(p); - Map<String, Long> actualCnts = new HashMap<>(); - for (KV<String, Long> kv : res.get(cnts)) { - actualCnts.put(kv.getKey(), kv.getValue()); - } - res.close(); - Assert.assertEquals(8, actualCnts.size()); - Assert.assertEquals(Long.valueOf(2L), 
actualCnts.get("the")); - } - - private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> { - @Override - public PCollection<KV<T, Long>> apply(PCollection<T> pcol) { - PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() { - @Override - public void processElement(ProcessContext processContext) throws Exception { - processContext.output(KV.of(processContext.element(), 1L)); - } - })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of())); - return withLongs.apply(Sum.<T>longsPerKey()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java deleted file mode 100644 index 7495aeb..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.ImmutableSet; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import org.junit.Test; - -/** - * A test based on {@code DeDupExample} from the SDK. - */ -public class DeDupTest { - - private static final String[] LINES_ARRAY = { - "hi there", "hello", "hi there", - "hi", "hello"}; - private static final List<String> LINES = Arrays.asList(LINES_ARRAY); - private static final Set<String> EXPECTED_SET = - ImmutableSet.of("hi there", "hi", "hello"); - - @Test - public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); - Pipeline p = Pipeline.create(options); - PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()); - PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); - - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SET); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - res.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java deleted file mode 100644 index 2b0947f..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * 
Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import java.io.Serializable; -import org.junit.Test; - -public class DoFnOutputTest implements Serializable { - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); - Pipeline pipeline = Pipeline.create(options); - - PCollection<String> strings = pipeline.apply(Create.of("a")); - // Test that values written from startBundle() and finishBundle() are written to - // the output - PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() { - @Override - public void startBundle(Context c) throws Exception { - c.output("start"); - } - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - @Override - public void finishBundle(Context c) throws Exception { - c.output("finish"); - } - })); - - DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish"); - - EvaluationResult res = SparkPipelineRunner.create().run(pipeline); - res.close(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java deleted file mode 100644 index 6c89ca1..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.Iterables; -import java.util.Collections; -import java.util.List; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class EmptyInputTest { - - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - Pipeline p = Pipeline.create(options); - List<String> empty = Collections.emptyList(); - PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of()); - PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords())); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - assertEquals("", Iterables.getOnlyElement(res.get(output))); - res.close(); - } - - public static class ConcatWords implements SerializableFunction<Iterable<String>, String> { - @Override - public String apply(Iterable<String> input) { - StringBuilder all = new StringBuilder(); - for (String item : input) { - if (!item.isEmpty()) { - if (all.length() == 0) { - all.append(item); - } else { - all.append(","); - all.append(item); - } - } - } - return all.toString(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java deleted file mode 100644 index 
579ada5..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.cloudera.dataflow.hadoop.HadoopIO; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import java.io.File; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.assertEquals; - -public class HadoopFileFormatPipelineTest { - - private File inputFile; - private File outputFile; - - @Rule - public final TemporaryFolder tmpDir = new TemporaryFolder(); - - @Before - public void setUp() throws IOException { - inputFile = 
tmpDir.newFile("test.seq"); - outputFile = tmpDir.newFolder("out"); - outputFile.delete(); - } - - @Test - public void testSequenceFile() throws Exception { - populateFile(); - - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - @SuppressWarnings("unchecked") - Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = - (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class; - HadoopIO.Read.Bound<IntWritable,Text> read = - HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class); - PCollection<KV<IntWritable, Text>> input = p.apply(read); - @SuppressWarnings("unchecked") - Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass = - (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class; - @SuppressWarnings("unchecked") - HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), - outputFormatClass, IntWritable.class, Text.class); - input.apply(write.withoutSharding()); - EvaluationResult res = SparkPipelineRunner.create().run(p); - res.close(); - - IntWritable key = new IntWritable(); - Text value = new Text(); - try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) { - int i = 0; - while (reader.next(key, value)) { - assertEquals(i, key.get()); - assertEquals("value-" + i, value.toString()); - i++; - } - } - } - - private void populateFile() throws IOException { - IntWritable key = new IntWritable(); - Text value = new Text(); - try (Writer writer = SequenceFile.createWriter( - new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(this.inputFile.toURI())))) { - for (int i = 0; i < 5; i++) { - key.set(i); - value.set("value-" + i); - writer.append(key, value); - } - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java deleted file mode 100644 index 2df8493..0000000 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.ApproximateUnique; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.Max; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.cloud.dataflow.sdk.values.PCollectionTuple; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.cloud.dataflow.sdk.values.TupleTagList; -import com.google.common.collect.Iterables; -import org.junit.Assert; -import org.junit.Test; - -public class MultiOutputWordCountTest { - - private static final TupleTag<String> upper = new TupleTag<>(); - private static final TupleTag<String> lower = new TupleTag<>(); - private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>(); - private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>(); - - @Test - public void testRun() throws Exception { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+")); - PCollection<String> w1 = p.apply(Create.of("Here 
are some words to count", "and some others")); - PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words")); - PCollectionList<String> list = PCollectionList.of(w1).and(w2); - - PCollection<String> union = list.apply(Flatten.<String>pCollections()); - PCollectionView<String> regexView = regex.apply(View.<String>asSingleton()); - CountWords countWords = new CountWords(regexView); - PCollectionTuple luc = union.apply(countWords); - PCollection<Long> unique = luc.get(lowerCnts).apply( - ApproximateUnique.<KV<String, Long>>globally(16)); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts)); - Assert.assertEquals("are", actualLower.iterator().next().getKey()); - Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); - Assert.assertEquals("Here", actualUpper.iterator().next().getKey()); - Iterable<Long> actualUniqCount = res.get(unique); - Assert.assertEquals(9, (long) actualUniqCount.iterator().next()); - int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class); - Assert.assertEquals(18, actualTotalWords); - int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class); - Assert.assertEquals(6, actualMaxWordLength); - AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords - .getTotalWordsAggregator()); - Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue()); - - res.close(); - } - - /** - * A DoFn that tokenizes lines of text into individual words. 
- */ - static class ExtractWordsFn extends DoFn<String, String> { - - private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", - new Sum.SumIntegerFn()); - private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength", - new Max.MaxIntegerFn()); - private final PCollectionView<String> regex; - - ExtractWordsFn(PCollectionView<String> regex) { - this.regex = regex; - } - - @Override - public void processElement(ProcessContext c) { - String[] words = c.element().split(c.sideInput(regex)); - for (String word : words) { - totalWords.addValue(1); - if (!word.isEmpty()) { - maxWordLength.addValue(word.length()); - if (Character.isLowerCase(word.charAt(0))) { - c.output(word); - } else { - c.sideOutput(upper, word); - } - } - } - } - } - - public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> { - - private final PCollectionView<String> regex; - private final ExtractWordsFn extractWordsFn; - - public CountWords(PCollectionView<String> regex) { - this.regex = regex; - this.extractWordsFn = new ExtractWordsFn(regex); - } - - @Override - public PCollectionTuple apply(PCollection<String> lines) { - // Convert lines of text into individual words. - PCollectionTuple lowerUpper = lines - .apply(ParDo.of(extractWordsFn) - .withSideInputs(regex) - .withOutputTags(lower, TupleTagList.of(upper))); - lowerUpper.get(lower).setCoder(StringUtf8Coder.of()); - lowerUpper.get(upper).setCoder(StringUtf8Coder.of()); - PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count - .<String>perElement()); - PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count - .<String>perElement()); - return PCollectionTuple - .of(lowerCnts, lowerCounts) - .and(upperCnts, upperCounts); - } - - Aggregator<Integer, Integer> getTotalWordsAggregator() { - return extractWordsFn.totalWords; - } - } -}
