jstorm-runner: 1. Use the TupleTag from "PCollection expand" when getting input tags and output tags. 2. Check the task exception record when evaluating unit-test assertions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f3eda6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f3eda6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f3eda6 Branch: refs/heads/jstorm-runner Commit: 30f3eda64c68cea092c42b7acc1dfd98eb8cbbd0 Parents: df154de Author: basti.lj <[email protected]> Authored: Mon Jul 17 15:55:01 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:58 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/TestJStormRunner.java | 9 +++++++++ .../runners/jstorm/translation/DoFnExecutor.java | 6 +++++- .../jstorm/translation/MultiOutputDoFnExecutor.java | 6 +----- .../jstorm/translation/ParDoBoundTranslator.java | 5 +++-- .../jstorm/translation/TranslationContext.java | 16 +++++++++------- 5 files changed, 27 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b1b0379..a117675 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -8,6 +8,7 @@ import com.alibaba.jstorm.metric.AsmWindow; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetaType; import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; import com.google.common.collect.Maps; @@ 
-56,14 +57,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { if (numberOfAssertions == 0) { // If assert number is zero, wait 5 sec JStormUtils.sleepMs(5000); + Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); + if (taskExceptionRec != null) { + throw new RuntimeException(taskExceptionRec.getCause()); + } return result; } else { for (int i = 0; i < 40; ++i) { Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); + Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (success.isPresent() && success.get()) { return result; } else if (success.isPresent() && !success.get()) { throw new AssertionError("Failed assertion checks."); + } else if (taskExceptionRec != null) { + throw new RuntimeException(taskExceptionRec.getCause()); } else { JStormUtils.sleepMs(500); } @@ -74,6 +82,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { } finally { clearPAssertCount(); cancel(result); + TaskReportErrorAndDie.setExceptionRecord(null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index fdd9af6..6baa944 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -203,8 +203,12 @@ class DoFnExecutor<InputT, OutputT> implements Executor { tag, mainInputTag, sideInputs, elem.getValue())); if (mainInputTag.equals(tag)) { processMainInput(elem); - } else { + } else if (sideInputTagToView.containsKey(tag)) { processSideInput(tag, elem); + } else { + 
LOG.warn("Discard unexpected elem={} from tag={}", elem.getValue(), tag); + LOG.warn("Current mainInputTag={}, sideInputTags={}", + mainInputTag, sideInputTagToView.keySet()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java index 49b0f85..138a5dc 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java @@ -46,11 +46,7 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { @Override public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - if (localTupleTagMap.containsKey(tag)) { - executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output); - } else { - executorsBolt.processExecutorElem(tag, output); - } + executorsBolt.processExecutorElem(tag, output); } } http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java index 6feb7f8..e6d09c4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java @@ -20,8 +20,10 @@ package org.apache.beam.runners.jstorm.translation; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.List; import java.util.Map; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -54,8 +56,7 @@ class ParDoBoundTranslator<InputT, OutputT> TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); - Map<TupleTag<?>, PValue> allInputs = - avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs()); + Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); for (PCollectionView pCollectionView : transform.getSideInputs()) { allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); } http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index e25f211..101921f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -36,7 +36,6 @@ import java.util.Map; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.values.PValue; -import 
org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; @@ -184,10 +183,6 @@ public class TranslationContext { TupleTag tag = entry.getKey(); PValue value = entry.getValue(); - // use tag of PValueBase - if (value instanceof PValueBase) { - tag = ((PValueBase) value).expand().keySet().iterator().next(); - } executionGraphContext.registerStreamProducer( TaggedPValue.of(tag, value), Stream.Producer.of(name, tag.getId(), value.getName())); @@ -198,6 +193,9 @@ public class TranslationContext { for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) { TupleTag tag = entry.getKey(); PValue value = entry.getValue(); + if (userGraphContext.findTupleTag(value) != null) { + tag = userGraphContext.findTupleTag(value); + } bolt.addExecutor(tag, executor); // filter all connections inside bolt @@ -269,11 +267,15 @@ public class TranslationContext { } public TupleTag<?> getInputTag() { - return currentTransform.getInputs().keySet().iterator().next(); + return pValueToTupleTag.get(this.getInput()); } public List<TupleTag<?>> getInputTags() { - return Lists.newArrayList(currentTransform.getInputs().keySet()); + List inputTags = Lists.newArrayList(); + for (PValue value : currentTransform.getInputs().values()) { + inputTags.add(pValueToTupleTag.get(value)); + } + return inputTags; } public <T extends PValue> T getOutput() {
