jstorm-runner: Fix checkstyle error
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5a15d548 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5a15d548 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5a15d548 Branch: refs/heads/jstorm-runner Commit: 5a15d5488f9438695948e72af08ada4c263471d7 Parents: 78a5076 Author: basti.lj <[email protected]> Authored: Fri Jul 14 14:14:49 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:57 2017 +0800 ---------------------------------------------------------------------- .../runners/jstorm/JStormPipelineOptions.java | 3 + .../beam/runners/jstorm/JStormRunner.java | 16 +- .../beam/runners/jstorm/TestJStormRunner.java | 2 +- .../serialization/ImmutableListSerializer.java | 4 +- .../serialization/ImmutableMapSerializer.java | 3 + .../serialization/ImmutableSetSerializer.java | 3 + .../KvStoreIterableSerializer.java | 3 + .../SdkRepackImmuListSerializer.java | 3 + .../SdkRepackImmuSetSerializer.java | 3 + .../UnmodifiableCollectionsSerializer.java | 5 +- .../translation/JStormPipelineTranslator.java | 186 +++++++++++++++++++ .../translation/StormPipelineTranslator.java | 186 ------------------- .../jstorm/translation/TranslationContext.java | 9 +- .../translation/runtime/AbstractComponent.java | 4 +- .../translation/runtime/AdaptorBasicBolt.java | 5 +- .../translation/runtime/AdaptorBasicSpout.java | 5 +- .../translation/runtime/DoFnExecutor.java | 16 +- .../jstorm/translation/runtime/Executor.java | 7 +- .../translation/runtime/ExecutorContext.java | 3 + .../translation/runtime/ExecutorsBolt.java | 15 +- .../translation/runtime/FlattenExecutor.java | 6 +- .../runtime/GroupByWindowExecutor.java | 5 + .../runtime/MultiOutputDoFnExecutor.java | 7 +- .../runtime/MultiStatefulDoFnExecutor.java | 4 + .../runtime/StatefulDoFnExecutor.java | 4 + .../translation/runtime/TimerServiceImpl.java | 8 +- .../translation/runtime/TxExecutorsBolt.java | 5 +- .../runtime/TxUnboundedSourceSpout.java | 5 +- .../runtime/UnboundedSourceSpout.java | 5 +- .../runtime/WindowAssignExecutor.java | 7 +- .../runtime/state/JStormBagState.java | 5 +- .../runtime/state/JStormMapState.java | 7 +- .../translator/FlattenTranslator.java | 6 +- .../translator/GroupByKeyTranslator.java | 5 + .../translator/ParDoBoundMultiTranslator.java | 2 +- .../translator/ParDoBoundTranslator.java | 4 +- .../jstorm/translation/translator/Stream.java | 11 +- .../translator/TransformTranslator.java | 4 + .../translation/translator/ViewTranslator.java | 18 +- .../translator/WindowAssignTranslator.java | 7 +- .../jstorm/translation/util/CommonInstance.java | 5 +- .../beam/runners/jstorm/util/RunnerUtils.java | 12 +- .../jstorm/util/SerializedPipelineOptions.java | 2 +- .../jstorm/util/SingletonKeyedWorkItem.java | 3 +- .../runtime/state/JStormStateInternalsTest.java | 14 +- 45 files changed, 384 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java index 2a87756..114877a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java @@ -64,6 +64,9 @@ public interface JStormPipelineOptions extends PipelineOptions { Map getParallelismNumMap(); void setParallelismNumMap(Map parallelismNumMap); + /** + * Default value factory for topology configuration of JStorm. + */ class DefaultMapValueFactory implements DefaultValueFactory<Map> { @Override public Map create(PipelineOptions pipelineOptions) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 5375d6e..00ec7f6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -78,17 +78,17 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { } /** - * convert pipeline options to storm configuration format - * + * convert pipeline options to storm configuration format. * @param options * @return */ private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) { Config config = new Config(); - if (options.getLocalMode()) + if (options.getLocalMode()) { config.put(Config.STORM_CLUSTER_MODE, "local"); - else + } else { config.put(Config.STORM_CLUSTER_MODE, "distributed"); + } Config.setNumWorkers(config, options.getWorkerNumber()); @@ -161,8 +161,9 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { component = spout; } else { AdaptorBasicBolt bolt = context.getBolt(id); - if (bolt != null) + if (bolt != null) { component = bolt; + } } return component; @@ -202,10 +203,11 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> { // add stream output declare for "from" component AbstractComponent component = getComponent(srcBoltId, context); - if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) + if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) { component.addKVOutputField(streamId); - else + } else { component.addOutputField(streamId); + } // "to" component declares grouping to "from" component switch (grouping.getType()) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index e27efc0..b1b0379 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -2,7 +2,6 @@ package org.apache.beam.runners.jstorm; import static com.google.common.base.Preconditions.checkNotNull; -import avro.shaded.com.google.common.collect.Maps; import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.AsmMetricRegistry; import com.alibaba.jstorm.metric.AsmWindow; @@ -11,6 +10,7 @@ import com.alibaba.jstorm.metric.MetaType; import com.alibaba.jstorm.metric.MetricType; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Iterator; import java.util.Map; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java index c479f26..268774c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java @@ -10,8 +10,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableTable; import com.google.common.collect.Lists; import com.google.common.collect.Table; -import org.apache.beam.runners.jstorm.util.RunnerUtils; +/** + * Specific serializer of {@link Kryo} for ImmutableList. + */ public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> { private static final boolean DOES_NOT_ACCEPT_NULL = false; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java index 77eede3..6b998fc 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java @@ -11,6 +11,9 @@ import java.util.EnumMap; import java.util.HashMap; import java.util.Map; +/** + * Specific serializer of {@link Kryo} for ImmutableMap. + */ public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> { private static final boolean DOES_NOT_ACCEPT_NULL = true; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java index 3a43b2b..edc7b09 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java @@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +/** + * Specific serializer of {@link Kryo} for ImmutableSet. + */ public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> { private static final boolean DOES_NOT_ACCEPT_NULL = false; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java index b47f3b7..3835816 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java @@ -9,6 +9,9 @@ import com.google.common.collect.Lists; import java.util.Iterator; import java.util.List; +/** + * Specific serializer of {@link Kryo} for KvStoreIterable. + */ public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> { public KvStoreIterableSerializer() { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java index dd4272c..f1ed644 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java @@ -12,6 +12,9 @@ import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable; import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists; import org.apache.beam.sdk.repackaged.com.google.common.collect.Table; +/** + * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableList. + */ public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> { private static final boolean DOES_NOT_ACCEPT_NULL = false; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java index 6973c82..d1ed046 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java @@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet; import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets; +/** + * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableSet. + */ public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> { private static final boolean DOES_NOT_ACCEPT_NULL = false; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java index bcee778..33343fc 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java @@ -21,6 +21,9 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +/** + * Specific serializer of {@link Kryo} for Unmodifiable Collection. + */ public class UnmodifiableCollectionsSerializer extends Serializer<Object> { private static final Field SOURCE_COLLECTION_FIELD; @@ -83,7 +86,7 @@ public class UnmodifiableCollectionsSerializer extends Serializer<Object> { } } - private static enum UnmodifiableCollection { + private enum UnmodifiableCollection { COLLECTION( Collections.unmodifiableCollection(Arrays.asList("")).getClass(), SOURCE_COLLECTION_FIELD) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java new file mode 100644 index 0000000..1449a43 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.List; +import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pipleline translator of JStorm. + */ +public class JStormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { + private static final Logger LOG = LoggerFactory.getLogger(JStormPipelineTranslator.class); + private TranslationContext context; + private int depth = 0; + + public JStormPipelineTranslator(TranslationContext context) { + this.context = context; + } + + public void translate(Pipeline pipeline) { + List<PTransformOverride> transformOverrides = + ImmutableList.<PTransformOverride>builder() + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) + .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new ReflectiveOneToOneOverrideFactory( + (ViewTranslator.CombineGloballyAsSingletonView.class)))) + .build(); + pipeline.replaceAll(transformOverrides); + pipeline.traverseTopologically(this); + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); + this.depth++; + + // check if current composite transforms need to be translated. + // If not, all sub transforms will be translated in visitPrimitiveTransform. + PTransform<?, ?> transform = node.getTransform(); + if (transform != null) { + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); + + if (translator != null && applyCanTranslate(transform, node, translator)) { + applyStreamingTransform(transform, node, translator); + LOG.info(genSpaces(this.depth) + "translated-" + node); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + public void leaveCompositeTransform(TransformHierarchy.Node node) { + this.depth--; + LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node); + } + + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node); + + if (!node.isRootNode()) { + PTransform<?, ?> transform = node.getTransform(); + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); + if (translator == null || !applyCanTranslate(transform, node, translator)) { + LOG.info(node.getTransform().getClass().toString()); + throw new UnsupportedOperationException( + "The transform " + transform + " is currently not supported."); + } + applyStreamingTransform(transform, node, translator); + } + } + + public void visitValue(PValue value, TransformHierarchy.Node node) { + LOG.info(genSpaces(this.depth) + "visiting value {}", value); + } + + private <T extends PTransform<?, ?>> void applyStreamingTransform( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + + context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); + typedTranslator.translateNode(typedTransform, context); + + // Maintain PValue to TupleTag map for side inputs translation. + context.getUserGraphContext().recordOutputTaggedPValue(); + } + + private <T extends PTransform<?, ?>> boolean applyCanTranslate( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + + context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); + + return typedTranslator.canTranslate(typedTransform, context); + } + + /** + * Utility formatting method. + * + * @param n number of spaces to generate + * @return String with "|" followed by n spaces + */ + protected static String genSpaces(int n) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < n; i++) { + builder.append("| "); + } + return builder.toString(); + } + + private static class ReflectiveOneToOneOverrideFactory< + InputT extends PValue, + OutputT extends PValue, + TransformT extends PTransform<InputT, OutputT>> + extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { + private final Class<PTransform<InputT, OutputT>> replacement; + + private ReflectiveOneToOneOverrideFactory( + Class<PTransform<InputT, OutputT>> replacement) { + this.replacement = replacement; + } + + @Override + public PTransformReplacement<InputT, OutputT> getReplacementTransform( + AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) { + PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform(); + PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement) + .withArg( + (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), + originalPTransform) + .build(); + InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values()); + return PTransformReplacement.of(inputT, replacedPTransform); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java deleted file mode 100644 index 6d6f1c6..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import java.util.List; -import org.apache.beam.runners.core.construction.PTransformMatchers; -import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; -import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Pipleline translator of Storm - */ -public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { - private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class); - private TranslationContext context; - private int depth = 0; - - public StormPipelineTranslator(TranslationContext context) { - this.context = context; - } - - public void translate(Pipeline pipeline) { - List<PTransformOverride> transformOverrides = - ImmutableList.<PTransformOverride>builder() - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) - .add(PTransformOverride.of( - PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory( - (ViewTranslator.CombineGloballyAsSingletonView.class)))) - .build(); - pipeline.replaceAll(transformOverrides); - pipeline.traverseTopologically(this); - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); - this.depth++; - - // check if current composite transforms need to be translated. - // If not, all sub transforms will be translated in visitPrimitiveTransform. - PTransform<?, ?> transform = node.getTransform(); - if (transform != null) { - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - - if (translator != null && applyCanTranslate(transform, node, translator)) { - applyStreamingTransform(transform, node, translator); - LOG.info(genSpaces(this.depth) + "translated-" + node); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node); - } - - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node); - - if (!node.isRootNode()) { - PTransform<?, ?> transform = node.getTransform(); - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - if (translator == null || !applyCanTranslate(transform, node, translator)) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException( - "The transform " + transform + " is currently not supported."); - } - applyStreamingTransform(transform, node, translator); - } - } - - public void visitValue(PValue value, TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visiting value {}", value); - } - - private <T extends PTransform<?, ?>> void applyStreamingTransform( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, context); - - // Maintain PValue to TupleTag map for side inputs translation. - context.getUserGraphContext().recordOutputTaggedPValue(); - } - - private <T extends PTransform<?, ?>> boolean applyCanTranslate( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - - return typedTranslator.canTranslate(typedTransform, context); - } - - /** - * Utility formatting method. - * - * @param n number of spaces to generate - * @return String with "|" followed by n spaces - */ - protected static String genSpaces(int n) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < n; i++) { - builder.append("| "); - } - return builder.toString(); - } - - private static class ReflectiveOneToOneOverrideFactory< - InputT extends PValue, - OutputT extends PValue, - TransformT extends PTransform<InputT, OutputT>> - extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { - private final Class<PTransform<InputT, OutputT>> replacement; - - private ReflectiveOneToOneOverrideFactory( - Class<PTransform<InputT, OutputT>> replacement) { - this.replacement = replacement; - } - - @Override - public PTransformReplacement<InputT, OutputT> getReplacementTransform( - AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) { - PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform(); - PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement) - .withArg( - (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), - originalPTransform) - .build(); - InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values()); - return PTransformReplacement.of(inputT, replacedPTransform); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 526352a..1230a31 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -20,11 +20,11 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import avro.shaded.com.google.common.collect.Lists; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.ArrayList; @@ -239,7 +239,9 @@ public class TranslationContext { } } - // TODO: add getSideInputs() and getSideOutputs(). + /** + * Context of user graph. + */ public static class UserGraphContext { private final JStormPipelineOptions options; private final Map<PValue, TupleTag> pValueToTupleTag; @@ -326,6 +328,9 @@ public class TranslationContext { } } + /** + * Context of execution graph. + */ public static class ExecutionGraphContext { private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java index 68e9e17..3d7fab8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -/* +/** * Enable user to add output stream definitions by API, rather than hard-code. */ public abstract class AbstractComponent implements IComponent { @@ -65,4 +65,4 @@ public abstract class AbstractComponent implements IComponent { public void setParallelismNum(int num) { parallelismNum = num; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java index 5e9b056..d8d4d46 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java @@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime; import backtype.storm.topology.IRichBatchBolt; +/** + * Adaptor bolt of JStorm extends {@link AbstractComponent}. + */ public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt { -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java index 0480518..814d416 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java @@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime; import backtype.storm.topology.IRichSpout; +/** + * Adaptor bolt of JStorm extends {@link AbstractComponent}. + */ public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout { -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java index 9507948..e07d890 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java @@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import avro.shaded.com.google.common.collect.Iterables; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.metric.MetricClient; +import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -62,11 +62,19 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * JStorm {@link Executor} for {@link DoFn}. + * @param <InputT> input type + * @param <OutputT> output type + */ public class DoFnExecutor<InputT, OutputT> implements Executor { private static final long serialVersionUID = 5297603063991078668L; private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class); + /** + * Implements {@link OutputManager} in a DoFn executor. + */ public class DoFnExecutorOutputManager implements OutputManager, Serializable { private static final long serialVersionUID = -661113364735206170L; @@ -174,7 +182,7 @@ public class DoFnExecutor<InputT, OutputT> implements Executor { initService(context); // Side inputs setup - if (sideInputs != null && sideInputs.isEmpty() == false) { + if (sideInputs != null && !sideInputs.isEmpty()) { pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); watermarkHoldTag = StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST); @@ -261,10 +269,10 @@ public class DoFnExecutor<InputT, OutputT> implements Executor { } /** - * Process all pushed back elements when receiving watermark with max timestamp + * Process all pushed back elements when receiving watermark with max timestamp. */ public void processAllPushBackElements() { - if (sideInputs != null && sideInputs.isEmpty() == false) { + if (sideInputs != null && !sideInputs.isEmpty()) { BagState<WindowedValue<InputT>> pushedBackElements = pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); if (pushedBackElements != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java index 1a03cb8..0ec4fdd 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java @@ -21,13 +21,16 @@ import java.io.Serializable; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; +/** + * An executor is a basic executable unit in a JStorm task. + */ public interface Executor extends Serializable { /** - * Initialization during runtime + * Initialization during runtime. */ void init(ExecutorContext context); <T> void process(TupleTag<T> tag, WindowedValue<T> elem); void cleanup(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java index 1f65921..55ca171 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java @@ -21,6 +21,9 @@ import backtype.storm.task.TopologyContext; import com.alibaba.jstorm.cache.IKvStoreManager; import com.google.auto.value.AutoValue; +/** + * Context of a executors bolt when runtime. + */ @AutoValue public abstract class ExecutorContext { public static ExecutorContext of( http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java index e80fb48..d33c17a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -19,8 +19,6 @@ package org.apache.beam.runners.jstorm.translation.runtime; import static com.google.common.base.Preconditions.checkNotNull; -import avro.shaded.com.google.common.base.Joiner; -import avro.shaded.com.google.common.collect.Sets; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.ITupleExt; @@ -31,8 +29,10 @@ import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.utils.KryoSerializer; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.collect.FluentIterable; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -49,6 +49,9 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG. + */ public class ExecutorsBolt extends AdaptorBasicBolt { private static final long serialVersionUID = -7751043327801735211L; @@ -129,10 +132,10 @@ public class ExecutorsBolt extends AdaptorBasicBolt { // init kv store manager String storeName = String.format("task-%d", context.getThisTaskId()); String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - IKvStoreManager kvStoreManager = isStatefulBolt ? - KvStoreManagerFactory.getKvStoreManagerWithMonitor( - context, storeName, stateStorePath, isStatefulBolt) : - KvStoreManagerFactory.getKvStoreManager( + IKvStoreManager kvStoreManager = isStatefulBolt + ? KvStoreManagerFactory.getKvStoreManagerWithMonitor( + context, storeName, stateStorePath, isStatefulBolt) + : KvStoreManagerFactory.getKvStoreManager( stormConf, storeName, stateStorePath, isStatefulBolt); this.executorContext = ExecutorContext.of(context, this, kvStoreManager); http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java index 5a07243..caf1e47 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java @@ -22,6 +22,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; +/** + * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}. + * @param <InputT> + */ public class FlattenExecutor<InputT> implements Executor { private final String description; @@ -53,4 +57,4 @@ public class FlattenExecutor<InputT> implements Executor { public String toString() { return description; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java index 625726d..0dd1af9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java @@ -52,6 +52,11 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}. + * @param <K> + * @param <V> + */ public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { private static final long serialVersionUID = -7563050475488610553L; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java index d36d9a6..a26472c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java @@ -30,6 +30,11 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * JStorm {@link Executor} for {@link DoFn} with multi-output. + * @param <InputT> + * @param <OutputT> + */ public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> { private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); @@ -71,4 +76,4 @@ public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<Input this.outputManager = new MultiOutputDoFnExecutorOutputManager(); LOG.info("localTupleTagMap: {}", localTupleTagMap); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java index 45ac62a..5e87cff 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java @@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +/** + * JStorm {@link Executor} for stateful {@link DoFn} with multi-output. + * @param <OutputT> + */ public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> { public MultiStatefulDoFnExecutor( http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java index ba0c052..77ae844 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java @@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +/** + * JStorm {@link Executor} for stateful {@link DoFn}. + * @param <OutputT> + */ public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> { public StatefulDoFnExecutor( String stepName, String description, JStormPipelineOptions pipelineOptions, http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java index d2514f1..0103095 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java @@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import avro.shaded.com.google.common.collect.Maps; -import avro.shaded.com.google.common.collect.Sets; import com.alibaba.jstorm.utils.Pair; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +47,7 @@ public class TimerServiceImpl implements TimerService { private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); - private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = + private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>(); private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> timerDataToKeyedExecutors = Maps.newHashMap(); @@ -152,4 +152,4 @@ public class TimerServiceImpl implements TimerService { keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); timerDataToKeyedExecutors.put(timerData, keyedExecutors); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java index 2bd5f7d..8dc51b5 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java @@ -29,6 +29,9 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Transactional executors bolt handles the checkpoint and restore of state and timer. + */ public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor { private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class); @@ -127,4 +130,4 @@ public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor { throw new RuntimeException(e.getMessage()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java index 16f7d99..48b410f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java @@ -29,6 +29,9 @@ import java.util.Map; import org.apache.beam.sdk.io.UnboundedSource; import org.slf4j.LoggerFactory; +/** + * Transactional unbounded source spout handles the checkpoint and restore of state and timer. + */ public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class); @@ -150,4 +153,4 @@ public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor { public void fail(Object msgId) { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java index 7f98c61..006cd47 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java @@ -40,8 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Spout implementation that wraps a Beam UnboundedSource - * <p> + * Spout implementation that wraps a Beam UnboundedSource. * TODO: add wrapper to support metrics in UnboundedSource. */ public class UnboundedSourceSpout extends AdaptorBasicSpout { @@ -61,7 +60,7 @@ public class UnboundedSourceSpout extends AdaptorBasicSpout { private KryoSerializer<WindowedValue> serializer; - private long lastWaterMark = 0l; + private long lastWaterMark = 0L; public UnboundedSourceSpout( String description, http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java index 7f21d26..3cd0aa9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java @@ -29,6 +29,11 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. + * @param <T> + * @param <W> + */ public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); @@ -104,4 +109,4 @@ public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executo public String toString() { return description; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java index 1466f35..df54383 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * JStorm implementation of {@link BagState}. + * Implementation of {@link BagState} in JStorm runner. */ class JStormBagState<K, T> implements BagState<T> { private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); @@ -115,6 +115,9 @@ class JStormBagState<K, T> implements BagState<T> { return ComposedKey.of(key, namespace, elemIndex); } + /** + * Implementation of Bag state Iterable. + */ private class BagStateIterable implements KvStoreIterable<T> { private class BagStateIterator implements Iterator<T> { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java index f1c1ed0..ac3f91f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java @@ -26,6 +26,11 @@ import org.apache.beam.sdk.state.ReadableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Implementation of {@link MapState} in JStorm runner. + * @param <K> + * @param <V> + */ public class JStormMapState<K, V> implements MapState<K, V> { private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); @@ -150,4 +155,4 @@ public class JStormMapState<K, V> implements MapState<K, V> { return this; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java index bf8d472..44ce8d8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java @@ -26,6 +26,10 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +/** + * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}. + * @param <V> + */ public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> { @Override @@ -44,4 +48,4 @@ public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PC FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java index 85f96ce..85cb85d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java @@ -29,6 +29,11 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +/** + * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}. + * @param <K> + * @param <V> + */ public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> { // information of transform protected PCollection<KV<K, V>> input; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java index 77e4381..6e3392c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.jstorm.translation.translator; -import avro.shaded.com.google.common.collect.Maps; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.util.Iterator; import java.util.List; import java.util.Map; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java index 7b998d9..ad8f85f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.jstorm.translation.translator; -import avro.shaded.com.google.common.collect.Lists; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; @@ -40,7 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}. + * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}. */ public class ParDoBoundTranslator<InputT, OutputT> extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java index a15a8ba..71243b9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java @@ -39,6 +39,9 @@ public abstract class Stream { producer, consumer); } + /** + * JStorm producer. + */ @AutoValue public abstract static class Producer { public abstract String getComponentId(); @@ -53,6 +56,9 @@ public abstract class Stream { } } + /** + * JStorm consumer. + */ @AutoValue public abstract static class Consumer { public abstract String getComponentId(); @@ -65,6 +71,9 @@ public abstract class Stream { } } + /** + * JStorm grouping, which define how to transfer message between two nodes. + */ @AutoValue public abstract static class Grouping { public abstract Type getType(); @@ -86,7 +95,7 @@ public abstract class Stream { } /** - * Types of stream groupings Storm allows + * Types of stream groupings Storm allows. */ public enum Type { ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java index 487cac0..bfa94a0 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java @@ -38,6 +38,10 @@ public interface TransformTranslator<T extends PTransform<?, ?>> { */ boolean canTranslate(T transform, TranslationContext context); + /** + * Default translator. + * @param <T1> + */ class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { @Override public void translateNode(T1 transform, TranslationContext context) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java index c55c8d6..f71ee9c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java @@ -54,8 +54,7 @@ public class ViewTranslator /** * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Flink runner in streaming mode. + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}. */ public static class ViewAsMap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { @@ -93,8 +92,7 @@ public class ViewTranslator /** * Specialized expansion for {@link - * View.AsMultimap View.AsMultimap} for the - * Flink runner in streaming mode. + * View.AsMultimap View.AsMultimap}. */ public static class ViewAsMultimap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { @@ -135,8 +133,7 @@ public class ViewTranslator /** * Specialized implementation for - * {@link View.AsList View.AsList} for the - * JStorm runner in streaming mode. + * {@link View.AsList View.AsList}. */ public static class ViewAsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> { @@ -258,6 +255,12 @@ public class ViewTranslator } } + /** + * Specialized expansion for + * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}. + * @param <InputT> + * @param <OutputT> + */ public static class CombineGloballyAsSingletonView<InputT, OutputT> extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { Combine.GloballyAsSingletonView<InputT, OutputT> transform; @@ -351,8 +354,7 @@ public class ViewTranslator /** * Creates a primitive {@link PCollectionView}. - * <p> - * <p>For internal use only by runner implementors. + * For internal use only by runner implementors. * * @param <ElemT> The type of the elements of the input PCollection * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java index 6de34dd..2ccb8d7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java @@ -21,6 +21,11 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor; import org.apache.beam.sdk.transforms.windowing.Window; +/** + * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a + * JStorm {@link WindowAssignExecutor}. + * @param <T> + */ public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { @Override @@ -35,4 +40,4 @@ public class WindowAssignTranslator<T> extends TransformTranslator.Default<Windo userGraphContext.getOutputTag()); context.addTransformExecutor(executor); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java index 596d8b4..4b92a4c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java @@ -17,9 +17,12 @@ */ package org.apache.beam.runners.jstorm.translation.util; +/** + * Common definition of JStorm runner. + */ public class CommonInstance { public static final String KEY = "Key"; public static final String VALUE = "Value"; public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK"; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java index 9fd62e4..ad83c2b 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java @@ -25,10 +25,12 @@ import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +/** + * Utils for JStorm runner. + */ public class RunnerUtils { /** - * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>> - * + * Convert {@link WindowedValue} into {@link KeyedWorkItem}. * @param elem * @return */ @@ -43,11 +45,11 @@ public class RunnerUtils { public static boolean isGroupByKeyExecutor(Executor executor) { if (executor instanceof GroupByWindowExecutor) { return true; - } else if (executor instanceof StatefulDoFnExecutor || - executor instanceof MultiStatefulDoFnExecutor) { + } else if (executor instanceof StatefulDoFnExecutor + || executor instanceof MultiStatefulDoFnExecutor) { return true; } else { return false; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java index 182794f..479afdc 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java @@ -34,7 +34,7 @@ public class SerializedPipelineOptions implements Serializable { private final byte[] serializedOptions; /** - * Lazily initialized copy of deserialized options + * Lazily initialized copy of deserialized options. */ private transient PipelineOptions pipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java index cce21b3..46a12b9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.util.WindowedValue; /** * Singleton keyed word item. - * * @param <K> * @param <ElemT> */ @@ -60,4 +59,4 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> public Iterable<WindowedValue<ElemT>> elementsIterable() { return Collections.singletonList(value); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java index 2a8160c..66f33a7 100644 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java @@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import avro.shaded.com.google.common.collect.Maps; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; import com.alibaba.jstorm.utils.KryoSerializer; +import com.google.common.collect.Maps; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.StateNamespaces; @@ -175,11 +175,11 @@ public class JStormStateInternalsTest { Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read(); Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator(); Map.Entry<Integer, Integer> entry = itr.next(); - assertEquals((long) entry.getKey(), 1l); - assertEquals((long) entry.getValue(), 12l); + assertEquals((long) entry.getKey(), 1L); + assertEquals((long) entry.getValue(), 12L); entry = itr.next(); - assertEquals((long) entry.getKey(), 2l); - assertEquals((long) entry.getValue(), 22l); + assertEquals((long) entry.getKey(), 2L); + assertEquals((long) entry.getValue(), 22L); assertEquals(false, itr.hasNext()); mapStateA.remove(1); @@ -191,8 +191,8 @@ public class JStormStateInternalsTest { entries = mapStateA.entries().read(); itr = entries.iterator(); entry = itr.next(); - assertEquals((long) entry.getKey(), 2l); - assertEquals((long) entry.getValue(), 22l); + assertEquals((long) entry.getKey(), 2L); + assertEquals((long) entry.getValue(), 22L); assertEquals(false, itr.hasNext()); }
