jstorm-runner: Support multiple copies of the same input PCollection in Flatten
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1178f9fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1178f9fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1178f9fb Branch: refs/heads/jstorm-runner Commit: 1178f9fb957c7e6cf1b277696ff63dc0e29a6d5e Parents: 52913b7 Author: basti.lj <[email protected]> Authored: Thu Jul 20 20:04:24 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:03:00 2017 +0800 ---------------------------------------------------------------------- .../runners/jstorm/translation/FlattenExecutor.java | 12 ++++++++++-- .../jstorm/translation/FlattenTranslator.java | 15 +++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java index a64f494..928fa24 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Map; + import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -32,8 +34,11 @@ class FlattenExecutor<InputT> implements Executor { private TupleTag mainOutputTag; private ExecutorContext context; private ExecutorsBolt executorsBolt; + private final Map<TupleTag, Integer> tagToCopyNum; - public FlattenExecutor(String 
description, TupleTag mainTupleTag) { + public FlattenExecutor(String description, TupleTag mainTupleTag, + Map<TupleTag, Integer> tagToCopyNum) { + this.tagToCopyNum = checkNotNull(tagToCopyNum, "tagToCopyNum"); this.description = checkNotNull(description, "description"); this.mainOutputTag = mainTupleTag; } @@ -46,7 +51,10 @@ class FlattenExecutor<InputT> implements Executor { @Override public void process(TupleTag tag, WindowedValue elem) { - executorsBolt.processExecutorElem(mainOutputTag, elem); + int copyNum = tagToCopyNum.get(tag); + for (int i = 0; i < copyNum; i++) { + executorsBolt.processExecutorElem(mainOutputTag, elem); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index e104ad8..b96bc56 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -48,9 +48,19 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti // Since a new tag is created in PCollectionList, retrieve the real tag here. 
Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); + Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap(); for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { PCollection<V> pc = (PCollection<V>) entry.getValue(); - inputs.putAll(pc.expand()); + //inputs.putAll(pc.expand()); + for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) { + if (inputs.containsKey(entry1.getKey())) { + int copyNum = tagToCopyNum.get(entry1.getKey()); + tagToCopyNum.put(entry1.getKey(), ++copyNum); + } else { + inputs.put(entry1.getKey(), entry1.getValue()); + tagToCopyNum.put(entry1.getKey(), 1); + } + } } String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); @@ -67,7 +77,8 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); } else { - FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); + FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag(), + tagToCopyNum); context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); } }
