jstorm-runner: Fixup for review comments
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90ed2ef3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90ed2ef3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90ed2ef3 Branch: refs/heads/jstorm-runner Commit: 90ed2ef344d19ca730429e9eb7c71779f995fc47 Parents: 6078cbc Author: basti.lj <[email protected]> Authored: Mon Aug 14 16:20:03 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:03:00 2017 +0800 ---------------------------------------------------------------------- .../runners/jstorm/JStormPipelineOptions.java | 12 +-- .../beam/runners/jstorm/JStormRunner.java | 21 ++-- .../beam/runners/jstorm/JStormRunnerResult.java | 21 ++-- .../beam/runners/jstorm/TestJStormRunner.java | 19 +++- .../serialization/JavaUtilsSerializer.java | 3 +- .../translation/BoundedSourceTranslator.java | 4 +- .../jstorm/translation/DoFnExecutor.java | 27 +++-- .../runners/jstorm/translation/Executor.java | 6 ++ .../jstorm/translation/ExecutorsBolt.java | 8 +- .../jstorm/translation/FlattenExecutor.java | 1 - .../jstorm/translation/FlattenTranslator.java | 23 ++-- .../translation/GroupByKeyTranslator.java | 12 --- .../translation/GroupByWindowExecutor.java | 12 --- .../translation/JStormStateInternals.java | 29 +++-- .../jstorm/translation/MetricsReporter.java | 2 - .../translation/MultiOutputDoFnExecutor.java | 22 +--- .../translation/MultiStatefulDoFnExecutor.java | 5 +- .../translation/ParDoBoundMultiTranslator.java | 14 +-- .../translation/ParDoBoundTranslator.java | 108 ------------------- .../translation/StatefulDoFnExecutor.java | 1 - .../jstorm/translation/TimerService.java | 2 +- .../jstorm/translation/TimerServiceImpl.java | 8 +- .../jstorm/translation/TransformTranslator.java | 16 ++- .../jstorm/translation/TranslationContext.java | 18 +++- .../jstorm/translation/TranslatorRegistry.java | 1 - .../translation/UnboundedSourceSpout.java | 8 +- 
.../jstorm/translation/ViewTranslator.java | 4 +- .../translation/WindowAssignExecutor.java | 2 - .../translation/JStormStateInternalsTest.java | 2 +- 29 files changed, 141 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java index 114877a..e494757 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java @@ -36,8 +36,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Executing time(sec) of topology on local mode. 
Default is 1min.") @Default.Long(60) - Long getLocalModeExecuteTime(); - void setLocalModeExecuteTime(Long time); + Long getLocalModeExecuteTimeSec(); + void setLocalModeExecuteTimeSec(Long time); @Description("Worker number of topology") @Default.Integer(1) @@ -46,8 +46,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Global parallelism number of a component") @Default.Integer(1) - Integer getParallelismNumber(); - void setParallelismNumber(Integer number); + Integer getParallelism(); + void setParallelism(Integer number); @Description("System topology config of JStorm") @Default.InstanceFactory(DefaultMapValueFactory.class) @@ -61,8 +61,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Parallelism number of a specified composite PTransform") @Default.InstanceFactory(DefaultMapValueFactory.class) - Map getParallelismNumMap(); - void setParallelismNumMap(Map parallelismNumMap); + Map getParallelismMap(); + void setParallelismMap(Map parallelismNumMap); /** * Default value factory for topology configuration of JStorm. 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 47de42c..21a8fae 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -79,15 +79,15 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { } public static JStormRunner fromOptions(PipelineOptions options) { - JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate( - JStormPipelineOptions.class, options); + JStormPipelineOptions pipelineOptions = + PipelineOptionsValidator.validate(JStormPipelineOptions.class, options); return new JStormRunner(pipelineOptions); } /** - * convert pipeline options to storm configuration format. + * Convert pipeline options to JStorm configuration format. * @param options - * @return + * @return JStorm configuration */ private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) { Config config = new Config(); @@ -103,6 +103,8 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { // Setup config for runtime env config.put("worker.external", "beam"); + // We use "com.alibaba.jstorm.transactional" API for "at least once" and "exactly once", + // so we don't need acker task for beam job any more, and set related number to 0. 
config.put("topology.acker.executors", 0); // Register serializers of Kryo @@ -271,7 +273,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { LocalCluster localCluster = LocalCluster.getInstance(); localCluster.submitTopology(topologyName, config, topology); return JStormRunnerResult.local( - topologyName, config, localCluster, options.getLocalModeExecuteTime()); + topologyName, config, localCluster, options.getLocalModeExecuteTimeSec()); } else { StormSubmitter.submitTopology(topologyName, config, topology); return null; @@ -298,11 +300,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); - int parallelismNumber = options.getParallelismNumber(); + int parallelismNumber = options.getParallelism(); Map<String, UnboundedSourceSpout> spouts = context.getSpouts(); - for (String id : spouts.keySet()) { - IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); - builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); + for (Map.Entry<String, UnboundedSourceSpout> entry : spouts.entrySet()) { + IRichSpout spout = getSpout(isExactlyOnce, entry.getValue()); + builder.setSpout( + entry.getKey(), spout, getParallelismNum(entry.getValue(), parallelismNumber)); } HashMap<String, BoltDeclarer> declarers = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 797c899..4b1850e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -38,7 +38,7 @@ public abstract class JStormRunnerResult implements PipelineResult { Config config, LocalCluster localCluster, long localModeExecuteTimeSecs) { - return new LocalStormPipelineResult( + return new LocalJStormPipelineResult( topologyName, config, localCluster, localModeExecuteTimeSecs); } @@ -62,12 +62,12 @@ public abstract class JStormRunnerResult implements PipelineResult { return topologyName; } - private static class LocalStormPipelineResult extends JStormRunnerResult { + private static class LocalJStormPipelineResult extends JStormRunnerResult { private LocalCluster localCluster; private long localModeExecuteTimeSecs; - LocalStormPipelineResult( + LocalJStormPipelineResult( String topologyName, Config config, LocalCluster localCluster, @@ -78,7 +78,6 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public State cancel() throws IOException { - //localCluster.deactivate(getTopologyName()); localCluster.killTopology(getTopologyName()); localCluster.shutdown(); JStormUtils.sleepMs(1000); @@ -87,12 +86,7 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public State waitUntilFinish(Duration duration) { - return waitUntilFinish(); - } - - @Override - public State waitUntilFinish() { - JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000); + JStormUtils.sleepMs(duration.getMillis()); try { return cancel(); } catch (IOException e) { @@ -101,8 +95,13 @@ public abstract class JStormRunnerResult implements PipelineResult { } @Override + public State waitUntilFinish() { + return waitUntilFinish(Duration.standardSeconds(localModeExecuteTimeSecs)); + } + + @Override public MetricResults metrics() { - return null; + throw new UnsupportedOperationException("This method is not yet supported."); } } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index 21a58e3..c9990e4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -50,13 +50,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { return new TestJStormRunner(options.as(JStormPipelineOptions.class)); } + // waiting time when job with assertion + private static final int ASSERTION_WAITING_TIME_MS = 20 * 1000; + // waiting time when job without assertion + private static final int RESULT_WAITING_TIME_MS = 5 * 1000; + private static final int RESULT_CHECK_INTERVAL_MS = 500; + private final JStormRunner stormRunner; private final JStormPipelineOptions options; private TestJStormRunner(JStormPipelineOptions options) { this.options = options; Map conf = Maps.newHashMap(); - //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); + // Default state backend is RocksDB, for the users who could not run RocksDB on local testing + // env, following config is used to configure state backend to memory. + // conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); options.setTopologyConfig(conf); options.setLocalMode(true); stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options")); @@ -73,8 +81,9 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - int maxTimeoutSec = numberOfAssertions > 0 ? 
20 : 5; - for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) { + int maxTimeoutMs = + numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : RESULT_WAITING_TIME_MS; + for (int waitTime = 0; waitTime <= maxTimeoutMs; ) { Optional<Boolean> success = numberOfAssertions > 0 ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent(); Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); @@ -86,8 +95,8 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { LOG.info("Exception was found.", taskExceptionRec); throw new RuntimeException(taskExceptionRec.getCause()); } else { - JStormUtils.sleepMs(500); - waitTime += 500; + JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS); + waitTime += RESULT_CHECK_INTERVAL_MS; } } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java index 5df686c..fa46fdb 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java @@ -45,7 +45,7 @@ import java.util.TreeSet; public class JavaUtilsSerializer { /** - * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}. + * Specific {@link Kryo} serializer for {@code java.util.Collections.SingletonList}. 
*/ public static class CollectionsSingletonListSerializer extends Serializer<List<?>> { public CollectionsSingletonListSerializer() { @@ -222,7 +222,6 @@ public class JavaUtilsSerializer { * @see Collections#unmodifiableSortedMap(SortedMap) */ private static void registerUnmodifableCollectionSerializers(Config config) { - UnmodifiableCollection.values(); for (final UnmodifiableCollection item : UnmodifiableCollection.values()) { config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class); } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java index 53555c9..77d0823 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java @@ -24,9 +24,7 @@ import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; /** - * Translates a {@link Read.Bounded} into a Storm spout. - * - * @param <T> + * Translates a {@link Read.Bounded} into a JStorm spout. 
*/ class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 2148f34..72c386a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -71,9 +71,15 @@ class DoFnExecutor<InputT, OutputT> implements Executor { /** * Implements {@link OutputManager} in a DoFn executor. */ - public class DoFnExecutorOutputManager implements OutputManager, Serializable { + protected static class DoFnExecutorOutputManager implements OutputManager, Serializable { private static final long serialVersionUID = -661113364735206170L; + private ExecutorsBolt executorsBolt; + + public DoFnExecutorOutputManager(ExecutorsBolt executorsBolt) { + this.executorsBolt = executorsBolt; + } + @Override public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { executorsBolt.processExecutorElem(tag, output); @@ -97,23 +103,23 @@ class DoFnExecutor<InputT, OutputT> implements Executor { protected DoFn<InputT, OutputT> doFn; protected final Coder<WindowedValue<InputT>> inputCoder; - protected DoFnInvoker<InputT, OutputT> doFnInvoker; - protected OutputManager outputManager; + protected transient DoFnInvoker<InputT, OutputT> doFnInvoker; + protected transient OutputManager outputManager; protected WindowingStrategy<?, ?> windowingStrategy; protected final TupleTag<InputT> mainInputTag; protected Collection<PCollectionView<?>> sideInputs; - protected SideInputHandler sideInputHandler; + protected transient 
SideInputHandler sideInputHandler; protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView; // Initialize during runtime - protected ExecutorContext executorContext; + protected transient ExecutorContext executorContext; protected ExecutorsBolt executorsBolt; - protected TimerInternals timerInternals; + protected transient TimerInternals timerInternals; protected transient StateInternals pushbackStateInternals; protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; protected transient StateTag<WatermarkHoldState> watermarkHoldTag; protected transient IKvStoreManager kvStoreManager; - protected DefaultStepContext stepContext; + protected transient DefaultStepContext stepContext; protected transient MetricClient metricClient; public DoFnExecutor( @@ -133,7 +139,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor { this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); this.doFn = doFn; this.inputCoder = inputCoder; - this.outputManager = new DoFnExecutorOutputManager(); this.windowingStrategy = windowingStrategy; this.mainInputTag = mainInputTag; this.sideInputs = sideInputs; @@ -174,6 +179,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { this.executorsBolt = context.getExecutorsBolt(); this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + this.outputManager = new DoFnExecutorOutputManager(executorsBolt); initService(context); @@ -199,8 +205,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor { @Override public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}", - tag, mainInputTag, sideInputs, elem.getValue())); if (mainInputTag.equals(tag)) { processMainInput(elem); } else if (sideInputTagToView.containsKey(tag)) { @@ -213,6 +217,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { } protected <T> void 
processMainInput(WindowedValue<T> elem) { + LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem)); if (sideInputs.isEmpty()) { runner.processElement((WindowedValue<InputT>) elem); } else { @@ -234,7 +239,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { } protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("side inputs: %s, %s.", tag, elem)); + LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem)); PCollectionView<?> sideInputView = sideInputTagToView.get(tag); sideInputHandler.addSideInputValue(sideInputView, elem); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java index 8812988..fd7af7d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java @@ -30,7 +30,13 @@ public interface Executor extends Serializable { */ void init(ExecutorContext context); + /** + * Process element from "tag" stream. + */ <T> void process(TupleTag<T> tag, WindowedValue<T> elem); + /** + * Clean up when task is shut down.
+ */ void cleanup(); } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index f8e09be..449ecb5 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -58,9 +58,9 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); - protected ExecutorContext executorContext; + protected transient ExecutorContext executorContext; - protected TimerService timerService; + protected transient TimerService timerService; // map from input tag to executor inside bolt protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); @@ -73,7 +73,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { protected int internalDoFnExecutorId = 1; protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); - protected OutputCollector collector; + protected transient OutputCollector collector; protected boolean isStatefulBolt = false; @@ -265,8 +265,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { } public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { - LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); if (elem != null) { + LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); Executor executor = inputTagToExecutor.get(inputTag); if (executor != null) { executor.process(inputTag, elem); 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java index 928fa24..9d4184c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.values.TupleTag; /** * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}. - * @param <InputT> */ class FlattenExecutor<InputT> implements Executor { http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index ebe8bc3..62621d0 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; @@ -46,20 +45,18 @@ class FlattenTranslator<V> extends 
TransformTranslator.Default<Flatten.PCollecti public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - // Since a new tag is created in PCollectionList, retrieve the real tag here. + // Flatten supports to consume multi-copy from a same PCollection, so we need to record + // the copy number here. Map<TupleTag<?>, PValue> inputs = Maps.newHashMap(); Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) { - PCollection<V> pc = (PCollection<V>) entry.getValue(); - //inputs.putAll(pc.expand()); - for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) { - if (inputs.containsKey(entry1.getKey())) { - int copyNum = tagToCopyNum.get(entry1.getKey()); - tagToCopyNum.put(entry1.getKey(), ++copyNum); - } else { - inputs.put(entry1.getKey(), entry1.getValue()); - tagToCopyNum.put(entry1.getKey(), 1); - } + for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getTransformInputs().entrySet()) { + TupleTag tag = userGraphContext.findTupleTag(entry.getValue()); + if (inputs.containsKey(tag)) { + int copyNum = tagToCopyNum.get(tag); + tagToCopyNum.put(tag, ++copyNum); + } else { + inputs.put(tag, entry.getValue()); + tagToCopyNum.put(tag, 1); } } String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java index 85c958a..02f42bd 100644 --- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java @@ -18,28 +18,21 @@ package org.apache.beam.runners.jstorm.translation; import com.google.common.collect.Lists; -import java.util.Collections; import java.util.List; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; /** * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}. - * @param <K> - * @param <V> */ class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> { // information of transform protected PCollection<KV<K, V>> input; - protected PCollection<KV<K, Iterable<V>>> output; - protected List<TupleTag<?>> inputTags; protected TupleTag<KV<K, Iterable<V>>> mainOutputTag; protected List<TupleTag<?>> sideOutputTags; - protected List<PCollectionView<?>> sideInputs; protected WindowingStrategy<?, ?> windowingStrategy; @Override @@ -49,13 +42,8 @@ class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey< describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); input = (PCollection<KV<K, V>>) userGraphContext.getInput(); - output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput(); - - inputTags = userGraphContext.getInputTags(); mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag(); sideOutputTags = Lists.newArrayList(); - - sideInputs = Collections.<PCollectionView<?>>emptyList(); windowingStrategy = input.getWindowingStrategy(); GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>( 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java index 1c858b7..cae1bc3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableList; -import java.io.Serializable; import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.DoFnRunner; @@ -51,8 +50,6 @@ import org.slf4j.LoggerFactory; /** * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}. 
- * @param <K> - * @param <V> */ class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { @@ -60,14 +57,6 @@ class GroupByWindowExecutor<K, V> private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class); - private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable { - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - private KvCoder<K, V> inputKvCoder; private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; @@ -92,7 +81,6 @@ class GroupByWindowExecutor<K, V> mainTupleTag, sideOutputTags); - this.outputManager = new GroupByWindowOutputManager(); UserGraphContext userGraphContext = context.getUserGraphContext(); PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput(); this.inputKvCoder = (KvCoder<K, V>) input.getCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index 292b771..e2139d8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -180,12 +180,8 @@ class JStormStateInternals<K> implements StateInternals { kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn = - new BinaryCombineFn<Instant>() { - @Override - public Instant apply(Instant left, Instant right) { - return 
timestampCombiner.combine(left, right); - } - }; + new WatermarkCombineFn(timestampCombiner); + return new JStormWatermarkHoldState( id, spec, namespace, new JStormCombiningState<>( @@ -203,6 +199,19 @@ class JStormStateInternals<K> implements StateInternals { }); } + private static class WatermarkCombineFn extends BinaryCombineFn<Instant> { + private final TimestampCombiner timestampCombiner; + + public WatermarkCombineFn(TimestampCombiner timestampCombiner) { + this.timestampCombiner = timestampCombiner; + } + + @Override + public Instant apply(Instant left, Instant right) { + return timestampCombiner.combine(left, right); + } + }; + /** * JStorm implementation of {@link ValueState}. */ @@ -623,7 +632,7 @@ class JStormStateInternals<K> implements StateInternals { @Override public ReadableState<V> get(K var1) { - ReadableState<V> ret = new MapReadableState<>(null); + ReadableState<V> ret = null; try { ret = new MapReadableState(kvStore.get(var1)); } catch (IOException e) { @@ -634,7 +643,7 @@ class JStormStateInternals<K> implements StateInternals { @Override public ReadableState<Iterable<K>> keys() { - ReadableState<Iterable<K>> ret = new MapReadableState<>(null); + ReadableState<Iterable<K>> ret = null; try { ret = new MapReadableState<>(kvStore.keys()); } catch (IOException e) { @@ -645,7 +654,7 @@ class JStormStateInternals<K> implements StateInternals { @Override public ReadableState<Iterable<V>> values() { - ReadableState<Iterable<V>> ret = new MapReadableState<>(null); + ReadableState<Iterable<V>> ret = null; try { ret = new MapReadableState<>(kvStore.values()); } catch (IOException e) { @@ -656,7 +665,7 @@ class JStormStateInternals<K> implements StateInternals { @Override public ReadableState<Iterable<Map.Entry<K, V>>> entries() { - ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null); + ReadableState<Iterable<Map.Entry<K, V>>> ret = null; try { ret = new MapReadableState<>(kvStore.entries()); } catch (IOException e) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index 5b60b03..e7f3285 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -63,10 +63,8 @@ class MetricsReporter { } private void updateCounters(Iterable<MetricResult<Long>> counters) { - System.out.print("updateCounters"); for (MetricResult<Long> metricResult : counters) { String metricName = getMetricNameString(COUNTER_PREFIX, metricResult); - System.out.print("metricName: " + metricName); Long updateValue = metricResult.attempted(); Long oldValue = reportedCounters.get(metricName); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java index 138a5dc..f318a89 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java @@ -32,26 +32,10 @@ import org.slf4j.LoggerFactory; /** * JStorm {@link Executor} for {@link DoFn} with multi-output. 
- * @param <InputT> - * @param <OutputT> */ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> { private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); - /** - * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated - * tag is used in downstream consumer. So before output, we need to map this "local" tag to - * "external" tag. See PCollectionTuple for details. - */ - public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap; - public MultiOutputDoFnExecutor( String stepName, String description, @@ -63,13 +47,9 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags, - Map<TupleTag<?>, TupleTag<?>> localTupleTagMap + List<TupleTag<?>> sideOutputTags ) { super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); - this.localTupleTagMap = localTupleTagMap; - this.outputManager = new MultiOutputDoFnExecutorOutputManager(); - LOG.info("localTupleTagMap: {}", localTupleTagMap); } } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java index a3ffc30..44c0765 
100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; /** * JStorm {@link Executor} for stateful {@link DoFn} with multi-output. - * @param <OutputT> */ class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> { @@ -42,9 +41,9 @@ class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, Out Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) { + List<TupleTag<?>> sideOutputTags) { super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, - sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap); + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java index 7daa1cb..986af43 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.sdk.coders.Coder; @@ -33,7 +32,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; /** @@ -50,12 +48,6 @@ class ParDoBoundMultiTranslator<InputT, OutputT> PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); - Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap(); - for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) { - Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); - localToExternalTupleTagMap.put(entry.getKey(), itr.next()); - } - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags(); sideOutputTags.remove(mainOutputTag); @@ -90,8 +82,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT> transform.getSideInputs(), sideInputTagToView.build(), mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); + sideOutputTags); } else { executor = new MultiOutputDoFnExecutor<>( userGraphContext.getStepName(), @@ -105,8 +96,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT> transform.getSideInputs(), sideInputTagToView.build(), mainOutputTag, - sideOutputTags, - localToExternalTupleTagMap); + sideOutputTags); } context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java deleted file mode 100644 index e6d09c4..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.jstorm.translation; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.List; -import java.util.Map; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}. - */ -class ParDoBoundTranslator<InputT, OutputT> - extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { - - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class); - - @Override - public void translateNode( - ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) { - final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - final TupleTag<?> inputTag = userGraphContext.getInputTag(); - PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput(); - - TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag(); - List<TupleTag<?>> sideOutputTags = Lists.newArrayList(); - - Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs()); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - String description = describeTransform( - transform, - allInputs, - 
userGraphContext.getOutputs()); - - ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder(); - for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); - } - - DoFnExecutor executor; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - executor = new StatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn<KV, OutputT>) transform.getFn(), - (Coder) WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<KV>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } else { - executor = new DoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - transform.getFn(), - WindowedValue.getFullCoder( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()), - input.getWindowingStrategy(), - (TupleTag<InputT>) inputTag, - transform.getSideInputs(), - sideInputTagToView.build(), - mainOutputTag, - sideOutputTags); - } - - context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs())); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java index 911f259..70e2570 100644 --- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; /** * JStorm {@link Executor} for stateful {@link DoFn}. - * @param <OutputT> */ class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> { public StatefulDoFnExecutor( http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java index 24a9050..159fe70 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java @@ -30,7 +30,7 @@ interface TimerService extends Serializable { void init(List<Integer> upStreamTasks); /** - * + * Update watermark when receiving watermark from a upstream task. 
* @param task * @param inputWatermark * @return new watermark if any timer is triggered during the update of watermark, otherwise 0 http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java index 6b463db..027fc14 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java @@ -39,15 +39,15 @@ import org.joda.time.Instant; * Default implementation of {@link TimerService}. */ class TimerServiceImpl implements TimerService { - private transient ExecutorContext executorContext; - private transient Map<Integer, DoFnExecutor> idToDoFnExecutor; + private ExecutorContext executorContext; + private Map<Integer, DoFnExecutor> idToDoFnExecutor; private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>(); private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); - private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = + private final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>(); private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> timerDataToKeyedExecutors = Maps.newHashMap(); @@ -132,7 +132,7 @@ class TimerServiceImpl implements TimerService { if (currentHold == null) { namespaceToWatermarkHold.put(namespace, watermarkHold); watermarkHolds.add(watermarkHold); - } else if 
(currentHold != null && watermarkHold.isBefore(currentHold)) { + } else if (watermarkHold.isBefore(currentHold)) { namespaceToWatermarkHold.put(namespace, watermarkHold); watermarkHolds.add(watermarkHold); watermarkHolds.remove(currentHold); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java index 4d431d3..f0b8f74 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java @@ -38,8 +38,8 @@ interface TransformTranslator<T extends PTransform<?, ?>> { boolean canTranslate(T transform, TranslationContext context); /** - * Default translator. - * @param <T1> + * Default translator does NOT translate anything, but just generate + * the description of PTransform. 
*/ class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { @Override @@ -61,7 +61,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> { .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { @Override public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) { - return taggedPValue.getKey().getId(); + if (taggedPValue != null) { + return taggedPValue.getKey().getId(); + } else { + return null; + } } })), transform.getName(), @@ -69,7 +73,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> { .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() { @Override public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) { - return taggedPvalue.getKey().getId(); + if (taggedPvalue != null) { + return taggedPvalue.getKey().getId(); + } else { + return null; + } } }))); } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 0991448..4407f15 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -198,8 +198,6 @@ public class TranslationContext { } bolt.addExecutor(tag, executor, userGraphContext.getStepName()); - // filter all connections inside bolt - //if (!bolt.getOutputTags().contains(tag)) { Stream.Grouping grouping; if (isGBK) { grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY)); @@ -207,7 +205,6 @@ public class TranslationContext { grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE); } 
addStormStreamDef(TaggedPValue.of(tag, value), name, grouping); - //} } for (PValue sideInput : sideInputs) { @@ -223,7 +220,7 @@ public class TranslationContext { // set parallelismNumber String pTransformfullName = userGraphContext.currentTransform.getFullName(); String compositeName = pTransformfullName.split("/")[0]; - Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap(); + Map parallelismNumMap = userGraphContext.getOptions().getParallelismMap(); if (parallelismNumMap.containsKey(compositeName)) { int configNum = (Integer) parallelismNumMap.get(compositeName); int currNum = bolt.getParallelismNum(); @@ -262,10 +259,21 @@ public class TranslationContext { return (T) currentTransform.getInputs().values().iterator().next(); } - public Map<TupleTag<?>, PValue> getInputs() { + public Map<TupleTag<?>, PValue> getTransformInputs() { return currentTransform.getInputs(); } + /** + * Get input PValues with the output tags of upstream node. + */ + public Map<TupleTag<?>, PValue> getInputs() { + Map<TupleTag<?>, PValue> ret = Maps.newHashMap(); + for (PValue pValue : currentTransform.getInputs().values()) { + ret.put(findTupleTag(pValue), pValue); + } + return ret; + } + public TupleTag<?> getInputTag() { return pValueToTupleTag.get(this.getInput()); } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index 9eaa13a..c8ea545 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -40,7 +40,6 @@ class TranslatorRegistry 
{ static { TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); - TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java index 4ae28e6..627a834 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -61,7 +62,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private KryoSerializer<WindowedValue> serializer; - private long lastWaterMark = 0L; + private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); public UnboundedSourceSpout( String name, @@ -113,7 +114,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou } @Override - public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) { + public synchronized void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.pipelineOptions = @@ -127,7 +128,8 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou } } - public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { + public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) + throws IOException { if (reader != null) { reader.close(); } http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java index 9ab5784..de3f568 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java @@ -256,9 +256,7 @@ class ViewTranslator /** * Specialized expansion for * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}. 
- * @param <InputT> - * @param <OutputT> - */ + */ public static class CombineGloballyAsSingletonView<InputT, OutputT> extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { Combine.GloballyAsSingletonView<InputT, OutputT> transform; http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java index 8d60392..832c95c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java @@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory; /** * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. 
- * @param <T> - * @param <W> */ class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java index b2ca267..3acf662 100644 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java @@ -64,7 +64,7 @@ public class JStormStateInternalsTest { IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( Maps.newHashMap(), "test", - tmp.toString(), + tmp.getRoot().toString(), new KryoSerializer(Maps.newHashMap())); jstormStateInternals = new JStormStateInternals( "key-1", kvStoreManager, new TimerServiceImpl(), 0);
