Repository: beam Updated Branches: refs/heads/release-2.0.0 20bfa9411 -> 7dfc45563
[BEAM-831] Fix chaining, add test. closes #2216 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3f5282d5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3f5282d5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3f5282d5 Branch: refs/heads/release-2.0.0 Commit: 3f5282d515fa53516fda6d0376cc912560fd6d85 Parents: 6eab5c9 Author: Thomas Weise <[email protected]> Authored: Fri May 5 06:45:34 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Mon May 8 08:37:29 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 11 +++-- .../beam/runners/apex/TestApexRunner.java | 8 ++++ .../apex/translation/TranslationContext.java | 26 ++++------- .../beam/runners/apex/ApexRunnerTest.java | 47 ++++++++++++++++---- .../FlattenPCollectionTranslatorTest.java | 13 ++---- 5 files changed, 67 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index a50e10e..366308e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -70,11 +70,11 @@ import org.apache.hadoop.conf.Configuration; * pipeline to an Apex DAG and executes it on an Apex cluster. * */ -@SuppressWarnings({"rawtypes", "unchecked"}) public class ApexRunner extends PipelineRunner<ApexRunnerResult> { private final ApexPipelineOptions options; public static final String CLASSPATH_SCHEME = "classpath"; + protected boolean translateOnly = false; /** * TODO: this isn't thread safe and may cause issues when tests run in parallel @@ -93,6 +93,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { return new ApexRunner(apexPipelineOptions); } + @SuppressWarnings({"rawtypes"}) private List<PTransformOverride> getOverrides() { return ImmutableList.<PTransformOverride>builder() .add( @@ -156,7 +157,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } if (options.isEmbeddedExecution()) { - Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); + EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); if (options.isEmbeddedExecutionDebugMode()) { @@ -166,11 +167,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { Configuration conf = new Configuration(false); ApexYarnLauncher.addProperties(conf, configProperties); try { + if (translateOnly) { + launcher.prepareDAG(apexApp, conf); + return new ApexRunnerResult(launcher.getDAG(), null); + } ApexRunner.ASSERTION_ERROR.set(null); AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes); return new ApexRunnerResult(apexDAG.get(), apexAppResult); } catch (Exception e) { - Throwables.propagateIfPossible(e); + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } } else { http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java index e068db0..b68d3da 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.apex; +import com.datatorrent.api.DAG; import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; @@ -44,6 +45,13 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { return new TestApexRunner(apexOptions); } + public static DAG translate(Pipeline pipeline, ApexPipelineOptions options) { + ApexRunner delegate = new ApexRunner(options); + delegate.translateOnly = true; + DAG dag = delegate.run(pipeline).getApexDAG(); + return dag; + } + @Override public ApexRunnerResult run(Pipeline pipeline) { ApexRunnerResult result = delegate.run(pipeline); http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 1224e25..a5e3028 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -217,36 +218,27 @@ class TranslationContext { List<InputPortInfo> sinks = streamEntry.getValue().getRight(); OutputPortInfo source = streamEntry.getValue().getLeft(); PTransform sourceTransform = source.transform.getTransform(); - if (sourceTransform instanceof ParDo.Bound - || sourceTransform instanceof ParDo.BoundMulti) { - // Source is ParDo.. Check sink(s) + if (sourceTransform instanceof ParDo.MultiOutput + || sourceTransform instanceof Window.Assign) { + // source qualifies for chaining, check sink(s) for (InputPortInfo sink : sinks) { PTransform transform = sink.transform.getTransform(); - if (transform instanceof ParDo.Bound) { - ParDo.Bound t = (ParDo.Bound) transform; - if (t.getSideInputs().size() > 0) { - loc = DAG.Locality.CONTAINER_LOCAL; - break; - } else { - loc = DAG.Locality.THREAD_LOCAL; - } - } else if (transform instanceof ParDo.BoundMulti) { - ParDo.BoundMulti t = (ParDo.BoundMulti) transform; + if (transform instanceof ParDo.MultiOutput) { + ParDo.MultiOutput t = (ParDo.MultiOutput) transform; if (t.getSideInputs().size() > 0) { loc = DAG.Locality.CONTAINER_LOCAL; break; } else { loc = DAG.Locality.THREAD_LOCAL; } + } else if (transform instanceof Window.Assign) { + loc = DAG.Locality.THREAD_LOCAL; } else { - // Sink is not ParDo.. set null locality. + // cannot chain, if there is any other sink loc = null; break; } } - } else { - // Source is not ParDo... set null locality - loc = null; } streamMeta.setLocality(loc); http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java index e9e9a5b..c5521d1 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java @@ -18,15 +18,23 @@ package org.apache.beam.runners.apex; import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.DAG.OperatorMeta; import com.datatorrent.stram.engine.OperatorContext; +import com.google.common.collect.Sets; import java.io.File; import java.io.FileOutputStream; import java.util.Properties; +import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -41,16 +49,13 @@ public class ApexRunnerTest { String operName = "testProperties"; ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); // default configuration from class path - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(); Create.Values<Void> empty = Create.empty(VoidCoder.of()); p.apply(operName, empty); - ApexRunnerResult result = (ApexRunnerResult) p.run(); - result.cancel(); - DAG dag = result.getApexDAG(); + DAG dag = TestApexRunner.translate(p, options); OperatorMeta t1Meta = dag.getOperatorMeta(operName); Assert.assertNotNull(t1Meta); Assert.assertEquals(new Integer(32), t1Meta.getValue(OperatorContext.MEMORY_MB)); @@ -63,14 +68,40 @@ public class ApexRunnerTest { props.store(fos, ""); } options.setConfigFile(tmp.getAbsolutePath()); - result = (ApexRunnerResult) p.run(); - result.cancel(); + dag = TestApexRunner.translate(p, options); tmp.delete(); - dag = result.getApexDAG(); + t1Meta = dag.getOperatorMeta(operName); Assert.assertNotNull(t1Meta); Assert.assertEquals(new Integer(64), t1Meta.getValue(OperatorContext.MEMORY_MB)); } + @Test + public void testParDoChaining() throws Exception { + Pipeline p = Pipeline.create(); + long numElements = 1000; + PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements)); + PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())).isEqualTo(numElements); + + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + DAG dag = TestApexRunner.translate(p, options); + + String[] expectedThreadLocal = { "/CreateActual/FilterActuals/Window.Assign" }; + Set<String> actualThreadLocal = Sets.newHashSet(); + for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) { + DAG.OutputPortMeta opm = sm.getSource(); + if (sm.getLocality() == Locality.THREAD_LOCAL) { + String name = opm.getOperatorMeta().getName(); + String prefix = "PAssert$"; + if (name.startsWith(prefix)) { + // remove indeterministic prefix + name = name.substring(prefix.length() + 1); + } + actualThreadLocal.add(name); + } + } + Assert.assertThat(actualThreadLocal, Matchers.hasItems(expectedThreadLocal)); + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java index 64ca0ee..929778a 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -26,12 +26,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.apex.api.EmbeddedAppLauncher; -import org.apache.apex.api.Launcher; -import org.apache.apex.api.Launcher.LaunchMode; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.TestApexRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -100,16 +98,11 @@ public class FlattenPCollectionTranslatorTest { @Test public void testFlattenSingleCollection() { ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); - ApexPipelineTranslator translator = new ApexPipelineTranslator(options); - EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); - DAG dag = launcher.getDAG(); - - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(); PCollection<String> single = p.apply(Create.of(Collections.singletonList("1"))); PCollectionList.of(single).apply(Flatten.<String>pCollections()) .apply(ParDo.of(new EmbeddedCollector())); - translator.translate(p, dag); + DAG dag = TestApexRunner.translate(p, options); Assert.assertNotNull( dag.getOperatorMeta("ParDo(EmbeddedCollector)/ParMultiDo(EmbeddedCollector)")); }
