http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformTranslator.java deleted file mode 100644 index 6a9dca0..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TransformTranslator.java +++ /dev/null @@ -1,808 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.spark; - -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import com.google.api.client.util.Maps; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import 
com.google.cloud.dataflow.sdk.values.PCollectionTuple; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.collect.ImmutableMap; - -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.io.hadoop.HadoopIO; -import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; -import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat; -import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; -import org.apache.beam.runners.spark.util.BroadcastHelper; -import org.apache.beam.runners.spark.util.ByteArray; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; - -/** - * Supports translation between a DataFlow transform, and Spark's operations on RDDs. 
- */ -public final class TransformTranslator { - - private TransformTranslator() { - } - - public static class FieldGetter { - private final Map<String, Field> fields; - - public FieldGetter(Class<?> clazz) { - this.fields = Maps.newHashMap(); - for (Field f : clazz.getDeclaredFields()) { - f.setAccessible(true); - this.fields.put(f.getName(), f); - } - } - - public <T> T get(String fieldname, Object value) { - try { - @SuppressWarnings("unchecked") - T fieldValue = (T) fields.get(fieldname).get(value); - return fieldValue; - } catch (IllegalAccessException e) { - throw new IllegalStateException(e); - } - } - } - - private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() { - return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() { - @SuppressWarnings("unchecked") - @Override - public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) { - PCollectionList<T> pcs = context.getInput(transform); - JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()]; - for (int i = 0; i < rdds.length; i++) { - rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i)); - } - JavaRDD<WindowedValue<T>> rdd = context.getSparkContext().union(rdds); - context.setOutputRDD(transform, rdd); - } - }; - } - - private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() { - return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() { - @Override - public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD = - (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform); - @SuppressWarnings("unchecked") - KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - Coder<K> keyCoder = coder.getKeyCoder(); - Coder<V> valueCoder = coder.getValueCoder(); - - // Use coders to convert objects in the PCollection to byte arrays, so they - // can be 
transferred over the network for the shuffle. - JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = fromPair( - toPair(inRDD.map(WindowingHelpers.<KV<K, V>>unwindowFunction())) - .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder)) - .groupByKey() - .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder))) - // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK - .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction()); - context.setOutputRDD(transform, outRDD); - } - }; - } - - private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class); - - private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() { - return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() { - @Override - public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) { - Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform); - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD = - (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform); - context.setOutputRDD(transform, - inRDD.map(new KVFunction<>(keyed))); - } - }; - } - - private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class); - - private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() { - return new TransformEvaluator<Combine.Globally<I, O>>() { - - @Override - public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) { - final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform); - - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<I>, ?> inRdd = - (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform); - - final Coder<I> iCoder = context.getInput(transform).getCoder(); - final Coder<A> aCoder; - try { - aCoder = globally.getAccumulatorCoder( - 
context.getPipeline().getCoderRegistry(), iCoder); - } catch (CannotProvideCoderException e) { - throw new IllegalStateException("Could not determine coder for accumulator", e); - } - - // Use coders to convert objects in the PCollection to byte arrays, so they - // can be transferred over the network for the shuffle. - JavaRDD<byte[]> inRddBytes = inRdd - .map(WindowingHelpers.<I>unwindowFunction()) - .map(CoderHelpers.toByteFunction(iCoder)); - - /*A*/ byte[] acc = inRddBytes.aggregate( - CoderHelpers.toByteArray(globally.createAccumulator(), aCoder), - new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() { - @Override - public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception { - A a = CoderHelpers.fromByteArray(ab, aCoder); - I i = CoderHelpers.fromByteArray(ib, iCoder); - return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder); - } - }, - new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() { - @Override - public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception { - A a1 = CoderHelpers.fromByteArray(a1b, aCoder); - A a2 = CoderHelpers.fromByteArray(a2b, aCoder); - // don't use Guava's ImmutableList.of as values may be null - List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2)); - A merged = globally.mergeAccumulators(accumulators); - return CoderHelpers.toByteArray(merged, aCoder); - } - } - ); - O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder)); - - Coder<O> coder = context.getOutput(transform).getCoder(); - JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize( - // don't use Guava's ImmutableList.of as output may be null - CoderHelpers.toByteArrays(Collections.singleton(output), coder)); - context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder)) - .map(WindowingHelpers.<O>windowFunction())); - } - }; - } - - private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class); - - private static <K, 
VI, VA, VO> TransformEvaluator<Combine.PerKey<K, VI, VO>> combinePerKey() { - return new TransformEvaluator<Combine.PerKey<K, VI, VO>>() { - @Override - public void evaluate(Combine.PerKey<K, VI, VO> transform, EvaluationContext context) { - final Combine.KeyedCombineFn<K, VI, VA, VO> keyed = - COMBINE_PERKEY_FG.get("fn", transform); - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd = - (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform); - - @SuppressWarnings("unchecked") - KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder(); - Coder<K> keyCoder = inputCoder.getKeyCoder(); - Coder<VI> viCoder = inputCoder.getValueCoder(); - Coder<VA> vaCoder; - try { - vaCoder = keyed.getAccumulatorCoder( - context.getPipeline().getCoderRegistry(), keyCoder, viCoder); - } catch (CannotProvideCoderException e) { - throw new IllegalStateException("Could not determine coder for accumulator", e); - } - Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder); - Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder); - - // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value, - // since the functions passed to combineByKey don't receive the associated key of each - // value, and we need to map back into methods in Combine.KeyedCombineFn, which each - // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark - // provides a way to include keys in the arguments of combine/merge functions, we won't - // need to duplicate the keys anymore. 
- - // Key has to bw windowed in order to group by window as well - JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair = - inRdd.mapToPair( - new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>, - WindowedValue<KV<K, VI>>>() { - @Override - public Tuple2<WindowedValue<K>, - WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) { - WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(), - kv.getTimestamp(), kv.getWindows(), kv.getPane()); - return new Tuple2<>(wk, kv); - } - }); - //-- windowed coders - final WindowedValue.FullWindowedValueCoder<K> wkCoder = - WindowedValue.FullWindowedValueCoder.of(keyCoder, - context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder()); - final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder = - WindowedValue.FullWindowedValueCoder.of(kviCoder, - context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder()); - final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder = - WindowedValue.FullWindowedValueCoder.of(kvaCoder, - context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder()); - - // Use coders to convert objects in the PCollection to byte arrays, so they - // can be transferred over the network for the shuffle. - JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair - .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder)); - - // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final - // output types) since Combine.CombineFn only provides ways to merge VAs, and no way - // to merge VOs. 
- JavaPairRDD</*K*/ ByteArray, /*KV<K, VA>*/ byte[]> accumulatedBytes = - inRddDuplicatedKeyPairBytes.combineByKey( - new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() { - @Override - public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) { - WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder); - VA va = keyed.createAccumulator(wkvi.getValue().getKey()); - va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue()); - WindowedValue<KV<K, VA>> wkva = - WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(), - wkvi.getWindows(), wkvi.getPane()); - return CoderHelpers.toByteArray(wkva, wkvaCoder); - } - }, - new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() { - @Override - public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc, - /*KV<K, VI>*/ byte[] input) { - WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder); - WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder); - VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(), - wkvi.getValue().getValue()); - wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(), - wkva.getWindows(), wkva.getPane()); - return CoderHelpers.toByteArray(wkva, wkvaCoder); - } - }, - new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() { - @Override - public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1, - /*KV<K, VA>*/ byte[] acc2) { - WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder); - WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder); - VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(), - // don't use Guava's ImmutableList.of as values may be null - Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(), - wkva2.getValue().getValue()))); - WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(), - 
va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane()); - return CoderHelpers.toByteArray(wkva, wkvaCoder); - } - }); - - JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes - .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder)) - .mapValues( - new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() { - @Override - public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) { - return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(), - acc.getValue().getValue()), acc.getTimestamp(), - acc.getWindows(), acc.getPane()); - } - }); - - context.setOutputRDD(transform, - fromPair(extracted) - .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() { - @Override - public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo) - throws Exception { - WindowedValue<VO> wvo = kwvo.getValue(); - KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue()); - return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane()); - } - })); - } - }; - } - - private static final class KVFunction<K, VI, VO> - implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> { - private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed; - - KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) { - this.keyed = keyed; - } - - @Override - public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv) - throws Exception { - KV<K, Iterable<VI>> kv = windowedKv.getValue(); - return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())), - windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane()); - } - } - - private static <K, V> JavaPairRDD<K, V> toPair(JavaRDDLike<KV<K, V>, ?> rdd) { - return rdd.mapToPair(new PairFunction<KV<K, V>, K, V>() { - @Override - public Tuple2<K, V> call(KV<K, V> kv) { - return new Tuple2<>(kv.getKey(), kv.getValue()); - } - }); - } - - private static <K, V> 
JavaRDDLike<KV<K, V>, ?> fromPair(JavaPairRDD<K, V> rdd) { - return rdd.map(new Function<Tuple2<K, V>, KV<K, V>>() { - @Override - public KV<K, V> call(Tuple2<K, V> t2) { - return KV.of(t2._1(), t2._2()); - } - }); - } - - private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() { - return new TransformEvaluator<ParDo.Bound<I, O>>() { - @Override - public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) { - DoFnFunction<I, O> dofn = - new DoFnFunction<>(transform.getFn(), - context.getRuntimeContext(), - getSideInputs(transform.getSideInputs(), context)); - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<I>, ?> inRDD = - (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform); - context.setOutputRDD(transform, inRDD.mapPartitions(dofn)); - } - }; - } - - private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class); - - private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() { - return new TransformEvaluator<ParDo.BoundMulti<I, O>>() { - @Override - public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) { - TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform); - MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>( - transform.getFn(), - context.getRuntimeContext(), - mainOutputTag, - getSideInputs(transform.getSideInputs(), context)); - - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<I>, ?> inRDD = - (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform); - JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD - .mapPartitionsToPair(multifn) - .cache(); - - PCollectionTuple pct = context.getOutput(transform); - for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) { - @SuppressWarnings("unchecked") - JavaPairRDD<TupleTag<?>, WindowedValue<?>> filtered = - all.filter(new TupleTagFilter(e.getKey())); - @SuppressWarnings("unchecked") - // Object is the best we can do since 
different outputs can have different tags - JavaRDD<WindowedValue<Object>> values = - (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values(); - context.setRDD(e.getValue(), values); - } - } - }; - } - - - private static <T> TransformEvaluator<TextIO.Read.Bound<T>> readText() { - return new TransformEvaluator<TextIO.Read.Bound<T>>() { - @Override - public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern) - .map(WindowingHelpers.<String>windowFunction()); - context.setOutputRDD(transform, rdd); - } - }; - } - - private static <T> TransformEvaluator<TextIO.Write.Bound<T>> writeText() { - return new TransformEvaluator<TextIO.Write.Bound<T>>() { - @Override - public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaPairRDD<T, Void> last = - ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform)) - .map(WindowingHelpers.<T>unwindowFunction()) - .mapToPair(new PairFunction<T, T, - Void>() { - @Override - public Tuple2<T, Void> call(T t) throws Exception { - return new Tuple2<>(t, null); - } - }); - ShardTemplateInformation shardTemplateInfo = - new ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, - NullWritable.class, TemplatedTextOutputFormat.class); - } - }; - } - - private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() { - return new TransformEvaluator<AvroIO.Read.Bound<T>>() { - @Override - public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaSparkContext jsc = context.getSparkContext(); - @SuppressWarnings("unchecked") - JavaRDD<AvroKey<T>> avroFile = 
(JavaRDD<AvroKey<T>>) (JavaRDD<?>) - jsc.newAPIHadoopFile(pattern, - AvroKeyInputFormat.class, - AvroKey.class, NullWritable.class, - new Configuration()).keys(); - JavaRDD<WindowedValue<T>> rdd = avroFile.map( - new Function<AvroKey<T>, T>() { - @Override - public T call(AvroKey<T> key) { - return key.datum(); - } - }).map(WindowingHelpers.<T>windowFunction()); - context.setOutputRDD(transform, rdd); - } - }; - } - - private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() { - return new TransformEvaluator<AvroIO.Write.Bound<T>>() { - @Override - public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) { - Job job; - try { - job = Job.getInstance(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - AvroJob.setOutputKeySchema(job, transform.getSchema()); - @SuppressWarnings("unchecked") - JavaPairRDD<AvroKey<T>, NullWritable> last = - ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform)) - .map(WindowingHelpers.<T>unwindowFunction()) - .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() { - @Override - public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception { - return new Tuple2<>(new AvroKey<>(t), NullWritable.get()); - } - }); - ShardTemplateInformation shardTemplateInfo = - new ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, - AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class); - } - }; - } - - private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() { - return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() { - @Override - public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) { - String pattern = transform.getFilepattern(); - JavaSparkContext jsc = context.getSparkContext(); - @SuppressWarnings ("unchecked") - JavaPairRDD<K, V> 
file = jsc.newAPIHadoopFile(pattern, - transform.getFormatClass(), - transform.getKeyClass(), transform.getValueClass(), - new Configuration()); - JavaRDD<WindowedValue<KV<K, V>>> rdd = - file.map(new Function<Tuple2<K, V>, KV<K, V>>() { - @Override - public KV<K, V> call(Tuple2<K, V> t2) throws Exception { - return KV.of(t2._1(), t2._2()); - } - }).map(WindowingHelpers.<KV<K, V>>windowFunction()); - context.setOutputRDD(transform, rdd); - } - }; - } - - private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() { - return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() { - @Override - public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context - .getInputRDD(transform)) - .map(WindowingHelpers.<KV<K, V>>unwindowFunction()) - .mapToPair(new PairFunction<KV<K, V>, K, V>() { - @Override - public Tuple2<K, V> call(KV<K, V> t) throws Exception { - return new Tuple2<>(t.getKey(), t.getValue()); - } - }); - ShardTemplateInformation shardTemplateInfo = - new ShardTemplateInformation(transform.getNumShards(), - transform.getShardTemplate(), transform.getFilenamePrefix(), - transform.getFilenameSuffix()); - Configuration conf = new Configuration(); - for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) { - conf.set(e.getKey(), e.getValue()); - } - writeHadoopFile(last, conf, shardTemplateInfo, - transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass()); - } - }; - } - - private static final class ShardTemplateInformation { - private final int numShards; - private final String shardTemplate; - private final String filenamePrefix; - private final String filenameSuffix; - - private ShardTemplateInformation(int numShards, String shardTemplate, String - filenamePrefix, String filenameSuffix) { - this.numShards = numShards; - this.shardTemplate = shardTemplate; 
- this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - } - - int getNumShards() { - return numShards; - } - - String getShardTemplate() { - return shardTemplate; - } - - String getFilenamePrefix() { - return filenamePrefix; - } - - String getFilenameSuffix() { - return filenameSuffix; - } - } - - private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf, - ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass, - Class<? extends FileOutputFormat> formatClass) { - int numShards = shardTemplateInfo.getNumShards(); - String shardTemplate = shardTemplateInfo.getShardTemplate(); - String filenamePrefix = shardTemplateInfo.getFilenamePrefix(); - String filenameSuffix = shardTemplateInfo.getFilenameSuffix(); - if (numShards != 0) { - // number of shards was set explicitly, so repartition - rdd = rdd.repartition(numShards); - } - int actualNumShards = rdd.partitions().size(); - String template = replaceShardCount(shardTemplate, actualNumShards); - String outputDir = getOutputDirectory(filenamePrefix, template); - String filePrefix = getOutputFilePrefix(filenamePrefix, template); - String fileTemplate = getOutputFileTemplate(filenamePrefix, template); - - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix); - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate); - conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix); - rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf); - } - - private static final FieldGetter WINDOW_FG = new FieldGetter(Window.Bound.class); - - private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() { - return new TransformEvaluator<Window.Bound<T>>() { - @Override - public void evaluate(Window.Bound<T> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<T>, ?> inRDD = - (JavaRDDLike<WindowedValue<T>, ?>) 
context.getInputRDD(transform); - WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); - if (windowFn instanceof GlobalWindows) { - context.setOutputRDD(transform, inRDD); - } else { - @SuppressWarnings("unchecked") - DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); - DoFnFunction<T, T> dofn = - new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); - context.setOutputRDD(transform, inRDD.mapPartitions(dofn)); - } - } - }; - } - - private static <T> TransformEvaluator<Create.Values<T>> create() { - return new TransformEvaluator<Create.Values<T>>() { - @Override - public void evaluate(Create.Values<T> transform, EvaluationContext context) { - Iterable<T> elems = transform.getElements(); - // Use a coder to convert the objects in the PCollection to byte arrays, so they - // can be transferred over the network. - Coder<T> coder = context.getOutput(transform).getCoder(); - context.setOutputRDDFromValues(transform, elems, coder); - } - }; - } - - private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() { - return new TransformEvaluator<View.AsSingleton<T>>() { - @Override - public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) { - Iterable<? extends WindowedValue<?>> iter = - context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); - } - }; - } - - private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() { - return new TransformEvaluator<View.AsIterable<T>>() { - @Override - public void evaluate(View.AsIterable<T> transform, EvaluationContext context) { - Iterable<? 
extends WindowedValue<?>> iter = - context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); - } - }; - } - - private static <R, W> TransformEvaluator<View.CreatePCollectionView<R, W>> createPCollView() { - return new TransformEvaluator<View.CreatePCollectionView<R, W>>() { - @Override - public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) { - Iterable<? extends WindowedValue<?>> iter = - context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); - } - }; - } - - private static final class TupleTagFilter<V> - implements Function<Tuple2<TupleTag<V>, WindowedValue<?>>, Boolean> { - - private final TupleTag<V> tag; - - private TupleTagFilter(TupleTag<V> tag) { - this.tag = tag; - } - - @Override - public Boolean call(Tuple2<TupleTag<V>, WindowedValue<?>> input) { - return tag.equals(input._1()); - } - } - - private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs( - List<PCollectionView<?>> views, - EvaluationContext context) { - if (views == null) { - return ImmutableMap.of(); - } else { - Map<TupleTag<?>, BroadcastHelper<?>> sideInputs = Maps.newHashMap(); - for (PCollectionView<?> view : views) { - Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view); - Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal(); - @SuppressWarnings("unchecked") - BroadcastHelper<?> helper = - BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal); - //broadcast side inputs - helper.broadcast(context.getSparkContext()); - sideInputs.put(view.getTagInternal(), helper); - } - return sideInputs; - } - } - - private static final Map<Class<? 
extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps - .newHashMap(); - - static { - EVALUATORS.put(TextIO.Read.Bound.class, readText()); - EVALUATORS.put(TextIO.Write.Bound.class, writeText()); - EVALUATORS.put(AvroIO.Read.Bound.class, readAvro()); - EVALUATORS.put(AvroIO.Write.Bound.class, writeAvro()); - EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop()); - EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); - EVALUATORS.put(ParDo.Bound.class, parDo()); - EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); - EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk()); - EVALUATORS.put(Combine.GroupedValues.class, grouped()); - EVALUATORS.put(Combine.Globally.class, combineGlobally()); - EVALUATORS.put(Combine.PerKey.class, combinePerKey()); - EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl()); - EVALUATORS.put(Create.Values.class, create()); - EVALUATORS.put(View.AsSingleton.class, viewAsSingleton()); - EVALUATORS.put(View.AsIterable.class, viewAsIter()); - EVALUATORS.put(View.CreatePCollectionView.class, createPCollView()); - EVALUATORS.put(Window.Bound.class, window()); - } - - public static <PT extends PTransform<?, ?>> TransformEvaluator<PT> - getTransformEvaluator(Class<PT> clazz) { - @SuppressWarnings("unchecked") - TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz); - if (transform == null) { - throw new IllegalStateException("No TransformEvaluator registered for " + clazz); - } - return transform; - } - - /** - * Translator matches Dataflow transformation with the appropriate evaluator. - */ - public static class Translator implements SparkPipelineTranslator { - - @Override - public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) { - return EVALUATORS.containsKey(clazz); - } - - @Override - public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) { - return getTransformEvaluator(clazz); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/WindowingHelpers.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/WindowingHelpers.java deleted file mode 100644 index bc09d5b..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/WindowingHelpers.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import org.apache.spark.api.java.function.Function; - -/** - * Helper functions for working with windows. - */ -public final class WindowingHelpers { - private WindowingHelpers() { - } - - /** - * A function for converting a value to a {@link WindowedValue}. The resulting - * {@link WindowedValue} will be in no windows, and will have the default timestamp - * and pane. - * - * @param <T> The type of the object. - * @return A function that accepts an object and returns its {@link WindowedValue}. 
- */ - public static <T> Function<T, WindowedValue<T>> windowFunction() { - return new Function<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> call(T t) { - return WindowedValue.valueInEmptyWindows(t); - } - }; - } - - /** - * A function for extracting the value from a {@link WindowedValue}. - * - * @param <T> The type of the object. - * @return A function that accepts a {@link WindowedValue} and returns its value. - */ - public static <T> Function<WindowedValue<T>, T> unwindowFunction() { - return new Function<WindowedValue<T>, T>() { - @Override - public T call(WindowedValue<T> t) { - return t.getValue(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index c3718f4..6c397a2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -30,7 +30,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.spark.SparkRuntimeContext; +import org.apache.beam.runners.spark.translation.SparkRuntimeContext; /** * This class wraps a map of named aggregators. 
Spark expects that all accumulators be declared http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java deleted file mode 100644 index f96f4dd..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.streaming; - -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; - -import org.apache.beam.runners.spark.SparkPipelineOptions; - -/** - * Options used to configure Spark streaming. 
- */ -public interface SparkStreamingPipelineOptions extends SparkPipelineOptions { - @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " + - "execution is stopped") - @Default.Long(-1) - Long getTimeout(); - - void setTimeout(Long batchInterval); - - @Override - @Default.Boolean(true) - boolean isStreaming(); - - @Override - @Default.String("spark streaming dataflow pipeline job") - String getAppName(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java deleted file mode 100644 index ae04ebe..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.streaming; - -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; - -public final class SparkStreamingPipelineOptionsFactory { - - private SparkStreamingPipelineOptionsFactory() { - } - - public static SparkStreamingPipelineOptions create() { - return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java deleted file mode 100644 index 256820c..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.streaming; - -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; -import com.google.common.collect.ImmutableList; - -public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions - .class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java deleted file mode 100644 index 50c889c..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.streaming; - - -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.PValue; - -import org.apache.beam.runners.spark.EvaluationContext; -import org.apache.beam.runners.spark.SparkRuntimeContext; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.apache.spark.streaming.api.java.JavaStreamingContext; - - -/** - * Streaming evaluation context helps to handle streaming. - */ -public class StreamingEvaluationContext extends EvaluationContext { - - private final JavaStreamingContext jssc; - private final long timeout; - private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>(); - private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>(); - - public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, - JavaStreamingContext jssc, long timeout) { - super(jsc, pipeline); - this.jssc = jssc; - this.timeout = timeout; - } - - /** - * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for - * testing. 
- */ - private class DStreamHolder<T> { - - private Iterable<Iterable<T>> values; - private Coder<T> coder; - private JavaDStream<WindowedValue<T>> dStream; - - DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) { - this.values = values; - this.coder = coder; - } - - DStreamHolder(JavaDStream<WindowedValue<T>> dStream) { - this.dStream = dStream; - } - - @SuppressWarnings("unchecked") - JavaDStream<WindowedValue<T>> getDStream() { - if (dStream == null) { - // create the DStream from values - Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>(); - for (Iterable<T> v : values) { - setOutputRDDFromValues(currentTransform.getTransform(), v, coder); - rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform())); - } - // create dstream from queue, one at a time, no defaults - // mainly for unit test so no reason to have this configurable - dStream = jssc.queueStream(rddQueue, true); - } - return dStream; - } - } - - <T> void setDStreamFromQueue( - PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) { - pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder)); - } - - <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) { - PValue pvalue = (PValue) getOutput(transform); - DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream); - pstreams.put(pvalue, dStreamHolder); - leafStreams.add(dStreamHolder); - } - - boolean hasStream(PTransform<?, ?> transform) { - PValue pvalue = (PValue) getInput(transform); - return pstreams.containsKey(pvalue); - } - - JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) { - return getStream((PValue) getInput(transform)); - } - - JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) { - DStreamHolder<?> dStreamHolder = pstreams.get(pvalue); - JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream(); - leafStreams.remove(dStreamHolder); - return dStream; - } - - // used to set the RDD from the 
DStream in the RDDHolder for transformation - <T> void setInputRDD( - PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) { - setRDD((PValue) getInput(transform), rdd); - } - - // used to get the RDD transformation output and use it as the DStream transformation output - JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) { - return getRDD((PValue) getOutput(transform)); - } - - public JavaStreamingContext getStreamingContext() { - return jssc; - } - - @Override - protected void computeOutputs() { - for (DStreamHolder<?> streamHolder : leafStreams) { - computeOutput(streamHolder); - } - } - - private static <T> void computeOutput(DStreamHolder<T> streamHolder) { - streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() { - @Override - public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception { - rdd.rdd().cache(); - rdd.count(); - return null; - } - }); // force a DStream action - } - - @Override - public void close() { - if (timeout > 0) { - jssc.awaitTerminationOrTimeout(timeout); - } else { - jssc.awaitTermination(); - } - //TODO: stop gracefully ? 
- jssc.stop(false, false); - state = State.DONE; - super.close(); - } - - private State state = State.RUNNING; - - @Override - public State getState() { - return state; - } - - //---------------- override in order to expose in package - @Override - protected <I extends PInput> I getInput(PTransform<I, ?> transform) { - return super.getInput(transform); - } - @Override - protected <O extends POutput> O getOutput(PTransform<?, O> transform) { - return super.getOutput(transform); - } - - @Override - protected JavaSparkContext getSparkContext() { - return super.getSparkContext(); - } - - @Override - protected SparkRuntimeContext getRuntimeContext() { - return super.getRuntimeContext(); - } - - @Override - protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { - super.setCurrentTransform(transform); - } - - @Override - protected AppliedPTransform<?, ?, ?> getCurrentTransform() { - return super.getCurrentTransform(); - } - - @Override - protected <T> void setOutputRDD(PTransform<?, ?> transform, - JavaRDDLike<WindowedValue<T>, ?> rdd) { - super.setOutputRDD(transform, rdd); - } - - @Override - protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, - Coder<T> coder) { - super.setOutputRDDFromValues(transform, values, coder); - } - - @Override - protected boolean hasOutputRDD(PTransform<? 
extends PInput, ?> transform) { - return super.hasOutputRDD(transform); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java deleted file mode 100644 index 56d0dd9..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.streaming; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.api.client.util.Lists; -import com.google.api.client.util.Maps; -import com.google.api.client.util.Sets; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.cloud.dataflow.sdk.values.PDone; - -import com.google.common.reflect.TypeToken; -import kafka.serializer.Decoder; - -import org.apache.beam.runners.spark.DoFnFunction; -import org.apache.beam.runners.spark.EvaluationContext; -import org.apache.beam.runners.spark.SparkPipelineTranslator; -import org.apache.beam.runners.spark.TransformEvaluator; -import org.apache.beam.runners.spark.TransformTranslator; -import org.apache.beam.runners.spark.WindowingHelpers; -import org.apache.beam.runners.spark.io.ConsoleIO; -import 
org.apache.beam.runners.spark.io.CreateStream; -import org.apache.beam.runners.spark.io.KafkaIO; -import org.apache.beam.runners.spark.io.hadoop.HadoopIO; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.apache.spark.streaming.api.java.JavaPairInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; - -import scala.Tuple2; - - -/** - * Supports translation between a DataFlow transform, and Spark's operations on DStreams. - */ -public final class StreamingTransformTranslator { - - private StreamingTransformTranslator() { - } - - private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() { - return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() { - @Override - public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = - (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) - ((StreamingEvaluationContext) context).getStream(transform); - dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum()); - } - }; - } - - private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() { - return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() { - @Override - public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - JavaStreamingContext jssc = sec.getStreamingContext(); - Class<K> keyClazz = transform.getKeyClass(); - Class<V> valueClazz = transform.getValueClass(); - Class<? 
extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass(); - Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass(); - Map<String, String> kafkaParams = transform.getKafkaParams(); - Set<String> topics = transform.getTopics(); - JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz, - valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics); - JavaDStream<WindowedValue<KV<K, V>>> inputStream = - inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() { - @Override - public KV<K, V> call(Tuple2<K, V> t2) throws Exception { - return KV.of(t2._1(), t2._2()); - } - }).map(WindowingHelpers.<KV<K, V>>windowFunction()); - sec.setStream(transform, inputStream); - } - }; - } - - private static <T> TransformEvaluator<Create.Values<T>> create() { - return new TransformEvaluator<Create.Values<T>>() { - @SuppressWarnings("unchecked") - @Override - public void evaluate(Create.Values<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - Iterable<T> elems = transform.getElements(); - Coder<T> coder = sec.getOutput(transform).getCoder(); - if (coder != VoidCoder.of()) { - // actual create - sec.setOutputRDDFromValues(transform, elems, coder); - } else { - // fake create as an input - // creates a stream with a single batch containing a single null element - // to invoke following transformations once - // to support DataflowAssert - sec.setDStreamFromQueue(transform, - Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)), - (Coder<Void>) coder); - } - } - }; - } - - private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() { - return new TransformEvaluator<CreateStream.QueuedValues<T>>() { - @Override - public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - 
Iterable<Iterable<T>> values = transform.getQueuedValues(); - Coder<T> coder = sec.getOutput(transform).getCoder(); - sec.setDStreamFromQueue(transform, values, coder); - } - }; - } - - private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() { - return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() { - @SuppressWarnings("unchecked") - @Override - public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - PCollectionList<T> pcs = sec.getInput(transform); - JavaDStream<WindowedValue<T>> first = - (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0)); - List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1); - for (int i = 1; i < pcs.size(); i++) { - rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i))); - } - JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest); - sec.setStream(transform, dstream); - } - }; - } - - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform( - final SparkPipelineTranslator rddTranslator) { - return new TransformEvaluator<PT>() { - @SuppressWarnings("unchecked") - @Override - public void evaluate(PT transform, EvaluationContext context) { - TransformEvaluator<PT> rddEvaluator = - rddTranslator.translate((Class<PT>) transform.getClass()); - - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - if (sec.hasStream(transform)) { - JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = - (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) - sec.getStream(transform); - - sec.setStream(transform, dStream - .transform(new RDDTransform<>(sec, rddEvaluator, transform))); - } else { - // if the transformation requires direct access to RDD (not in stream) - // this is used for "fake" transformations like with 
DataflowAssert - rddEvaluator.evaluate(transform, context); - } - } - }; - } - - /** - * RDD transform function If the transformation function doesn't have an input, create a fake one - * as an empty RDD. - * - * @param <PT> PTransform type - */ - private static final class RDDTransform<PT extends PTransform<?, ?>> - implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> { - - private final StreamingEvaluationContext context; - private final AppliedPTransform<?, ?, ?> appliedPTransform; - private final TransformEvaluator<PT> rddEvaluator; - private final PT transform; - - - private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator, - PT transform) { - this.context = context; - this.appliedPTransform = context.getCurrentTransform(); - this.rddEvaluator = rddEvaluator; - this.transform = transform; - } - - @Override - @SuppressWarnings("unchecked") - public JavaRDD<WindowedValue<Object>> - call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { - AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); - context.setCurrentTransform(appliedPTransform); - context.setInputRDD(transform, rdd); - rddEvaluator.evaluate(transform, context); - if (!context.hasOutputRDD(transform)) { - // fake RDD as output - context.setOutputRDD(transform, - context.getSparkContext().<WindowedValue<Object>>emptyRDD()); - } - JavaRDD<WindowedValue<Object>> outRDD = - (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform); - context.setCurrentTransform(existingAPT); - return outRDD; - } - } - - @SuppressWarnings("unchecked") - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD( - final SparkPipelineTranslator rddTranslator) { - return new TransformEvaluator<PT>() { - @Override - public void evaluate(PT transform, EvaluationContext context) { - TransformEvaluator<PT> rddEvaluator = - rddTranslator.translate((Class<PT>) transform.getClass()); - - StreamingEvaluationContext sec 
= (StreamingEvaluationContext) context; - if (sec.hasStream(transform)) { - JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream = - (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>) - sec.getStream(transform); - - dStream.foreachRDD(new RDDOutputOperator<>(sec, rddEvaluator, transform)); - } else { - rddEvaluator.evaluate(transform, context); - } - } - }; - } - - /** - * RDD output function. - * - * @param <PT> PTransform type - */ - private static final class RDDOutputOperator<PT extends PTransform<?, ?>> - implements Function<JavaRDD<WindowedValue<Object>>, Void> { - - private final StreamingEvaluationContext context; - private final AppliedPTransform<?, ?, ?> appliedPTransform; - private final TransformEvaluator<PT> rddEvaluator; - private final PT transform; - - - private RDDOutputOperator(StreamingEvaluationContext context, - TransformEvaluator<PT> rddEvaluator, PT transform) { - this.context = context; - this.appliedPTransform = context.getCurrentTransform(); - this.rddEvaluator = rddEvaluator; - this.transform = transform; - } - - @Override - @SuppressWarnings("unchecked") - public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception { - AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform(); - context.setCurrentTransform(appliedPTransform); - context.setInputRDD(transform, rdd); - rddEvaluator.evaluate(transform, context); - context.setCurrentTransform(existingAPT); - return null; - } - } - - private static final TransformTranslator.FieldGetter WINDOW_FG = - new TransformTranslator.FieldGetter(Window.Bound.class); - - private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() { - return new TransformEvaluator<Window.Bound<T>>() { - @Override - public void evaluate(Window.Bound<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - //--- first we apply windowing to the stream - 
WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); - @SuppressWarnings("unchecked") - JavaDStream<WindowedValue<T>> dStream = - (JavaDStream<WindowedValue<T>>) sec.getStream(transform); - if (windowFn instanceof FixedWindows) { - Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize() - .getMillis()); - sec.setStream(transform, dStream.window(windowDuration)); - } else if (windowFn instanceof SlidingWindows) { - Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize() - .getMillis()); - Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod() - .getMillis()); - sec.setStream(transform, dStream.window(windowDuration, slideDuration)); - } - //--- then we apply windowing to the elements - DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); - DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, - ((StreamingEvaluationContext)context).getRuntimeContext(), null); - @SuppressWarnings("unchecked") - JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = - (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) - sec.getStream(transform); - sec.setStream(transform, dstream.mapPartitions(dofn)); - } - }; - } - - private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps - .newHashMap(); - - static { - EVALUATORS.put(ConsoleIO.Write.Unbound.class, print()); - EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue()); - EVALUATORS.put(Create.Values.class, create()); - EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka()); - EVALUATORS.put(Window.Bound.class, window()); - EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl()); - } - - private static final Set<Class<? 
extends PTransform>> UNSUPPORTED_EVALUATORS = Sets - .newHashSet(); - - static { - //TODO - add support for the following - UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class); - UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class); - UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class); - UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class); - UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class); - UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class); - } - - @SuppressWarnings("unchecked") - private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> - getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) { - TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz); - if (transform == null) { - if (UNSUPPORTED_EVALUATORS.contains(clazz)) { - throw new UnsupportedOperationException("Dataflow transformation " + clazz - .getCanonicalName() - + " is currently unsupported by the Spark streaming pipeline"); - } - // DStream transformations will transform an RDD into another RDD - // Actions will create output - // In Dataflow it depends on the PTransform's Input and Output class - Class<?> pTOutputClazz = getPTransformOutputClazz(clazz); - if (PDone.class.equals(pTOutputClazz)) { - return foreachRDD(rddTranslator); - } else { - return rddTransform(rddTranslator); - } - } - return transform; - } - - private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) { - Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments(); - return TypeToken.of(clazz).resolveType(types[1]).getRawType(); - } - - /** - * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator. 
- * rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation - */ - public static class Translator implements SparkPipelineTranslator { - - private final SparkPipelineTranslator rddTranslator; - - public Translator(SparkPipelineTranslator rddTranslator) { - this.rddTranslator = rddTranslator; - } - - @Override - public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) { - // streaming includes rdd transformations as well - return EVALUATORS.containsKey(clazz) || rddTranslator.hasTranslation(clazz); - } - - @Override - public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) { - return getTransformEvaluator(clazz, rddTranslator); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java deleted file mode 100644 index 9c58126..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.runners.spark.streaming;

import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;

import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.runners.spark.SparkPipelineTranslator;
import org.apache.beam.runners.spark.TransformTranslator;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;


/**
 * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
 *
 * <p>While visiting the pipeline it records whether a fixed or sliding window was seen, and
 * keeps the smallest such window size as the Spark streaming batch duration.
 */
public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {

  // Currently, Spark streaming recommends batches no smaller than 500 msec
  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);

  // True once at least one fixed or sliding window has been recorded.
  private boolean windowing;
  // Smallest window size recorded so far; stays null until a window is recorded.
  private Duration batchDuration;

  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
    super(translator);
  }

  private static final TransformTranslator.FieldGetter WINDOW_FG =
      new TransformTranslator.FieldGetter(Window.Bound.class);

  /**
   * Inspects a visited transform and, for window transforms, records the window size.
   *
   * <p>Use the smallest window (fixed or sliding) as Spark streaming's batch duration.
   *
   * @param node the visited node of the transform tree.
   * @throws UnsupportedOperationException if a sliding window has a positive offset.
   * @throws IllegalStateException if the window function is neither fixed, sliding nor global.
   * @throws IllegalArgumentException if the window size is below the supported minimum.
   */
  @Override
  protected <PT extends PTransform<? super PInput, POutput>> void
      doVisitTransform(TransformTreeNode node) {
    @SuppressWarnings("unchecked")
    PT transform = (PT) node.getTransform();
    @SuppressWarnings("unchecked")
    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
    // NOTE(review): this holds only when Window.Bound is the same as, or a subclass of, the
    // transform's runtime class. Confirm whether the opposite direction,
    // Window.Bound.class.isAssignableFrom(transformClass), was intended.
    if (transformClass.isAssignableFrom(Window.Bound.class)) {
      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
      if (windowFn instanceof FixedWindows) {
        setBatchDuration(((FixedWindows) windowFn).getSize());
      } else if (windowFn instanceof SlidingWindows) {
        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
          throw new UnsupportedOperationException("Spark does not support window offsets");
        }
        // Sliding window size might as well set the batch duration. Applying the transformation
        // will add the "slide"
        setBatchDuration(((SlidingWindows) windowFn).getSize());
      } else if (!(windowFn instanceof GlobalWindows)) {
        throw new IllegalStateException("Windowing function not supported: " + windowFn);
      }
    }
  }

  /**
   * Records a window size, keeping the smallest one seen so far as the batch duration.
   *
   * @param duration the window size.
   * @throws IllegalArgumentException if the size is below {@code SPARK_MIN_WINDOW}.
   */
  private void setBatchDuration(org.joda.time.Duration duration) {
    // Primitive on purpose: the value is only compared and concatenated, never stored boxed.
    long durationMillis = duration.getMillis();
    // validate window size
    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
      throw new IllegalArgumentException("Windowing of size " + durationMillis +
          "msec is not supported!");
    }
    // choose the smallest duration to be Spark's batch duration, larger ones will be handled
    // as window functions over the batched-stream
    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
      this.batchDuration = Durations.milliseconds(durationMillis);
    }
    windowing = true;
  }

  /** Returns whether a fixed or sliding window has been recorded. */
  public boolean isWindowing() {
    return windowing;
  }

  /** Returns the recorded batch duration, or {@code null} if no window has been recorded. */
  public Duration getBatchDuration() {
    return batchDuration;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eb0341d4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java new file mode 100644 index 0000000..37bf849 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.runners.spark.translation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
 * Adapts a Dataflow {@link DoFn} to a Spark {@link FlatMapFunction} over an iterator of
 * windowed values.
 *
 * @param <I> Input element type.
 * @param <O> Output element type.
 */
public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
    WindowedValue<O>> {
  private final DoFn<I, O> mFunction;
  private final SparkRuntimeContext mRuntimeContext;
  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;

  /**
   * Creates the adapter.
   *
   * @param fn         the function to wrap.
   * @param runtime    the runtime context the function is applied in.
   * @param sideInputs the side inputs made available to the function.
   */
  public DoFnFunction(DoFn<I, O> fn,
               SparkRuntimeContext runtime,
               Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
    this.mFunction = fn;
    this.mRuntimeContext = runtime;
    this.mSideInputs = sideInputs;
  }

  /**
   * Sets up a fresh process context, starts a bundle on the wrapped function and returns the
   * context's output iterable for the given input.
   */
  @Override
  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
      Exception {
    ProcCtxt processContext = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
    processContext.setup();
    mFunction.startBundle(processContext);
    return processContext.getOutputIterable(iter, mFunction);
  }

  /** Process context that buffers every emitted value in a list. */
  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {

    private final List<WindowedValue<O>> outputs = new LinkedList<>();

    ProcCtxt(DoFn<I, O> doFn, SparkRuntimeContext runtime, Map<TupleTag<?>,
        BroadcastHelper<?>> sides) {
      super(doFn, runtime, sides);
    }

    /**
     * Buffers a plain value: it takes over the windowing of the current windowed value when
     * there is one, and is placed in empty windows otherwise.
     */
    @Override
    public synchronized void output(O o) {
      WindowedValue<O> emitted;
      if (windowedValue == null) {
        emitted = WindowedValue.valueInEmptyWindows(o);
      } else {
        emitted = windowedValue.withValue(o);
      }
      outputs.add(emitted);
    }

    /** Buffers an already windowed value unchanged. */
    @Override
    public synchronized void output(WindowedValue<O> o) {
      outputs.add(o);
    }

    /** Drops everything buffered so far. */
    @Override
    protected void clearOutput() {
      outputs.clear();
    }

    /** Returns an iterator over the buffered values. */
    @Override
    protected Iterator<WindowedValue<O>> getOutputIterator() {
      return outputs.iterator();
    }
  }

}
