More cleanup. The "PROBABLY INCORRECT" TODO on the View.AsSingleton evaluator is removed because View.AsSingleton is already exercised by the TfIdf test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78d66145 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78d66145 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78d66145 Branch: refs/heads/master Commit: 78d66145ed37ef82bb8cc8f1f7a4783d018cceae Parents: 89e2bb5 Author: Tom White <t...@cloudera.com> Authored: Tue Jun 30 09:00:04 2015 +0100 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:14 2016 +0000 ---------------------------------------------------------------------- .../main/java/com/cloudera/dataflow/spark/EvaluationContext.java | 3 ++- .../java/com/cloudera/dataflow/spark/SparkContextFactory.java | 2 +- .../java/com/cloudera/dataflow/spark/SparkProcessContext.java | 4 +--- .../java/com/cloudera/dataflow/spark/TransformTranslator.java | 2 -- 4 files changed, 4 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java index df3f7f7..5337264 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java @@ -193,8 +193,9 @@ public class EvaluationContext implements EvaluationResult { SparkContextFactory.stopSparkContext(jsc); } + /** The runner is blocking. 
*/ @Override public State getState() { - return State.UNKNOWN; + return State.DONE; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java index 483899a..b7570b3 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkContextFactory.java @@ -59,7 +59,7 @@ final class SparkContextFactory { private static JavaSparkContext createSparkContext(String master) { SparkConf conf = new SparkConf(); conf.setMaster(master); - conf.setAppName("spark pipeline job"); + conf.setAppName("spark dataflow pipeline job"); conf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); return new JavaSparkContext(conf); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java index fd4e048..12fb4e0 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java @@ -40,12 +40,11 @@ import org.slf4j.LoggerFactory; abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext { - private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class); + private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class); private static final Collection<? 
extends BoundedWindow> GLOBAL_WINDOWS = Collections.singletonList(GlobalWindow.INSTANCE); - private final DoFn<I, O> mFunction; private final SparkRuntimeContext mRuntimeContext; private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; @@ -55,7 +54,6 @@ abstract class SparkProcessContext<I, O> extends DoFn<I, O>.ProcessContext { SparkRuntimeContext runtime, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { fn.super(); - this.mFunction = fn; this.mRuntimeContext = runtime; this.mSideInputs = sideInputs; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78d66145/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index 6b78d9e..2689424 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -539,7 +539,6 @@ public final class TransformTranslator { return new TransformEvaluator<View.AsSingleton<T>>() { @Override public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) { - // TODO: PROBABLY INCORRECT. Fix it. Iterable<T> input = context.get(context.getInput(transform)); context.setPView(context.getOutput(transform), Iterables.transform(input, new WindowingFunction<T>())); @@ -552,7 +551,6 @@ public final class TransformTranslator { @Override public void evaluate(View.AsIterable<T> transform, EvaluationContext context) { Iterable<T> input = context.get(context.getInput(transform)); - context.setPView(context.getOutput(transform), Iterables.transform(input, new WindowingFunction<T>())); }