http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java new file mode 100644 index 0000000..6ba04b7 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation; + +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.spark.api.java.function.Function; + +/** + * Helper functions for working with windows. + */ +public final class WindowingHelpers { + private WindowingHelpers() { + } + + /** + * A function for converting a value to a {@link WindowedValue}. The resulting + * {@link WindowedValue} will be in no windows, and will have the default timestamp + * and pane. + * + * @param <T> The type of the object. 
+ * @return A function that accepts an object and returns its {@link WindowedValue}. + */ + public static <T> Function<T, WindowedValue<T>> windowFunction() { + return new Function<T, WindowedValue<T>>() { + @Override + public WindowedValue<T> call(T t) { + return WindowedValue.valueInEmptyWindows(t); + } + }; + } + + /** + * A function for extracting the value from a {@link WindowedValue}. + * + * @param <T> The type of the object. + * @return A function that accepts a {@link WindowedValue} and returns its value. + */ + public static <T> Function<WindowedValue<T>, T> unwindowFunction() { + return new Function<WindowedValue<T>, T>() { + @Override + public T call(WindowedValue<T> t) { + return t.getValue(); + } + }; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java new file mode 100644 index 0000000..2c34caf --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.translation.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; +/** + * Static factory for {@link SparkStreamingPipelineOptions}. Not instantiable: the only + * constructor is private. + */ +public final class SparkStreamingPipelineOptionsFactory { + + private SparkStreamingPipelineOptionsFactory() { + } + /** + * Creates the options object by delegating to + * {@code PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class)}. + * + * @return the {@link SparkStreamingPipelineOptions} instance returned by that call. + */ + public static SparkStreamingPipelineOptions create() { + return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java new file mode 100644 index 0000000..e39d3ed --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkStreamingPipelineOptionsRegistrar.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; +import com.google.common.collect.ImmutableList; +import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; +/** + * {@link PipelineOptionsRegistrar} whose only registered options interface is + * {@link SparkStreamingPipelineOptions}. + */ +public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + /** + * Returns a single-element immutable list containing + * {@code SparkStreamingPipelineOptions.class}. + */ + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions + .class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java new file mode 100644 index 0000000..0e87355 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation.streaming; + + +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; +import com.google.cloud.dataflow.sdk.values.PValue; + +import org.apache.beam.runners.spark.translation.EvaluationContext; +import org.apache.beam.runners.spark.translation.SparkRuntimeContext; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + + +/** + * Streaming evaluation context helps to handle streaming. 
+ */ +public class StreamingEvaluationContext extends EvaluationContext { + /* + * pstreams maps each PValue to its stream holder; leafStreams tracks the holders registered via + * setStream that have not yet been read back through getStream. + */ + private final JavaStreamingContext jssc; + private final long timeout; + private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>(); + private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>(); + + public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, + JavaStreamingContext jssc, long timeout) { + super(jsc, pipeline); + this.jssc = jssc; + this.timeout = timeout; + } + + /** + * DStream holder. Can also create a DStream from a supplied queue of values, but mainly for + * testing. In that case the DStream is built on the first {@code getDStream()} call. + */ + private class DStreamHolder<T> { + + private Iterable<Iterable<T>> values; + private Coder<T> coder; + private JavaDStream<WindowedValue<T>> dStream; + + DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) { + this.values = values; + this.coder = coder; + } + + DStreamHolder(JavaDStream<WindowedValue<T>> dStream) { + this.dStream = dStream; + } + + @SuppressWarnings("unchecked") + JavaDStream<WindowedValue<T>> getDStream() { + if (dStream == null) { + // create the DStream from values + Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>(); + for (Iterable<T> v : values) { + setOutputRDDFromValues(currentTransform.getTransform(), v, coder); + rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform())); + } + // create dstream from queue, one at a time, no defaults + // mainly for unit test so no reason to have this configurable + dStream = jssc.queueStream(rddQueue, true); + } + return dStream; + } + } + /** + * Registers, under the transform's output, a holder that builds its DStream from + * {@code values}. Unlike {@code setStream}, the holder is not added to {@code leafStreams}. + */ + <T> void setDStreamFromQueue( + PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) { + pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder)); + } + /** + * Registers {@code dStream} under the transform's output and adds its holder to + * {@code leafStreams}; {@code getStream} removes a holder from that set when it is read. + */ + <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) { + PValue pvalue = (PValue) getOutput(transform); + DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream); 
+ pstreams.put(pvalue, dStreamHolder); + leafStreams.add(dStreamHolder); + } + /** Reports whether a stream is registered for the transform's input. */ + boolean hasStream(PTransform<?, ?> transform) { + PValue pvalue = (PValue) getInput(transform); + return pstreams.containsKey(pvalue); + } + /** Returns the stream registered for the transform's input (not its output). */ + JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) { + return getStream((PValue) getInput(transform)); + } + /** + * Returns the stream registered for {@code pvalue} and removes its holder from + * {@code leafStreams}. + * NOTE(review): the result of pstreams.get is dereferenced without a null check, so an + * unregistered pvalue would fail here; callers presumably check hasStream first - confirm. + */ + JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) { + DStreamHolder<?> dStreamHolder = pstreams.get(pvalue); + JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream(); + leafStreams.remove(dStreamHolder); + return dStream; + } + + // used to set the RDD from the DStream in the RDDHolder for transformation + <T> void setInputRDD( + PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) { + setRDD((PValue) getInput(transform), rdd); + } + + // used to get the RDD transformation output and use it as the DStream transformation output + JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) { + return getRDD((PValue) getOutput(transform)); + } + + public JavaStreamingContext getStreamingContext() { + return jssc; + } + /** Registers a forcing action on every stream whose holder is still in leafStreams. */ + @Override + public void computeOutputs() { + for (DStreamHolder<?> streamHolder : leafStreams) { + computeOutput(streamHolder); + } + } + + private static <T> void computeOutput(DStreamHolder<T> streamHolder) { + streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() { + @Override + public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception { + rdd.rdd().cache(); + rdd.count(); + return null; + } + }); // force a DStream action + } + /** + * Waits for the streaming context to terminate (bounded by {@code timeout} when it is + * positive), stops it, sets the state to DONE and then closes the parent context. + */ + @Override + public void close() { + if (timeout > 0) { + jssc.awaitTerminationOrTimeout(timeout); + } else { + jssc.awaitTermination(); + } + //TODO: stop gracefully ? 
+ jssc.stop(false, false); + state = State.DONE; + super.close(); + } + /* Lifecycle state: RUNNING until close() sets it to DONE. */ + private State state = State.RUNNING; + + @Override + public State getState() { + return state; + } + + //---------------- overridden only to delegate to super, in order to expose in package + @Override + protected <I extends PInput> I getInput(PTransform<I, ?> transform) { + return super.getInput(transform); + } + @Override + protected <O extends POutput> O getOutput(PTransform<?, O> transform) { + return super.getOutput(transform); + } + + @Override + protected JavaSparkContext getSparkContext() { + return super.getSparkContext(); + } + + @Override + protected SparkRuntimeContext getRuntimeContext() { + return super.getRuntimeContext(); + } + + @Override + protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { + super.setCurrentTransform(transform); + } + + @Override + protected AppliedPTransform<?, ?, ?> getCurrentTransform() { + return super.getCurrentTransform(); + } + + @Override + protected <T> void setOutputRDD(PTransform<?, ?> transform, + JavaRDDLike<WindowedValue<T>, ?> rdd) { + super.setOutputRDD(transform, rdd); + } + + @Override + protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, + Coder<T> coder) { + super.setOutputRDDFromValues(transform, values, coder); + } + + @Override + protected boolean hasOutputRDD(PTransform<? 
extends PInput, ?> transform) { + return super.hasOutputRDD(transform); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java new file mode 100644 index 0000000..349bb7c --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.translation.streaming; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.api.client.util.Lists; +import com.google.api.client.util.Maps; +import com.google.api.client.util.Sets; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.io.AvroIO; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import com.google.cloud.dataflow.sdk.values.PDone; + +import com.google.common.reflect.TypeToken; +import kafka.serializer.Decoder; + +import org.apache.beam.runners.spark.io.ConsoleIO; +import org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.io.KafkaIO; +import org.apache.beam.runners.spark.io.hadoop.HadoopIO; +import org.apache.beam.runners.spark.translation.DoFnFunction; +import org.apache.beam.runners.spark.translation.EvaluationContext; +import 
org.apache.beam.runners.spark.translation.SparkPipelineTranslator; +import org.apache.beam.runners.spark.translation.TransformEvaluator; +import org.apache.beam.runners.spark.translation.TransformTranslator; +import org.apache.beam.runners.spark.translation.WindowingHelpers; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; +import org.apache.spark.streaming.api.java.JavaPairInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +import scala.Tuple2; + + +/** + * Supports translation between a Dataflow transform, and Spark's operations on DStreams. + */ +public final class StreamingTransformTranslator { + + private StreamingTransformTranslator() { + } + /** + * Evaluator for ConsoleIO.Write.Unbound: unwraps each WindowedValue of the input stream and + * calls {@code print(transform.getNum())} on the result. + */ + private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() { + return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() { + @Override + public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") + JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = + (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) + ((StreamingEvaluationContext) context).getStream(transform); + dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum()); + } + }; + } + /** + * Evaluator for KafkaIO.Read.Unbound: creates a direct Kafka stream from the transform's + * classes, parameters and topics, converts every Tuple2 into a KV, wraps it with + * {@code WindowingHelpers.windowFunction()} and registers the result as the transform's stream. + */ + private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() { + return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() { + @Override + public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + JavaStreamingContext jssc = sec.getStreamingContext(); + Class<K> keyClazz = 
transform.getKeyClass(); + Class<V> valueClazz = transform.getValueClass(); + Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass(); + Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass(); + Map<String, String> kafkaParams = transform.getKafkaParams(); + Set<String> topics = transform.getTopics(); + JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz, + valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics); + JavaDStream<WindowedValue<KV<K, V>>> inputStream = + inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() { + @Override + public KV<K, V> call(Tuple2<K, V> t2) throws Exception { + return KV.of(t2._1(), t2._2()); + } + }).map(WindowingHelpers.<KV<K, V>>windowFunction()); + sec.setStream(transform, inputStream); + } + }; + } + /** + * Evaluator for Create.Values. When the output coder is not the {@code VoidCoder.of()} + * instance (reference comparison) the elements become an output RDD; otherwise a queue-backed + * stream holding one batch with a single null element is registered instead. + */ + private static <T> TransformEvaluator<Create.Values<T>> create() { + return new TransformEvaluator<Create.Values<T>>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(Create.Values<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + Iterable<T> elems = transform.getElements(); + Coder<T> coder = sec.getOutput(transform).getCoder(); + if (coder != VoidCoder.of()) { + // actual create + sec.setOutputRDDFromValues(transform, elems, coder); + } else { + // fake create as an input + // creates a stream with a single batch containing a single null element + // to invoke following transformations once + // to support DataflowAssert + sec.setDStreamFromQueue(transform, + Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)), + (Coder<Void>) coder); + } + } + }; + } + /** + * Evaluator for CreateStream.QueuedValues: registers a queue-backed stream built from the + * transform's queued values and the output coder. + */ + private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() { + return new TransformEvaluator<CreateStream.QueuedValues<T>>() { + @Override + public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) { 
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + Iterable<Iterable<T>> values = transform.getQueuedValues(); + Coder<T> coder = sec.getOutput(transform).getCoder(); + sec.setDStreamFromQueue(transform, values, coder); + } + }; + } + /** + * Evaluator for Flatten.FlattenPCollectionList: unions the streams of all input PCollections + * and registers the union as the transform's stream. + * NOTE(review): pcs.get(0) is read unconditionally, so an empty PCollectionList is not handled + * here - confirm whether that can occur. + */ + private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() { + return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + PCollectionList<T> pcs = sec.getInput(transform); + JavaDStream<WindowedValue<T>> first = + (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0)); + List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1); + for (int i = 1; i < pcs.size(); i++) { + rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i))); + } + JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest); + sec.setStream(transform, dstream); + } + }; + } + /** + * Wraps the RDD evaluator obtained from {@code rddTranslator}: when the transform's input has a + * registered stream the evaluator runs through {@code DStream.transform} and the result is + * registered as the transform's stream; otherwise the evaluator is invoked directly. + */ + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform( + final SparkPipelineTranslator rddTranslator) { + return new TransformEvaluator<PT>() { + @SuppressWarnings("unchecked") + @Override + public void evaluate(PT transform, EvaluationContext context) { + TransformEvaluator<PT> rddEvaluator = + rddTranslator.translate((Class<PT>) transform.getClass()); + + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + if (sec.hasStream(transform)) { + JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = + (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) + sec.getStream(transform); + + sec.setStream(transform, dStream + .transform(new RDDTransform<>(sec, rddEvaluator, transform))); + } else { + // if the transformation requires direct access to RDD (not 
in stream) + // this is used for "fake" transformations like with DataflowAssert + rddEvaluator.evaluate(transform, context); + } + } + }; + } + + /** + * RDD transform function. If evaluating the transformation does not produce an output RDD, an + * empty RDD is set as a fake output. + * + * @param <PT> PTransform type + */ + private static final class RDDTransform<PT extends PTransform<?, ?>> + implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> { + + private final StreamingEvaluationContext context; + private final AppliedPTransform<?, ?, ?> appliedPTransform; + private final TransformEvaluator<PT> rddEvaluator; + private final PT transform; + + + private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator, + PT transform) { + this.context = context; + this.appliedPTransform = context.getCurrentTransform(); + this.rddEvaluator = rddEvaluator; + this.transform = transform; + } + /* + * Switches the context's current transform to the one captured at construction, evaluates + * the RDD transform on this batch, then restores the previous current transform. + * NOTE(review): the restore is not in a finally block, so it is skipped when evaluate throws + * - confirm this is acceptable. + */ + @Override + @SuppressWarnings("unchecked") + public JavaRDD<WindowedValue<Object>> + call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { + AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); + context.setCurrentTransform(appliedPTransform); + context.setInputRDD(transform, rdd); + rddEvaluator.evaluate(transform, context); + if (!context.hasOutputRDD(transform)) { + // fake RDD as output + context.setOutputRDD(transform, + context.getSparkContext().<WindowedValue<Object>>emptyRDD()); + } + JavaRDD<WindowedValue<Object>> outRDD = + (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform); + context.setCurrentTransform(existingAPT); + return outRDD; + } + } + /** + * Counterpart of rddTransform for output operations: the RDD evaluator runs inside + * {@code DStream.foreachRDD} and no stream is registered for the transform. + */ + @SuppressWarnings("unchecked") + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD( + final SparkPipelineTranslator rddTranslator) { + return new TransformEvaluator<PT>() { + @Override + public void evaluate(PT transform, EvaluationContext context) { + TransformEvaluator<PT> rddEvaluator = 
+ rddTranslator.translate((Class<PT>) transform.getClass()); + + StreamingEvaluationContext sec = (StreamingEvaluationContext) context; + if (sec.hasStream(transform)) { + JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = + (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) + sec.getStream(transform); + + dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform)); + } else { + rddEvaluator.evaluate(transform, context); + } + } + }; + } + + /** + * RDD output function: evaluates the RDD evaluator against each batch RDD and returns null. + * + * @param <PT> PTransform type + */ + private static final class RDDOutputOperator<PT extends PTransform<?, ?>> + implements Function<JavaRDD<WindowedValue<Object>>, Void> { + + private final StreamingEvaluationContext context; + private final AppliedPTransform<?, ?, ?> appliedPTransform; + private final TransformEvaluator<PT> rddEvaluator; + private final PT transform; + + + private RDDOutputOperator(StreamingEvaluationContext context, + TransformEvaluator<PT> rddEvaluator, PT transform) { + this.context = context; + this.appliedPTransform = context.getCurrentTransform(); + this.rddEvaluator = rddEvaluator; + this.transform = transform; + } + + @Override + @SuppressWarnings("unchecked") + public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { + AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); + context.setCurrentTransform(appliedPTransform); + context.setInputRDD(transform, rdd); + rddEvaluator.evaluate(transform, context); + context.setCurrentTransform(existingAPT); + return null; + } + } + + private static final TransformTranslator.FieldGetter WINDOW_FG = + new TransformTranslator.FieldGetter(Window.Bound.class); + /** + * Evaluator for Window.Bound. For FixedWindows and SlidingWindows it registers a Spark-windowed + * stream; it then registers the stream returned by getStream mapped through AssignWindowsDoFn. + * NOTE(review): getStream(transform) looks up the transform's input while + * setStream(transform, ...) registers against its output, so the second getStream call appears + * to return the un-windowed input stream, and the final setStream replaces the windowed stream + * registered earlier (whose holder also stays in leafStreams). Confirm whether the element + * windowing was meant to be applied on top of the Spark window. + */ + private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() { + return new TransformEvaluator<Window.Bound<T>>() { + @Override + public void evaluate(Window.Bound<T> transform, EvaluationContext context) { + StreamingEvaluationContext sec = 
(StreamingEvaluationContext) context; + //--- first we apply windowing to the stream + WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); + @SuppressWarnings("unchecked") + JavaDStream<WindowedValue<T>> dStream = + (JavaDStream<WindowedValue<T>>) sec.getStream(transform); + if (windowFn instanceof FixedWindows) { + Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize() + .getMillis()); + sec.setStream(transform, dStream.window(windowDuration)); + } else if (windowFn instanceof SlidingWindows) { + Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize() + .getMillis()); + Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod() + .getMillis()); + sec.setStream(transform, dStream.window(windowDuration, slideDuration)); + } + //--- then we apply windowing to the elements + DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); + DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, + ((StreamingEvaluationContext)context).getRuntimeContext(), null); + @SuppressWarnings("unchecked") + JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = + (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) + sec.getStream(transform); + sec.setStream(transform, dstream.mapPartitions(dofn)); + } + }; + } + /* Streaming-specific evaluators, looked up by the exact transform class. */ + private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps + .newHashMap(); + + static { + EVALUATORS.put(ConsoleIO.Write.Unbound.class, print()); + EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue()); + EVALUATORS.put(Create.Values.class, create()); + EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka()); + EVALUATORS.put(Window.Bound.class, window()); + EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl()); + } + + private static final Set<Class<? 
extends PTransform>> UNSUPPORTED_EVALUATORS = Sets + .newHashSet(); + + static { + //TODO - add support for the following + UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class); + UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class); + UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class); + UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class); + } + /** + * Resolves the evaluator for {@code clazz}: the registered streaming evaluator if there is one; + * otherwise an UnsupportedOperationException for classes in UNSUPPORTED_EVALUATORS, else a + * wrapper around the RDD translator - foreachRDD when the transform's output type is PDone, + * rddTransform for any other output type. + */ + @SuppressWarnings("unchecked") + private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> + getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) { + TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz); + if (transform == null) { + if (UNSUPPORTED_EVALUATORS.contains(clazz)) { + throw new UnsupportedOperationException("Dataflow transformation " + clazz + .getCanonicalName() + + " is currently unsupported by the Spark streaming pipeline"); + } + // DStream transformations will transform an RDD into another RDD + // Actions will create output + // In Dataflow it depends on the PTransform's Input and Output class + Class<?> pTOutputClazz = getPTransformOutputClazz(clazz); + if (PDone.class.equals(pTOutputClazz)) { + return foreachRDD(rddTranslator); + } else { + return rddTransform(rddTranslator); + } + } + return transform; + } + /** + * Resolves the raw class of the second type argument of {@code clazz}'s generic superclass. + * NOTE(review): the cast to ParameterizedType assumes the direct superclass is parameterized + * (presumably PTransform with input and output arguments) - confirm for transforms with a raw + * or more distant PTransform ancestor. + */ + private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) { + Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments(); + return TypeToken.of(clazz).resolveType(types[1]).getRawType(); + } + + /** + * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator. 
+ * The wrapped rddTranslator supplies the Spark evaluators that are run in transform/foreachRDD + * to evaluate the transformation. + */ + public static class Translator implements SparkPipelineTranslator { + + private final SparkPipelineTranslator rddTranslator; + + public Translator(SparkPipelineTranslator rddTranslator) { + this.rddTranslator = rddTranslator; + } + + @Override + public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) { + // streaming includes rdd transformations as well + return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz); + } + + @Override + public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) { + return getTransformEvaluator(clazz, rddTranslator); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java new file mode 100644 index 0000000..504ea92 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation.streaming; + +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; + +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; +import org.apache.beam.runners.spark.translation.TransformTranslator; + +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; + + +/** + * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing. 
+ */ +public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator { + + // Currently, Spark streaming recommends batches no smaller than 500 msec + private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500); + + private boolean windowing; + private Duration batchDuration; + + public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) { + super(translator); + } + + private static final TransformTranslator.FieldGetter WINDOW_FG = + new TransformTranslator.FieldGetter(Window.Bound.class); + + // Use the smallest window (fixed or sliding) as Spark streaming's batch duration + @Override + protected <PT extends PTransform<? super PInput, POutput>> void + doVisitTransform(TransformTreeNode node) { + @SuppressWarnings("unchecked") + PT transform = (PT) node.getTransform(); + @SuppressWarnings("unchecked") + Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass(); + if (transformClass.isAssignableFrom(Window.Bound.class)) { + WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform); + if (windowFn instanceof FixedWindows) { + setBatchDuration(((FixedWindows) windowFn).getSize()); + } else if (windowFn instanceof SlidingWindows) { + if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) { + throw new UnsupportedOperationException("Spark does not support window offsets"); + } + // Sliding window size might as well set the batch duration.
Applying the transformation + // will add the "slide" + setBatchDuration(((SlidingWindows) windowFn).getSize()); + } else if (!(windowFn instanceof GlobalWindows)) { + throw new IllegalStateException("Windowing function not supported: " + windowFn); + } + } + } + + private void setBatchDuration(org.joda.time.Duration duration) { + Long durationMillis = duration.getMillis(); + // validate window size + if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) { + throw new IllegalArgumentException("Windowing of size " + durationMillis + + "msec is not supported!"); + } + // choose the smallest duration to be Spark's batch duration, larger ones will be handled + // as window functions over the batched-stream + if (!windowing || this.batchDuration.milliseconds() > durationMillis) { + this.batchDuration = Durations.milliseconds(durationMillis); + } + windowing = true; + } + + public boolean isWindowing() { + return windowing; + } + + public Duration getBatchDuration() { + return batchDuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar index 98387a6..e4a3a73 100644 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar +++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -org.apache.beam.runners.spark.SparkPipelineOptionsRegistrar -org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file +org.apache.beam.runners.spark.translation.SparkPipelineOptionsRegistrar +org.apache.beam.runners.spark.translation.streaming.SparkStreamingPipelineOptionsRegistrar \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar index 972b1a3..7949db4 100644 --- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar +++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -org.apache.beam.runners.spark.SparkPipelineRunnerRegistrar \ No newline at end of file +org.apache.beam.runners.spark.translation.SparkPipelineRunnerRegistrar \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java deleted file mode 100644 index 35a634a..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombineGloballyTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.Iterables; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class CombineGloballyTest { - - private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output))); - res.close(); - } - - public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> { - - @Override - public StringBuilder createAccumulator() { - // return null to differentiate from an empty string - return null; - } - - @Override - public StringBuilder addInput(StringBuilder accumulator, String input) { - return combine(accumulator, input); - } - - @Override - public StringBuilder mergeAccumulators(Iterable<StringBuilder> accumulators) { - StringBuilder sb = new StringBuilder(); - for (StringBuilder accum : accumulators) { - if (accum != null) { - sb.append(accum); - } - } - return sb; - } - - @Override - public String extractOutput(StringBuilder accumulator) { - return accumulator != null ? 
accumulator.toString(): ""; - } - - private static StringBuilder combine(StringBuilder accum, String datum) { - if (accum == null) { - return new StringBuilder(datum); - } else { - accum.append(",").append(datum); - return accum; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java deleted file mode 100644 index a4c5eb7..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CombinePerKeyTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.coders.VarLongCoder; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.ImmutableList; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class CombinePerKeyTest { - - private static final List<String> WORDS = - ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"); - @Test - public void testRun() { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); - EvaluationResult res = SparkPipelineRunner.create().run(p); - Map<String, Long> actualCnts = new HashMap<>(); - for (KV<String, Long> kv : res.get(cnts)) { - actualCnts.put(kv.getKey(), kv.getValue()); - } - res.close(); - Assert.assertEquals(8, actualCnts.size()); - Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the")); - } - - private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> { - @Override - public PCollection<KV<T, Long>> apply(PCollection<T> pcol) { - PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() { - @Override - public void processElement(ProcessContext processContext) throws Exception { - processContext.output(KV.of(processContext.element(), 1L)); - } - })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of())); - return withLongs.apply(Sum.<T>longsPerKey()); - } - 
} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index 905e243..4a080e8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableSet; +import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.junit.Test; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java deleted file mode 100644 index 1ec3d75..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DoFnOutputTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import org.junit.Test; - -import java.io.Serializable; - -public class DoFnOutputTest implements Serializable { - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); - Pipeline pipeline = Pipeline.create(options); - - PCollection<String> strings = pipeline.apply(Create.of("a")); - // Test that values written from startBundle() and finishBundle() are written to - // the output - PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() { - @Override - public void startBundle(Context c) throws Exception { - c.output("start"); - } - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - @Override - public void finishBundle(Context c) throws Exception { - c.output("finish"); - } - })); - - DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish"); - - EvaluationResult res = SparkPipelineRunner.create().run(pipeline); - res.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index e0fe47d..057cf3b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.Iterables; +import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.junit.Test; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java deleted file mode 100644 index c89090d..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/MultiOutputWordCountTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.values.*; -import com.google.common.collect.Iterables; -import org.junit.Assert; -import org.junit.Test; - -public class MultiOutputWordCountTest { - - private static final TupleTag<String> upper = new TupleTag<>(); - private static final TupleTag<String> lower = new TupleTag<>(); - private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>(); - private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>(); - - @Test - public void testRun() throws Exception { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+")); - PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others")); - PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words")); - PCollectionList<String> list = PCollectionList.of(w1).and(w2); - - PCollection<String> union = list.apply(Flatten.<String>pCollections()); - PCollectionView<String> regexView = regex.apply(View.<String>asSingleton()); - CountWords countWords = new CountWords(regexView); - PCollectionTuple luc = union.apply(countWords); - PCollection<Long> unique = 
luc.get(lowerCnts).apply( - ApproximateUnique.<KV<String, Long>>globally(16)); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts)); - Assert.assertEquals("are", actualLower.iterator().next().getKey()); - Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); - Assert.assertEquals("Here", actualUpper.iterator().next().getKey()); - Iterable<Long> actualUniqCount = res.get(unique); - Assert.assertEquals(9, (long) actualUniqCount.iterator().next()); - int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class); - Assert.assertEquals(18, actualTotalWords); - int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class); - Assert.assertEquals(6, actualMaxWordLength); - AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords - .getTotalWordsAggregator()); - Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue()); - - res.close(); - } - - /** - * A DoFn that tokenizes lines of text into individual words. 
- */ - static class ExtractWordsFn extends DoFn<String, String> { - - private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", - new Sum.SumIntegerFn()); - private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength", - new Max.MaxIntegerFn()); - private final PCollectionView<String> regex; - - ExtractWordsFn(PCollectionView<String> regex) { - this.regex = regex; - } - - @Override - public void processElement(ProcessContext c) { - String[] words = c.element().split(c.sideInput(regex)); - for (String word : words) { - totalWords.addValue(1); - if (!word.isEmpty()) { - maxWordLength.addValue(word.length()); - if (Character.isLowerCase(word.charAt(0))) { - c.output(word); - } else { - c.sideOutput(upper, word); - } - } - } - } - } - - public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> { - - private final PCollectionView<String> regex; - private final ExtractWordsFn extractWordsFn; - - public CountWords(PCollectionView<String> regex) { - this.regex = regex; - this.extractWordsFn = new ExtractWordsFn(regex); - } - - @Override - public PCollectionTuple apply(PCollection<String> lines) { - // Convert lines of text into individual words. 
- PCollectionTuple lowerUpper = lines - .apply(ParDo.of(extractWordsFn) - .withSideInputs(regex) - .withOutputTags(lower, TupleTagList.of(upper))); - lowerUpper.get(lower).setCoder(StringUtf8Coder.of()); - lowerUpper.get(upper).setCoder(StringUtf8Coder.of()); - PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count - .<String>perElement()); - PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count - .<String>perElement()); - return PCollectionTuple - .of(lowerCnts, lowerCounts) - .and(upperCnts, upperCounts); - } - - Aggregator<Integer, Integer> getTotalWordsAggregator() { - return extractWordsFn.totalWords; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java deleted file mode 100644 index ae1eed7..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SerializationTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.AtomicCoder; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -public class SerializationTest { - - public static class StringHolder { // not serializable - private final String string; - - public StringHolder(String string) { - this.string = string; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) { - return false; - } - StringHolder that = (StringHolder) o; - return string.equals(that.string); - } - - @Override - public int hashCode() { - return string.hashCode(); - } - - @Override - public String toString() { - return string; - } - } - - public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> { - - private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of(); - - @Override - public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException { - stringUtf8Coder.encode(value.toString(), outStream, context); - } - - @Override - public StringHolder decode(InputStream inStream, Context context) throws IOException { - return new StringHolder(stringUtf8Coder.decode(inStream, context)); - } 
- - public static Coder<StringHolder> of() { - return new StringHolderUtf8Coder(); - } - } - - private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - private static final List<StringHolder> WORDS = Lists.transform( - Arrays.asList(WORDS_ARRAY), new Function<String, StringHolder>() { - @Override public StringHolder apply(String s) { - return new StringHolder(s); - } - }); - private static final Set<StringHolder> EXPECTED_COUNT_SET = - ImmutableSet.copyOf(Lists.transform( - Arrays.asList("hi: 5", "there: 1", "sue: 2", "bob: 2"), - new Function<String, StringHolder>() { - @Override - public StringHolder apply(String s) { - return new StringHolder(s); - } - })); - - @Test - public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); - Pipeline p = Pipeline.create(options); - PCollection<StringHolder> inputWords = - p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of())); - PCollection<StringHolder> output = inputWords.apply(new CountWords()); - - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - res.close(); - } - - /** - * A DoFn that tokenizes lines of text into individual words. - */ - static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> { - private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+"); - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = WORD_BOUNDARY.split(c.element().toString()); - - // Keep track of the number of lines without any words encountered while tokenizing. - // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner. 
- if (words.length == 0) { - emptyLines.addValue(1L); - } - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(new StringHolder(word)); - } - } - } - } - - /** - * A DoFn that converts a Word and Count into a printable string. - */ - private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> { - @Override - public void processElement(ProcessContext c) { - c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue())); - } - } - - private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> { - @Override - public PCollection<StringHolder> apply(PCollection<StringHolder> lines) { - - // Convert lines of text into individual words. - PCollection<StringHolder> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection<KV<StringHolder, Long>> wordCounts = - words.apply(Count.<StringHolder>perElement()); - - // Format each word and count into a printable string. - - return wordCounts.apply(ParDo.of(new FormatCountsFn())); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java deleted file mode 100644 index bdc048c..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.Serializable; -import java.net.URI; - -import static org.junit.Assert.*; - -public class SideEffectsTest implements Serializable { - - static class UserException extends RuntimeException { - } - - @Test - public void test() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); - Pipeline pipeline = Pipeline.create(options); - - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - throw new UserException(); - } - })); - - try { - pipeline.run(); - fail("Run should thrown an exception"); - } catch (RuntimeException e) { - assertNotNull(e.getCause()); - - // TODO: remove the version check (and the setup and teardown methods) when we no - // longer support Spark 1.3 or 1.4 - String version = 
SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version(); - if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) { - assertTrue(e.getCause() instanceof UserException); - } - } - } - - @Before - public void setup() { - System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true"); - } - - @After - public void teardown() { - System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index c7dc400..e32b39a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -25,6 +25,7 @@ import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableSet; +import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.junit.Test; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java deleted file mode 100644 index 23416d7..0000000 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import org.junit.Assert; -import org.junit.Test; - -public class TestSparkPipelineOptionsFactory { - @Test - public void testDefaultCreateMethod() { - SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create(); - Assert.assertEquals("local[1]", actualOptions.getSparkMaster()); - } - - @Test - public void testSettingCustomOptions() { - SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create(); - actualOptions.setSparkMaster("spark://207.184.161.138:7077"); - Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java deleted 
file mode 100644 index ed58c77..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.api.client.repackaged.com.google.common.base.Joiner; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Charsets; -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; - -/** - * A test for the transforms registered in TransformTranslator. 
- * Builds a regular Dataflow pipeline with each of the mapped - * transforms, and makes sure that they work when the pipeline is - * executed in Spark. - */ -public class TransformTranslatorTest { - - @Rule - public TestName name = new TestName(); - - private DirectPipelineRunner directRunner; - private SparkPipelineRunner sparkRunner; - private String testDataDirName; - - @Before public void init() throws IOException { - sparkRunner = SparkPipelineRunner.create(); - directRunner = DirectPipelineRunner.createForTest(); - testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName()) - + File.separator; - FileUtils.deleteDirectory(new File(testDataDirName)); - new File(testDataDirName).mkdirs(); - } - - /** - * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline - * in DirectPipelineRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark - * transforms. Finally it makes sure that the results are the same for both runs. 
- */ - @Test - public void testTextIOReadAndWriteTransforms() throws IOException { - String directOut = runPipeline("direct", directRunner); - String sparkOut = runPipeline("spark", sparkRunner); - - List<String> directOutput = - Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8); - - List<String> sparkOutput = - Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8); - - // sort output to get a stable result (PCollections are not ordered) - Collections.sort(directOutput); - Collections.sort(sparkOutput); - - Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray()); - } - - private String runPipeline(String name, PipelineRunner<?> runner) { - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - String outFile = Joiner.on(File.separator).join(testDataDirName, "test_text_out_" + name); - PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt")); - lines.apply(TextIO.Write.to(outFile)); - runner.run(p); - return outFile; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java deleted file mode 100644 index 77409a0..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.collect.ImmutableList; -import java.util.Arrays; -import java.util.List; - -import org.joda.time.Duration; -import org.junit.Test; - -public class WindowedWordCountTest { - private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - private static final Long[] TIMESTAMPS_ARRAY = { - 60000L, 60000L, 60000L, - 120000L, 120000L, 120000L}; - private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY); - private static final List<String> EXPECTED_COUNT_SET = - ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", - "hi: 2", "sue: 1", "bob: 1"); - - @Test - public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - 
options.setRunner(SparkPipelineRunner.class); - Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)) - .setCoder(StringUtf8Coder.of()); - PCollection<String> windowedWords = inputWords - .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))); - - PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - - EvaluationResult res = SparkPipelineRunner.create().run(p); - res.close(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 5609e88..9edf41c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -30,7 +30,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Files; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineOptionsFactory; +import org.apache.beam.runners.spark.translation.SparkPipelineOptionsFactory; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java deleted file mode 100644 index a7b9f28..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.streaming; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; - -import org.apache.beam.runners.spark.io.CreateStream; -import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming; - -import org.joda.time.Duration; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Test Flatten (union) implementation for streaming. 
- */ -public class FlattenStreamingTest { - - private static final String[] WORDS_ARRAY_1 = { - "one", "two", "three", "four"}; - private static final List<Iterable<String>> WORDS_QUEUE_1 = - Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1)); - private static final String[] WORDS_ARRAY_2 = { - "five", "six", "seven", "eight"}; - private static final List<Iterable<String>> WORDS_QUEUE_2 = - Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2)); - private static final String[] EXPECTED_UNION = { - "one", "two", "three", "four", "five", "six", "seven", "eight"}; - private static final long TEST_TIMEOUT_MSEC = 1000L; - - @Test - public void testRun() throws Exception { - SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); - options.setAppName(this.getClass().getSimpleName()); - options.setRunner(SparkPipelineRunner.class); - options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval - Pipeline p = Pipeline.create(options); - - PCollection<String> w1 = - p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of()); - PCollection<String> windowedW1 = - w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); - PCollection<String> w2 = - p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of()); - PCollection<String> windowedW2 = - w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); - PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); - PCollection<String> union = list.apply(Flatten.<String>pCollections()); - - DataflowAssert.thatIterable(union.apply(View.<String>asIterable())) - .containsInAnyOrder(EXPECTED_UNION); - - EvaluationResult res = SparkPipelineRunner.create(options).run(p); - res.close(); - - DataflowAssertStreaming.assertNoFailures(res); - } - -}
