http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java deleted file mode 100644 index 56980a1..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardNumber; - -public final class ShardNameTemplateHelper { - - private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class); - - public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix"; - public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template"; - public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix"; - - private ShardNameTemplateHelper() { - } - - public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format, - TaskAttemptContext context) throws IOException { - FileOutputCommitter committer = - (FileOutputCommitter) format.getOutputCommitter(context); - return new Path(committer.getWorkPath(), getOutputFile(context)); - } - - private static String getOutputFile(TaskAttemptContext context) { - TaskID taskId = context.getTaskAttemptID().getTaskID(); - int partition = taskId.getId(); - - String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX); - String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE); - String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX); - return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java deleted file mode 100644 index d3e8c9b..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.serializer.KryoSerializer; - -final class SparkContextFactory { - - /** - * If the property {@code dataflow.spark.test.reuseSparkContext} is set to - * {@code true} then the Spark context will be reused for dataflow pipelines. - * This property should only be enabled for tests. 
- */ - static final String TEST_REUSE_SPARK_CONTEXT = - "dataflow.spark.test.reuseSparkContext"; - private static JavaSparkContext sparkContext; - private static String sparkMaster; - - private SparkContextFactory() { - } - - static synchronized JavaSparkContext getSparkContext(String master, String appName) { - if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { - if (sparkContext == null) { - sparkContext = createSparkContext(master, appName); - sparkMaster = master; - } else if (!master.equals(sparkMaster)) { - throw new IllegalArgumentException(String.format("Cannot reuse spark context " + - "with different spark master URL. Existing: %s, requested: %s.", - sparkMaster, master)); - } - return sparkContext; - } else { - return createSparkContext(master, appName); - } - } - - static synchronized void stopSparkContext(JavaSparkContext context) { - if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) { - context.stop(); - } - } - - private static JavaSparkContext createSparkContext(String master, String appName) { - SparkConf conf = new SparkConf(); - conf.setMaster(master); - conf.setAppName(appName); - conf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); - return new JavaSparkContext(conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java deleted file mode 100644 index 6762180..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineEvaluator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). 
You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; - -/** - * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark. - */ -public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator { - - private final EvaluationContext ctxt; - - public SparkPipelineEvaluator(EvaluationContext ctxt, SparkPipelineTranslator translator) { - super(translator); - this.ctxt = ctxt; - } - - @Override - protected <PT extends PTransform<? 
super PInput, POutput>> void doVisitTransform(TransformTreeNode - node) { - @SuppressWarnings("unchecked") - PT transform = (PT) node.getTransform(); - @SuppressWarnings("unchecked") - Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass(); - @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator = - (TransformEvaluator<PT>) translator.translate(transformClass); - LOG.info("Evaluating {}", transform); - AppliedPTransform<PInput, POutput, PT> appliedTransform = - AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform); - ctxt.setCurrentTransform(appliedTransform); - evaluator.evaluate(transform, ctxt); - ctxt.setCurrentTransform(null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java deleted file mode 100644 index e96162e..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptions.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.StreamingOptions; - -public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, - ApplicationNameOptions { - @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).") - @Default.String("local[1]") - String getSparkMaster(); - - void setSparkMaster(String master); - - @Override - @Default.Boolean(false) - boolean isStreaming(); - - @Override - @Default.String("spark dataflow pipeline job") - String getAppName(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java deleted file mode 100644 index 89cd030..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; - -public final class SparkPipelineOptionsFactory { - private SparkPipelineOptionsFactory() { - } - - public static SparkPipelineOptions create() { - return PipelineOptionsFactory.as(SparkPipelineOptions.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java deleted file mode 100644 index 21fe693..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineOptionsRegistrar.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; -import com.google.common.collect.ImmutableList; - -public class SparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? 
extends PipelineOptions>>of(SparkPipelineOptions.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java deleted file mode 100644 index a9c2d86..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.PValue; - -import org.apache.spark.SparkException; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptions; -import com.cloudera.dataflow.spark.streaming.StreamingEvaluationContext; -import com.cloudera.dataflow.spark.streaming.StreamingTransformTranslator; -import com.cloudera.dataflow.spark.streaming.StreamingWindowPipelineDetector; - -/** - * The SparkPipelineRunner translate operations defined on a pipeline to a representation - * executable by Spark, and then submitting the job to Spark to be executed. 
If we wanted to run - * a dataflow pipeline with the default options of a single threaded spark instance in local mode, - * we would do the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * EvaluationResult result = SparkPipelineRunner.create().run(p); - * } - * - * To create a pipeline runner to run against a different spark cluster, with a custom master url - * we would do the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - * options.setSparkMaster("spark://host:port"); - * EvaluationResult result = SparkPipelineRunner.create(options).run(p); - * } - * - * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} - */ -public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> { - - private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class); - /** - * Options used in this pipeline runner. - */ - private final SparkPipelineOptions mOptions; - - /** - * Creates and returns a new SparkPipelineRunner with default options. In particular, against a - * spark instance running in local mode. - * - * @return A pipeline runner with default options. - */ - public static SparkPipelineRunner create() { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - return new SparkPipelineRunner(options); - } - - /** - * Creates and returns a new SparkPipelineRunner with specified options. - * - * @param options The SparkPipelineOptions to use when executing the job. - * @return A pipeline runner that will execute with specified options. - */ - public static SparkPipelineRunner create(SparkPipelineOptions options) { - return new SparkPipelineRunner(options); - } - - /** - * Creates and returns a new SparkPipelineRunner with specified options. - * - * @param options The PipelineOptions to use when executing the job. 
- * @return A pipeline runner that will execute with specified options. - */ - public static SparkPipelineRunner fromOptions(PipelineOptions options) { - SparkPipelineOptions sparkOptions = - PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); - return new SparkPipelineRunner(sparkOptions); - } - - /** - * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single - * thread. - */ - private SparkPipelineRunner(SparkPipelineOptions options) { - mOptions = options; - } - - - @Override - public EvaluationResult run(Pipeline pipeline) { - try { - // validate streaming configuration - if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) { - throw new RuntimeException("A streaming job must be configured with " + - SparkStreamingPipelineOptions.class.getSimpleName() + ", found " + - mOptions.getClass().getSimpleName()); - } - LOG.info("Executing pipeline using the SparkPipelineRunner."); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions - .getSparkMaster(), mOptions.getAppName()); - - if (mOptions.isStreaming()) { - SparkPipelineTranslator translator = - new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); - // if streaming - fixed window should be defined on all UNBOUNDED inputs - StreamingWindowPipelineDetector streamingWindowPipelineDetector = - new StreamingWindowPipelineDetector(translator); - pipeline.traverseTopologically(streamingWindowPipelineDetector); - if (!streamingWindowPipelineDetector.isWindowing()) { - throw new IllegalStateException("Spark streaming pipeline must be windowed!"); - } - - Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration(); - LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds()); - EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval); - - pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, 
translator)); - ctxt.computeOutputs(); - - LOG.info("Streaming pipeline construction complete. Starting execution.."); - ((StreamingEvaluationContext) ctxt).getStreamingContext().start(); - - return ctxt; - } else { - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); - SparkPipelineTranslator translator = new TransformTranslator.Translator(); - pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); - ctxt.computeOutputs(); - - LOG.info("Pipeline execution complete."); - - return ctxt; - } - } catch (Exception e) { - // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler - // won't let you catch something that is not declared, so we can't catch - // SparkException here. Instead we do an instanceof check. - // Then we find the cause by seeing if it's a user exception (wrapped by our - // SparkProcessException), or just use the SparkException cause. - if (e instanceof SparkException && e.getCause() != null) { - if (e.getCause() instanceof SparkProcessContext.SparkProcessException && - e.getCause().getCause() != null) { - throw new RuntimeException(e.getCause().getCause()); - } else { - throw new RuntimeException(e.getCause()); - } - } - // otherwise just wrap in a RuntimeException - throw new RuntimeException(e); - } - } - - private EvaluationContext - createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, - Duration batchDuration) { - SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions; - JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); - return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout()); - } - - public abstract static class Evaluator implements Pipeline.PipelineVisitor { - protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); - - protected final SparkPipelineTranslator translator; - - protected Evaluator(SparkPipelineTranslator translator) { - this.translator 
= translator; - } - - // Set upon entering a composite node which can be directly mapped to a single - // TransformEvaluator. - private TransformTreeNode currentTranslatedCompositeNode; - - /** - * If true, we're currently inside a subtree of a composite node which directly maps to a - * single - * TransformEvaluator; children nodes are ignored, and upon post-visiting the translated - * composite node, the associated TransformEvaluator will be visited. - */ - private boolean inTranslatedCompositeNode() { - return currentTranslatedCompositeNode != null; - } - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - if (!inTranslatedCompositeNode() && node.getTransform() != null) { - @SuppressWarnings("unchecked") - Class<PTransform<?, ?>> transformClass = - (Class<PTransform<?, ?>>) node.getTransform().getClass(); - if (translator.hasTranslation(transformClass)) { - LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); - LOG.debug("Composite transform class: '{}'", transformClass); - currentTranslatedCompositeNode = node; - } - } - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - // NB: We depend on enterCompositeTransform and leaveCompositeTransform providing 'node' - // objects for which Object.equals() returns true iff they are the same logical node - // within the tree. - if (inTranslatedCompositeNode() && node.equals(currentTranslatedCompositeNode)) { - LOG.info("Post-visiting directly-translatable composite transform: '{}'", - node.getFullName()); - doVisitTransform(node); - currentTranslatedCompositeNode = null; - } - } - - @Override - public void visitTransform(TransformTreeNode node) { - if (inTranslatedCompositeNode()) { - LOG.info("Skipping '{}'; already in composite transform.", node.getFullName()); - return; - } - doVisitTransform(node); - } - - protected abstract <PT extends PTransform<? 
super PInput, POutput>> void - doVisitTransform(TransformTreeNode node); - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java deleted file mode 100644 index 5bdd322..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunnerRegistrar.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar; -import com.google.common.collect.ImmutableList; - -public class SparkPipelineRunnerRegistrar implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? 
extends PipelineRunner<?>>>of(SparkPipelineRunner.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java deleted file mode 100644 index d90363f..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineTranslator.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ -package com.cloudera.dataflow.spark; - -import com.google.cloud.dataflow.sdk.transforms.PTransform; - -/** - * Translator to support translation between Dataflow transformations and Spark transformations. - */ -public interface SparkPipelineTranslator { - - boolean hasTranslation(Class<? 
extends PTransform<?, ?>> clazz); - - <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java deleted file mode 100644 index 73cec25..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. 
- */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingInternals; -import com.google.cloud.dataflow.sdk.util.state.StateInternals; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterables; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { - - private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class); - - private final DoFn<I, O> fn; - private final SparkRuntimeContext mRuntimeContext; - private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; - - protected WindowedValue<I> windowedValue; - - SparkProcessContext(DoFn<I, O> fn, - SparkRuntimeContext runtime, - Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { - fn.super(); - this.fn = fn; - this.mRuntimeContext = runtime; - this.mSideInputs = sideInputs; - } - - void setup() { - setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return mRuntimeContext.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - 
@SuppressWarnings("unchecked") - BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper = - (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal()); - Iterable<WindowedValue<?>> contents = broadcastHelper.getValue(); - return view.fromIterableInternal(contents); - } - - @Override - public abstract void output(O output); - - public abstract void output(WindowedValue<O> output); - - @Override - public <T> void sideOutput(TupleTag<T> tupleTag, T t) { - String message = "sideOutput is an unsupported operation for doFunctions, use a " + - "MultiDoFunction instead."; - LOG.warn(message); - throw new UnsupportedOperationException(message); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) { - String message = - "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a " + - "MultiDoFunction instead."; - LOG.warn(message); - throw new UnsupportedOperationException(message); - } - - @Override - public <AI, AO> Aggregator<AI, AO> createAggregatorInternal( - String named, - Combine.CombineFn<AI, ?, AO> combineFn) { - return mRuntimeContext.createAggregator(named, combineFn); - } - - @Override - public I element() { - return windowedValue.getValue(); - } - - @Override - public void outputWithTimestamp(O output, Instant timestamp) { - output(WindowedValue.of(output, timestamp, - windowedValue.getWindows(), windowedValue.getPane())); - } - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindow."); - } - return Iterables.getOnlyElement(windowedValue.getWindows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public WindowingInternals<I, O> windowingInternals() { - return 
new WindowingInternals<I, O>() { - - @Override - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public void outputWindowedValue(O output, Instant timestamp, Collection<? - extends BoundedWindow> windows, PaneInfo paneInfo) { - output(WindowedValue.of(output, timestamp, windows, paneInfo)); - } - - @Override - public StateInternals stateInternals() { - throw new UnsupportedOperationException( - "WindowingInternals#stateInternals() is not yet supported."); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException( - "WindowingInternals#timerInternals() is not yet supported."); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException( - "WindowingInternals#writePCollectionViewData() is not yet supported."); - } - }; - } - - protected abstract void clearOutput(); - protected abstract Iterator<V> getOutputIterator(); - - protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter, - final DoFn<I, O> doFn) { - return new Iterable<V>() { - @Override - public Iterator<V> iterator() { - return new ProcCtxtIterator(iter, doFn); - } - }; - } - - private class ProcCtxtIterator extends AbstractIterator<V> { - - private final Iterator<WindowedValue<I>> inputIterator; - private final DoFn<I, O> doFn; - private Iterator<V> outputIterator; - private boolean calledFinish; - - ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) { - this.inputIterator = iterator; - this.doFn = doFn; - this.outputIterator = getOutputIterator(); - } - - @Override - protected V computeNext() { - // Process each element from the (input) iterator, which produces, zero, one or more - // output elements (of type V) in the output iterator. 
Note that the output - // collection (and iterator) is reset between each call to processElement, so the - // collection only holds the output values for each call to processElement, rather - // than for the whole partition (which would use too much memory). - while (true) { - if (outputIterator.hasNext()) { - return outputIterator.next(); - } else if (inputIterator.hasNext()) { - clearOutput(); - windowedValue = inputIterator.next(); - try { - doFn.processElement(SparkProcessContext.this); - } catch (Exception e) { - throw new SparkProcessException(e); - } - outputIterator = getOutputIterator(); - } else { - // no more input to consume, but finishBundle can produce more output - if (!calledFinish) { - clearOutput(); - try { - calledFinish = true; - doFn.finishBundle(SparkProcessContext.this); - } catch (Exception e) { - throw new SparkProcessException(e); - } - outputIterator = getOutputIterator(); - continue; // try to consume outputIterator from start of loop - } - return endOfData(); - } - } - } - } - - static class SparkProcessException extends RuntimeException { - SparkProcessException(Throwable t) { - super(t); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java deleted file mode 100644 index ec590a9..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkRuntimeContext.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderRegistry; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Max; -import com.google.cloud.dataflow.sdk.transforms.Min; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableList; -import org.apache.spark.Accumulator; -import org.apache.spark.api.java.JavaSparkContext; - -import com.cloudera.dataflow.spark.aggregators.AggAccumParam; -import com.cloudera.dataflow.spark.aggregators.NamedAggregators; - -/** - * The SparkRuntimeContext allows us to define useful features on the client side before our - * data flow program is launched. - */ -public class SparkRuntimeContext implements Serializable { - /** - * An accumulator that is a map from names to aggregators. 
- */ - private final Accumulator<NamedAggregators> accum; - - private final String serializedPipelineOptions; - - /** - * Map of names to dataflow aggregators. - */ - private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>(); - private transient CoderRegistry coderRegistry; - - SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) { - this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam()); - this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions()); - } - - private static String serializePipelineOptions(PipelineOptions pipelineOptions) { - try { - return new ObjectMapper().writeValueAsString(pipelineOptions); - } catch (JsonProcessingException e) { - throw new IllegalStateException("Failed to serialize the pipeline options.", e); - } - } - - private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) { - try { - return new ObjectMapper().readValue(serializedPipelineOptions, PipelineOptions.class); - } catch (IOException e) { - throw new IllegalStateException("Failed to deserialize the pipeline options.", e); - } - } - - /** - * Retrieves corresponding value of an aggregator. - * - * @param aggregatorName Name of the aggregator to retrieve the value of. - * @param typeClass Type class of value to be retrieved. - * @param <T> Type of object to be returned. - * @return The value of the aggregator. 
- */ - public <T> T getAggregatorValue(String aggregatorName, Class<T> typeClass) { - return accum.value().getValue(aggregatorName, typeClass); - } - - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) { - @SuppressWarnings("unchecked") - Class<T> aggValueClass = (Class<T>) aggregator.getCombineFn().getOutputType().getRawType(); - final T aggregatorValue = getAggregatorValue(aggregator.getName(), aggValueClass); - return new AggregatorValues<T>() { - @Override - public Collection<T> getValues() { - return ImmutableList.of(aggregatorValue); - } - - @Override - public Map<String, T> getValuesAtSteps() { - throw new UnsupportedOperationException("getValuesAtSteps is not supported."); - } - }; - } - - public synchronized PipelineOptions getPipelineOptions() { - return deserializePipelineOptions(serializedPipelineOptions); - } - - /** - * Creates an aggregator and associates it with the specified name. - * - * @param named Name of aggregator. - * @param combineFn Combine function used in aggregation. - * @param <IN> Type of inputs to aggregator. - * @param <INTER> Intermediate data type - * @param <OUT> Type of aggregator outputs. - * @return Specified aggregator - */ - public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator( - String named, - Combine.CombineFn<? 
super IN, INTER, OUT> combineFn) { - @SuppressWarnings("unchecked") - Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named); - if (aggregator == null) { - @SuppressWarnings("unchecked") - NamedAggregators.CombineFunctionState<IN, INTER, OUT> state = - new NamedAggregators.CombineFunctionState<>( - (Combine.CombineFn<IN, INTER, OUT>) combineFn, - (Coder<IN>) getCoder(combineFn), - this); - accum.add(new NamedAggregators(named, state)); - aggregator = new SparkAggregator<>(named, state); - aggregators.put(named, aggregator); - } - return aggregator; - } - - public CoderRegistry getCoderRegistry() { - if (coderRegistry == null) { - coderRegistry = new CoderRegistry(); - coderRegistry.registerStandardCoders(); - } - return coderRegistry; - } - - private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) { - try { - if (combiner.getClass() == Sum.SumIntegerFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); - } else if (combiner.getClass() == Sum.SumLongFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); - } else if (combiner.getClass() == Sum.SumDoubleFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); - } else if (combiner.getClass() == Min.MinIntegerFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); - } else if (combiner.getClass() == Min.MinLongFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); - } else if (combiner.getClass() == Min.MinDoubleFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); - } else if (combiner.getClass() == Max.MaxIntegerFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); - } else if (combiner.getClass() == Max.MaxLongFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); - } else if (combiner.getClass() == 
Max.MaxDoubleFn.class) { - return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); - } else { - throw new IllegalArgumentException("unsupported combiner in Aggregator: " - + combiner.getClass().getName()); - } - } catch (CannotProvideCoderException e) { - throw new IllegalStateException("Could not determine default coder for combiner", e); - } - } - - /** - * Initialize spark aggregators exactly once. - * - * @param <IN> Type of element fed in to aggregator. - */ - private static class SparkAggregator<IN, OUT> implements Aggregator<IN, OUT>, Serializable { - private final String name; - private final NamedAggregators.State<IN, ?, OUT> state; - - SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) { - this.name = name; - this.state = state; - } - - @Override - public String getName() { - return name; - } - - @Override - public void addValue(IN elem) { - state.update(elem); - } - - @Override - public Combine.CombineFn<IN, ?, OUT> getCombineFn() { - return state.getCombineFn(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java deleted file mode 100644 index ef24137..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { - Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context); - return path.getFileSystem(context.getConfiguration()).create(path); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java deleted file mode 100644 index 3ab07b5..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; - -public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - public Path getDefaultWorkFile(TaskAttemptContext context, - String extension) throws IOException { - // note that the passed-in extension is ignored since it comes from the template - return ShardNameTemplateHelper.getDefaultWorkFile(this, context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java deleted file mode 100644 index a8e218d..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.IOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V> - implements ShardNameTemplateAware { - - @Override - public void checkOutputSpecs(JobContext job) { - // don't fail if the output already exists - } - - @Override - public Path getDefaultWorkFile(TaskAttemptContext context, - String extension) throws IOException { - // note that the passed-in extension is ignored since it comes from the template - return ShardNameTemplateHelper.getDefaultWorkFile(this, context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java deleted file mode 100644 index 52842d5..0000000 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformEvaluator.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved. - * - * Cloudera, Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"). You may not use this file except in - * compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for - * the specific language governing permissions and limitations under the - * License. - */ - -package com.cloudera.dataflow.spark; - -import java.io.Serializable; - -import com.google.cloud.dataflow.sdk.transforms.PTransform; - -public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable { - void evaluate(PT transform, EvaluationContext context); -}
