First wave of changes from feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1229b009 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1229b009 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1229b009 Branch: refs/heads/master Commit: 1229b009aab111734ac9dc488cc50099285a43f6 Parents: 89a21ca Author: Sean Owen <so...@cloudera.com> Authored: Fri Jan 22 15:20:48 2016 +0000 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:17 2016 +0000 ---------------------------------------------------------------------- .../java/com/cloudera/dataflow/spark/EvaluationContext.java | 2 -- .../java/com/cloudera/dataflow/spark/SparkPipelineRunner.java | 2 +- .../java/com/cloudera/dataflow/spark/TransformTranslator.java | 6 +----- 3 files changed, 2 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1229b009/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java index 356acab..a6ac6c2 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java @@ -49,7 +49,6 @@ public class EvaluationContext implements EvaluationResult { private final JavaSparkContext jsc; private final Pipeline pipeline; private final SparkRuntimeContext runtime; - //private final CoderRegistry registry; private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>(); private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>(); private final Set<PValue> multireads = new LinkedHashSet<>(); @@ -60,7 +59,6 @@ public class EvaluationContext implements EvaluationResult { public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) { this.jsc = jsc; this.pipeline = pipeline; - //this.registry = pipeline.getCoderRegistry(); this.runtime = new SparkRuntimeContext(jsc, pipeline); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1229b009/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java index 5287f20..a9c2d86 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkPipelineRunner.java @@ -213,7 +213,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> (Class<PTransform<?, ?>>) node.getTransform().getClass(); if (translator.hasTranslation(transformClass)) { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); - LOG.debug("Composite transform class: '{}'", node.getTransform().getClass()); + LOG.debug("Composite transform class: '{}'", transformClass); currentTranslatedCompositeNode = node; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1229b009/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index db335ed..58b1924 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -772,10 +772,6 @@ public final class TransformTranslator { EVALUATORS.put(Window.Bound.class, window()); } - public static <PT extends PTransform<?, ?>> boolean hasTransformEvaluator(Class<PT> clazz) { - return EVALUATORS.containsKey(clazz); - } - public static <PT extends PTransform<?, ?>> TransformEvaluator<PT> getTransformEvaluator(Class<PT> clazz) { @SuppressWarnings("unchecked") @@ -793,7 +789,7 @@ public final class TransformTranslator { @Override public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) { - return hasTransformEvaluator(clazz); + return EVALUATORS.containsKey(clazz); } @Override