jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa654b3f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa654b3f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa654b3f Branch: refs/heads/jstorm-runner Commit: aa654b3f15a242221727d021cf4be676c49bd49b Parents: 6ff07fc Author: Pei He <[email protected]> Authored: Thu Jul 13 17:02:21 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 02:40:26 2017 +0800 ---------------------------------------------------------------------- .../jstorm/beam/StormPipelineOptions.java | 72 --- .../com/alibaba/jstorm/beam/StormRegistrar.java | 48 -- .../com/alibaba/jstorm/beam/StormRunner.java | 339 ---------- .../alibaba/jstorm/beam/TestJStormRunner.java | 122 ---- .../serialization/ImmutableListSerializer.java | 92 --- .../serialization/ImmutableMapSerializer.java | 62 -- .../serialization/ImmutableSetSerializer.java | 72 --- .../KvStoreIterableSerializer.java | 55 -- .../SdkRepackImmuListSerializer.java | 78 --- .../SdkRepackImmuSetSerializer.java | 72 --- .../UnmodifiableCollectionsSerializer.java | 159 ----- .../translation/StormPipelineTranslator.java | 181 ------ .../beam/translation/TranslationContext.java | 425 ------------- .../beam/translation/TranslatorRegistry.java | 76 --- .../translation/runtime/AbstractComponent.java | 71 --- .../translation/runtime/AdaptorBasicBolt.java | 24 - .../translation/runtime/AdaptorBasicSpout.java | 24 - .../beam/translation/runtime/DoFnExecutor.java | 330 ---------- .../runtime/DoFnRunnerWithMetrics.java | 90 --- .../beam/translation/runtime/Executor.java | 37 -- .../translation/runtime/ExecutorContext.java | 35 -- .../beam/translation/runtime/ExecutorsBolt.java | 332 ---------- .../translation/runtime/FlattenExecutor.java | 55 -- .../runtime/GroupByWindowExecutor.java | 160 ----- .../translation/runtime/MetricsReporter.java | 93 --- .../runtime/MultiOutputDoFnExecutor.java | 75 --- .../runtime/MultiStatefulDoFnExecutor.java | 68 -- .../runtime/StatefulDoFnExecutor.java | 67 -- .../beam/translation/runtime/TimerService.java | 52 -- .../translation/runtime/TimerServiceImpl.java | 150 ----- .../translation/runtime/TxExecutorsBolt.java | 131 ---- .../runtime/TxUnboundedSourceSpout.java | 153 ----- .../runtime/UnboundedSourceSpout.java | 198 ------ .../beam/translation/runtime/ViewExecutor.java | 55 -- .../runtime/WindowAssignExecutor.java | 108 ---- .../runtime/state/JStormBagState.java | 178 ------ .../runtime/state/JStormCombiningState.java | 88 --- .../runtime/state/JStormMapState.java | 155 ----- .../runtime/state/JStormStateInternals.java | 192 ------ .../runtime/state/JStormValueState.java | 84 --- .../runtime/state/JStormWatermarkHoldState.java | 83 --- .../runtime/timer/JStormTimerInternals.java | 99 --- .../translator/BoundedSourceTranslator.java | 50 -- .../translator/CombineGloballyTranslator.java | 24 - .../translator/CombinePerKeyTranslator.java | 24 - .../translator/FlattenTranslator.java | 49 -- .../translator/GroupByKeyTranslator.java | 69 -- .../translator/ParDoBoundMultiTranslator.java | 114 ---- .../translator/ParDoBoundTranslator.java | 106 ---- .../translator/ReshuffleTranslator.java | 24 - .../beam/translation/translator/Stream.java | 91 --- .../translator/TransformTranslator.java | 77 --- .../translator/UnboundedSourceTranslator.java | 46 -- .../translation/translator/ViewTranslator.java | 374 ----------- .../translator/WindowAssignTranslator.java | 38 -- .../translator/WindowBoundTranslator.java | 48 -- .../beam/translation/util/CommonInstance.java | 25 - .../util/DefaultSideInputReader.java | 46 -- .../translation/util/DefaultStepContext.java | 89 --- .../alibaba/jstorm/beam/util/RunnerUtils.java | 53 -- .../beam/util/SerializedPipelineOptions.java | 64 -- .../beam/util/SingletonKeyedWorkItem.java | 62 -- .../runners/jstorm/StormPipelineOptions.java | 72 +++ .../beam/runners/jstorm/StormRegistrar.java | 48 ++ .../apache/beam/runners/jstorm/StormRunner.java | 345 ++++++++++ .../beam/runners/jstorm/TestJStormRunner.java | 120 ++++ .../serialization/ImmutableListSerializer.java | 92 +++ .../serialization/ImmutableMapSerializer.java | 61 ++ .../serialization/ImmutableSetSerializer.java | 71 +++ .../KvStoreIterableSerializer.java | 55 ++ .../SdkRepackImmuListSerializer.java | 78 +++ .../SdkRepackImmuSetSerializer.java | 71 +++ .../UnmodifiableCollectionsSerializer.java | 159 +++++ .../translation/StormPipelineTranslator.java | 177 ++++++ .../jstorm/translation/TranslationContext.java | 424 +++++++++++++ .../jstorm/translation/TranslatorRegistry.java | 85 +++ .../translation/runtime/AbstractComponent.java | 70 +++ .../translation/runtime/AdaptorBasicBolt.java | 24 + .../translation/runtime/AdaptorBasicSpout.java | 24 + .../translation/runtime/DoFnExecutor.java | 328 ++++++++++ .../runtime/DoFnRunnerWithMetrics.java | 90 +++ .../jstorm/translation/runtime/Executor.java | 34 + .../translation/runtime/ExecutorContext.java | 35 ++ .../translation/runtime/ExecutorsBolt.java | 327 ++++++++++ .../translation/runtime/FlattenExecutor.java | 55 ++ .../runtime/GroupByWindowExecutor.java | 157 +++++ .../translation/runtime/MetricsReporter.java | 87 +++ .../runtime/MultiOutputDoFnExecutor.java | 75 +++ .../runtime/MultiStatefulDoFnExecutor.java | 68 ++ .../runtime/StatefulDoFnExecutor.java | 67 ++ .../translation/runtime/TimerService.java | 52 ++ .../translation/runtime/TimerServiceImpl.java | 150 +++++ .../translation/runtime/TxExecutorsBolt.java | 131 ++++ .../runtime/TxUnboundedSourceSpout.java | 153 +++++ .../runtime/UnboundedSourceSpout.java | 193 ++++++ .../translation/runtime/ViewExecutor.java | 55 ++ .../runtime/WindowAssignExecutor.java | 107 ++++ .../runtime/state/JStormBagState.java | 178 ++++++ .../runtime/state/JStormCombiningState.java | 88 +++ .../runtime/state/JStormMapState.java | 154 +++++ .../runtime/state/JStormStateInternals.java | 191 ++++++ .../runtime/state/JStormValueState.java | 84 +++ .../runtime/state/JStormWatermarkHoldState.java | 83 +++ .../runtime/timer/JStormTimerInternals.java | 99 +++ .../translator/BoundedSourceTranslator.java | 50 ++ .../translator/CombineGloballyTranslator.java | 24 + .../translator/CombinePerKeyTranslator.java | 24 + .../translator/FlattenTranslator.java | 49 ++ .../translator/GroupByKeyTranslator.java | 69 ++ .../translator/ParDoBoundMultiTranslator.java | 111 ++++ .../translator/ParDoBoundTranslator.java | 106 ++++ .../translator/ReshuffleTranslator.java | 24 + .../jstorm/translation/translator/Stream.java | 91 +++ .../translator/TransformTranslator.java | 76 +++ .../translator/UnboundedSourceTranslator.java | 46 ++ .../translation/translator/ViewTranslator.java | 374 +++++++++++ .../translator/WindowAssignTranslator.java | 38 ++ .../translator/WindowBoundTranslator.java | 47 ++ .../jstorm/translation/util/CommonInstance.java | 25 + .../util/DefaultSideInputReader.java | 46 ++ .../translation/util/DefaultStepContext.java | 89 +++ .../beam/runners/jstorm/util/RunnerUtils.java | 53 ++ .../jstorm/util/SerializedPipelineOptions.java | 64 ++ .../jstorm/util/SingletonKeyedWorkItem.java | 62 ++ .../runtime/state/JStormStateInternalsTest.java | 219 ------- .../translator/CoGroupByKeyTest.java | 302 --------- .../translation/translator/GroupByKeyTest.java | 159 ----- .../beam/translation/translator/ParDoTest.java | 626 ------------------ .../runtime/state/JStormStateInternalsTest.java | 219 +++++++ .../translator/CoGroupByKeyTest.java | 302 +++++++++ .../translation/translator/GroupByKeyTest.java | 158 +++++ .../translation/translator/ParDoTest.java | 627 +++++++++++++++++++ 132 files changed, 7791 insertions(+), 7819 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormPipelineOptions.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormPipelineOptions.java deleted file mode 100644 index d1538ce..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormPipelineOptions.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam; - -import avro.shaded.com.google.common.collect.Maps; -import org.apache.beam.sdk.options.*; - -import java.util.HashMap; -import java.util.Map; - -/** - * Options which can be used to configure a JStorm PipelineRunner. - */ -public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { - - @Description("Indicate if the topology is running on local machine or distributed cluster") - @Default.Boolean(false) - Boolean getLocalMode(); - void setLocalMode(Boolean isLocal); - - @Description("Executing time(sec) of topology on local mode. Default is 1min.") - @Default.Long(60) - Long getLocalModeExecuteTime(); - void setLocalModeExecuteTime(Long time); - - @Description("Worker number of topology") - @Default.Integer(1) - Integer getWorkerNumber(); - void setWorkerNumber(Integer number); - - @Description("Global parallelism number of a component") - @Default.Integer(1) - Integer getParallelismNumber(); - void setParallelismNumber(Integer number); - - @Description("System topology config of JStorm") - @Default.InstanceFactory(DefaultMapValueFactory.class) - Map getTopologyConfig(); - void setTopologyConfig(Map conf); - - @Description("Indicate if it is an exactly once topology") - @Default.Boolean(false) - Boolean getExactlyOnceTopology(); - void setExactlyOnceTopology(Boolean isExactlyOnce); - - @Description("Parallelism number of a specified composite PTransform") - @Default.InstanceFactory(DefaultMapValueFactory.class) - Map getParallelismNumMap(); - void setParallelismNumMap(Map parallelismNumMap); - - class DefaultMapValueFactory implements DefaultValueFactory<Map> { - @Override - public Map create(PipelineOptions pipelineOptions) { - return Maps.newHashMap(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRegistrar.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRegistrar.java deleted file mode 100644 index 4ef4d01..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRegistrar.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -public class StormRegistrar { - private StormRegistrar() { - } - - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>> of(StormRunner.class); - } - } - - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>> of(StormPipelineOptions.class); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRunner.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRunner.java deleted file mode 100644 index e5db461..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/StormRunner.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam; - -import static com.google.common.base.Preconditions.checkNotNull; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; - -import com.alibaba.jstorm.beam.serialization.*; -import com.alibaba.jstorm.beam.translation.StormPipelineTranslator; -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.AbstractComponent; -import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicBolt; -import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicSpout; -import com.alibaba.jstorm.beam.translation.runtime.ExecutorsBolt; -import com.alibaba.jstorm.beam.translation.runtime.TxExecutorsBolt; -import com.alibaba.jstorm.beam.translation.runtime.TxUnboundedSourceSpout; -import com.alibaba.jstorm.beam.translation.runtime.UnboundedSourceSpout; -import com.alibaba.jstorm.beam.translation.translator.Stream; -import com.alibaba.jstorm.beam.translation.util.CommonInstance; -import com.alibaba.jstorm.cache.KvStoreIterable; -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; -import com.alibaba.jstorm.utils.JStormUtils; - -import java.io.IOException; -import java.util.HashMap; - -import java.util.Map; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.joda.time.Duration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Main entry point into the Storm Runner. - * - * After reading the user defined pipeline, Beam will invoke the run() method with a representation of the pipeline. - */ -public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> { - private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class); - - private StormPipelineOptions options; - - public StormRunner(StormPipelineOptions options) { - this.options = options; - } - - public static StormRunner fromOptions(PipelineOptions options) { - StormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(StormPipelineOptions.class, options); - return new StormRunner(pipelineOptions); - } - - /** - * convert pipeline options to storm configuration format - * @param options - * @return - */ - private Config convertPipelineOptionsToConfig(StormPipelineOptions options) { - Config config = new Config(); - if (options.getLocalMode()) - config.put(Config.STORM_CLUSTER_MODE, "local"); - else - config.put(Config.STORM_CLUSTER_MODE, "distributed"); - - Config.setNumWorkers(config, options.getWorkerNumber()); - - config.putAll(options.getTopologyConfig()); - - // Setup config for runtime env - config.put("worker.external", "beam"); - config.put("topology.acker.executors", 0); - - UnmodifiableCollectionsSerializer.registerSerializers(config); - // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap - ImmutableListSerializer.registerSerializers(config); - SdkRepackImmuListSerializer.registerSerializers(config); - ImmutableSetSerializer.registerSerializers(config); - SdkRepackImmuSetSerializer.registerSerializers(config); - ImmutableMapSerializer.registerSerializers(config); - - config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); - return config; - } - - @Override - public StormPipelineResult run(Pipeline pipeline) { - LOG.info("Running pipeline..."); - TranslationContext context = new TranslationContext(this.options); - StormPipelineTranslator transformer = new StormPipelineTranslator(context); - transformer.translate(pipeline); - LOG.info("UserGraphContext=\n{}", context.getUserGraphContext()); - LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext()); - - for (Stream stream : context.getExecutionGraphContext().getStreams()) { - LOG.info(stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId()); - } - - String topologyName = options.getJobName(); - Config config = convertPipelineOptionsToConfig(options); - - return runTopology( - topologyName, - getTopology(options, context.getExecutionGraphContext()), - config); - } - - private StormPipelineResult runTopology(String topologyName, StormTopology topology, Config config) { - try { - if (StormConfig.local_mode(config)) { - LocalCluster localCluster = LocalCluster.getInstance(); - localCluster.submitTopology(topologyName, config, topology); - return new LocalStormPipelineResult( - topologyName, config, localCluster, options.getLocalModeExecuteTime()); - } else { - StormSubmitter.submitTopology(topologyName, config, topology); - return null; - } - } catch (Exception e) { - LOG.warn("Fail to submit topology", e); - throw new RuntimeException("Fail to submit topology", e); - } - } - - public static abstract class StormPipelineResult implements PipelineResult { - - private final String topologyName; - private final Config config; - - StormPipelineResult(String topologyName, Config config) { - this.config = checkNotNull(config, "config"); - this.topologyName = checkNotNull(topologyName, "topologyName"); - } - - public State getState() { - return null; - } - - public Config getConfig() { - return config; - } - - public String getTopologyName() { - return topologyName; - } - } - - public static class LocalStormPipelineResult extends StormPipelineResult { - - private LocalCluster localCluster; - private long localModeExecuteTimeSecs; - - LocalStormPipelineResult( - String topologyName, - Config config, - LocalCluster localCluster, - long localModeExecuteTimeSecs) { - super(topologyName, config); - this.localCluster = checkNotNull(localCluster, "localCluster"); - } - - @Override - public State cancel() throws IOException { - //localCluster.deactivate(getTopologyName()); - localCluster.killTopology(getTopologyName()); - localCluster.shutdown(); - JStormUtils.sleepMs(1000); - return State.CANCELLED; - } - - @Override - public State waitUntilFinish(Duration duration) { - return waitUntilFinish(); - } - - @Override - public State waitUntilFinish() { - JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000); - try { - return cancel(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public MetricResults metrics() { - return null; - } - } - - private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) { - AbstractComponent component = null; - AdaptorBasicSpout spout = context.getSpout(id); - if (spout != null) { - component = spout; - } else { - AdaptorBasicBolt bolt = context.getBolt(id); - if (bolt != null) - component = bolt; - } - - return component; - } - - private StormTopology getTopology(StormPipelineOptions options, TranslationContext.ExecutionGraphContext context) { - boolean isExactlyOnce = options.getExactlyOnceTopology(); - TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); - - int parallelismNumber = options.getParallelismNumber(); - Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); - for (String id : spouts.keySet()) { - IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); - builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); - } - - HashMap<String, BoltDeclarer> declarers = new HashMap<>(); - Iterable<Stream> streams = context.getStreams(); - LOG.info("streams=" + streams); - for (Stream stream : streams) { - String destBoltId = stream.getConsumer().getComponentId(); - IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId)); - BoltDeclarer declarer = declarers.get(destBoltId); - if (declarer == null) { - declarer = builder.setBolt(destBoltId, bolt, - getParallelismNum(context.getBolt(destBoltId), parallelismNumber)); - declarers.put(destBoltId, declarer); - } - - Stream.Grouping grouping = stream.getConsumer().getGrouping(); - String streamId = stream.getProducer().getStreamId(); - String srcBoltId = stream.getProducer().getComponentId(); - - // add stream output declare for "from" component - AbstractComponent component = getComponent(srcBoltId, context); - if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) - component.addKVOutputField(streamId); - else - component.addOutputField(streamId); - - // "to" component declares grouping to "from" component - switch (grouping.getType()) { - case SHUFFLE: - declarer.shuffleGrouping(srcBoltId, streamId); - break; - case FIELDS: - declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields())); - break; - case ALL: - declarer.allGrouping(srcBoltId, streamId); - break; - case DIRECT: - declarer.directGrouping(srcBoltId, streamId); - break; - case GLOBAL: - declarer.globalGrouping(srcBoltId, streamId); - break; - case LOCAL_OR_SHUFFLE: - declarer.localOrShuffleGrouping(srcBoltId, streamId); - break; - case NONE: - declarer.noneGrouping(srcBoltId, streamId); - break; - default: - throw new UnsupportedOperationException("unsupported grouping type: " + grouping); - } - - // Subscribe grouping of water mark stream - component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID); - declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID); - } - - if (isExactlyOnce) { - ((TransactionTopologyBuilder) builder).enableHdfs(); - } - return builder.createTopology(); - } - - private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) { - IRichSpout ret = null; - if (isExactlyOnce) { - if (spout instanceof UnboundedSourceSpout) { - ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout); - } else { - String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString()); - throw new RuntimeException(error); - } - } else { - ret = spout; - } - return ret; - } - - private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) { - return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt; - } - - /** - * Calculate the final parallelism number according to the configured number and global number. - * @param component - * @param globalParallelismNum - * @return final parallelism number for the specified component - */ - private int getParallelismNum(AbstractComponent component, int globalParallelismNum) { - int configParallelismNum = component.getParallelismNum(); - return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/TestJStormRunner.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/TestJStormRunner.java deleted file mode 100644 index cea79a1..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/TestJStormRunner.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.alibaba.jstorm.beam; - -import avro.shaded.com.google.common.collect.Maps; -import com.alibaba.jstorm.cache.KvStoreManagerFactory; -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.AsmMetric; -import com.alibaba.jstorm.metric.*; -import com.alibaba.jstorm.utils.JStormUtils; -import com.google.common.base.Optional; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.PAssert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Test JStorm runner. - */ -public class TestJStormRunner extends PipelineRunner<StormRunner.StormPipelineResult> { - - private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class); - - public static TestJStormRunner fromOptions(PipelineOptions options) { - return new TestJStormRunner(options.as(StormPipelineOptions.class)); - } - - private final StormRunner stormRunner; - private final StormPipelineOptions options; - - private TestJStormRunner(StormPipelineOptions options) { - this.options = options; - Map conf = Maps.newHashMap(); - //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); - options.setTopologyConfig(conf); - options.setLocalMode(true); - stormRunner = StormRunner.fromOptions(checkNotNull(options, "options")); - } - - @Override - public StormRunner.StormPipelineResult run(Pipeline pipeline) { - StormRunner.StormPipelineResult result = stormRunner.run(pipeline); - - try { - int numberOfAssertions = PAssert.countAsserts(pipeline); - - LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - if(numberOfAssertions == 0) { - // If assert number is zero, wait 5 sec - JStormUtils.sleepMs(5000); - return result; - } else { - for (int i = 0; i < 40; ++i) { - Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); - if (success.isPresent() && success.get()) { - return result; - } else if (success.isPresent() && !success.get()) { - throw new AssertionError("Failed assertion checks."); - } else { - JStormUtils.sleepMs(500); - } - } - LOG.info("Assertion checks timed out."); - throw new AssertionError("Assertion checks timed out."); - } - } finally { - clearPAssertCount(); - cancel(result); - } - } - - private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) { - int successes = 0; - for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); - } - int failures = 0; - for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); - } - - if (failures > 0) { - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.of(true); - } - - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.absent(); - } - - private void clearPAssertCount() { - String topologyName = options.getJobName(); - AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); - Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry<String, AsmMetric> metric = itr.next(); - if (metric.getKey().contains(topologyName)) { - itr.remove(); - } - } - } - - private void cancel(StormRunner.StormPipelineResult result) { - try { - result.cancel(); - } catch (IOException e) { - throw new RuntimeException("Failed to cancel.", e); -} - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableListSerializer.java deleted file mode 100644 index 5eabb5f..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableListSerializer.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import com.google.common.collect.*; - -public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> { - - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; - - public ImmutableListSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } - - @Override - public void write(Kryo kryo, Output output, ImmutableList<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } - } - - @Override - public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { - final int size = input.readInt(true); - final Object[] list = new Object[size]; - for (int i = 0; i < size; ++i) { - list[i] = kryo.readClassAndObject(input); - } - return ImmutableList.copyOf(list); - } - - /** - * Creates a new {@link ImmutableListSerializer} and registers its serializer - * for the several ImmutableList related classes. - */ - public static void registerSerializers(Config config) { - - // ImmutableList (abstract class) - // +- RegularImmutableList - // | RegularImmutableList - // +- SingletonImmutableList - // | Optimized for List with only 1 element. - // +- SubList - // | Representation for part of ImmutableList - // +- ReverseImmutableList - // | For iterating in reverse order - // +- StringAsImmutableList - // | Used by Lists#charactersOf - // +- Values (ImmutableTable values) - // Used by return value of #values() when there are multiple cells - - config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); - - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. - - config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class); - - config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class); - - Table<Integer,Integer,Integer> baseTable = HashBasedTable.create(); - baseTable.put(1, 2, 3); - baseTable.put(4, 5, 6); - Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); - config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class); - - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableMapSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableMapSerializer.java deleted file mode 100644 index 9980292..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableMapSerializer.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; - -import java.util.EnumMap; -import java.util.HashMap; -import java.util.Map; - -public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> { - - private static final boolean DOES_NOT_ACCEPT_NULL = true; - private static final boolean IMMUTABLE = true; - - public ImmutableMapSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } - - @Override - public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) { - kryo.writeObject(output, Maps.newHashMap(immutableMap)); - } - - @Override - public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) { - Map map = kryo.readObject(input, HashMap.class); - return ImmutableMap.copyOf(map); - } - - /** - * Creates a new {@link ImmutableMapSerializer} and registers its serializer - * for the several ImmutableMap related classes. - */ - public static void registerSerializers(Config config) { - - config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class); - config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class); - - Object o1 = new Object(); - Object o2 = new Object(); - - config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class); - config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class); - Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class); - for (DummyEnum e : DummyEnum.values()) { - enumMap.put(e, o1); - } - - config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class); - } - - private enum DummyEnum { - VALUE1, - VALUE2 - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableSetSerializer.java deleted file mode 100644 index a0d1627..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/ImmutableSetSerializer.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; - -public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> { - - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; - - public ImmutableSetSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } - - @Override - public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } - } - - @Override - public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { - final int size = input.readInt(true); - ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); - for (int i = 0; i < size; ++i) { - builder.add(kryo.readClassAndObject(input)); - } - return builder.build(); - } - - /** - * Creates a new {@link ImmutableSetSerializer} and registers its serializer - * for the several ImmutableSet related classes. - */ - public static void registerSerializers(Config config) { - - // ImmutableList (abstract class) - // +- EmptyImmutableSet - // | EmptyImmutableSet - // +- SingletonImmutableSet - // | Optimized for Set with only 1 element. - // +- RegularImmutableSet - // | RegularImmutableList - // +- EnumImmutableSet - // | EnumImmutableSet - - config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class); - - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. - - config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), ImmutableSetSerializer.class); - - config.registerSerialization( - Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), ImmutableSetSerializer.class); - } - - private enum SomeEnum { - A, B, C - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/KvStoreIterableSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/KvStoreIterableSerializer.java deleted file mode 100644 index 1207763..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/KvStoreIterableSerializer.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import com.alibaba.jstorm.cache.KvStoreIterable; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import com.google.common.collect.Lists; - -import java.util.Iterator; -import java.util.List; - -public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> { - - public KvStoreIterableSerializer() { - - } - - @Override - public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) { - List<Object> values = Lists.newArrayList(object); - output.writeInt(values.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } - } - - @Override - public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) { - final int size = input.readInt(true); - List<Object> values = Lists.newArrayList(); - for (int i = 0; i < size; ++i) { - values.add(kryo.readClassAndObject(input)); - } - - return new KvStoreIterable<Object>() { - Iterable<Object> values; - - @Override - public Iterator<Object> iterator() { - return values.iterator(); - } - - public KvStoreIterable init(Iterable<Object> values) { - this.values = values; - return this; - } - - @Override - public String toString() { - return values.toString(); - } - }.init(values); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuListSerializer.java deleted file mode 100644 index 2fc2067..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuListSerializer.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - - -import backtype.storm.Config; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import org.apache.beam.sdk.repackaged.com.google.common.collect.*; - -public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> { - - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; - - public SdkRepackImmuListSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } - - @Override - public void write(Kryo kryo, Output output, ImmutableList<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } - } - - @Override - public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { - final int size = input.readInt(true); - final Object[] list = new Object[size]; - for (int i = 0; i < size; ++i) { - list[i] = kryo.readClassAndObject(input); - } - return ImmutableList.copyOf(list); - } - - /** - * Creates a new {@link ImmutableListSerializer} and registers its serializer - * for the several ImmutableList related classes. - */ - public static void registerSerializers(Config config) { - - // ImmutableList (abstract class) - // +- RegularImmutableList - // | RegularImmutableList - // +- SingletonImmutableList - // | Optimized for List with only 1 element. - // +- SubList - // | Representation for part of ImmutableList - // +- ReverseImmutableList - // | For iterating in reverse order - // +- StringAsImmutableList - // | Used by Lists#charactersOf - // +- Values (ImmutableTable values) - // Used by return value of #values() when there are multiple cells - - config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class); - - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. - - config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class); - - config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class); - - Table<Integer,Integer,Integer> baseTable = HashBasedTable.create(); - baseTable.put(1, 2, 3); - baseTable.put(4, 5, 6); - Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); - config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class); - - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuSetSerializer.java deleted file mode 100644 index 0cbcad9..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/SdkRepackImmuSetSerializer.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import org.apache.beam.sdk.repackaged.com.google.common.collect.*; - -public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> { - - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; - - public SdkRepackImmuSetSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } - - @Override - public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } - } - - @Override - public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { - final int size = input.readInt(true); - ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); - for (int i = 0; i < size; ++i) { - builder.add(kryo.readClassAndObject(input)); - } - return builder.build(); - } - - /** - * Creates a new {@link ImmutableSetSerializer} and registers its serializer - * for the several ImmutableSet related classes. - */ - public static void registerSerializers(Config config) { - - // ImmutableList (abstract class) - // +- EmptyImmutableSet - // | EmptyImmutableSet - // +- SingletonImmutableSet - // | Optimized for Set with only 1 element. - // +- RegularImmutableSet - // | RegularImmutableList - // +- EnumImmutableSet - // | EnumImmutableSet - - config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class); - - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. - - config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), SdkRepackImmuSetSerializer.class); - - config.registerSerialization( - Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), SdkRepackImmuSetSerializer.class); - } - - private enum SomeEnum { - A, B, C - } -} - http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/UnmodifiableCollectionsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/UnmodifiableCollectionsSerializer.java deleted file mode 100644 index 1eb7146..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/serialization/UnmodifiableCollectionsSerializer.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.alibaba.jstorm.beam.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; - -import java.lang.reflect.Field; -import java.util.*; - -public class UnmodifiableCollectionsSerializer extends Serializer<Object> { - - private static final Field SOURCE_COLLECTION_FIELD; - private static final Field SOURCE_MAP_FIELD; - - static { - try { - SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection" ) - .getDeclaredField( "c" ); - SOURCE_COLLECTION_FIELD.setAccessible( true ); - - - SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap" ) - .getDeclaredField( "m" ); - SOURCE_MAP_FIELD.setAccessible( true ); - } catch ( final Exception e ) { - throw new RuntimeException( "Could not access source collection" + - " field in java.util.Collections$UnmodifiableCollection.", e ); - } - } - - @Override - public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) { - final int ordinal = input.readInt( true ); - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; - final Object sourceCollection = kryo.readClassAndObject( input ); - return unmodifiableCollection.create( sourceCollection ); - } - - @Override - public void write(final Kryo kryo, final Output output, final Object object) { - try { - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( object.getClass() ); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt( unmodifiableCollection.ordinal(), true ); - kryo.writeClassAndObject( output, unmodifiableCollection.sourceCollectionField.get( object ) ); - } catch ( final RuntimeException e ) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } - } - - @Override - public Object copy(Kryo kryo, Object original) { - try { - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( original.getClass() ); - Object sourceCollectionCopy = kryo.copy(unmodifiableCollection.sourceCollectionField.get(original)); - return unmodifiableCollection.create( sourceCollectionCopy ); - } catch ( final RuntimeException e ) { - // Don't eat and wrap RuntimeExceptions - throw e; - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } - } - - private static enum UnmodifiableCollection { - COLLECTION( Collections.unmodifiableCollection( Arrays.asList( "" ) ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableCollection( (Collection<?>) sourceCollection ); - } - }, - RANDOM_ACCESS_LIST( Collections.unmodifiableList( new ArrayList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableList( (List<?>) sourceCollection ); - } - }, - LIST( Collections.unmodifiableList( new LinkedList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableList( (List<?>) sourceCollection ); - } - }, - SET( Collections.unmodifiableSet( new HashSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSet( (Set<?>) sourceCollection ); - } - }, - SORTED_SET( Collections.unmodifiableSortedSet( new TreeSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSortedSet( (SortedSet<?>) sourceCollection ); - } - }, - MAP( Collections.unmodifiableMap( new HashMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) { - - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableMap( (Map<?, ?>) sourceCollection ); - } - - }, - SORTED_MAP( Collections.unmodifiableSortedMap( new TreeMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) { - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSortedMap( (SortedMap<?, ?>) sourceCollection ); - } - }; - - private final Class<?> type; - private final Field sourceCollectionField; - - private UnmodifiableCollection( final Class<?> type, final Field sourceCollectionField ) { - this.type = type; - this.sourceCollectionField = sourceCollectionField; - } - - /** - * @param sourceCollection - */ - public abstract Object create( Object sourceCollection ); - - static UnmodifiableCollection valueOfType(final Class<?> type ) { - for( final UnmodifiableCollection item : values() ) { - if ( item.type.equals( type ) ) { - return item; - } - } - throw new IllegalArgumentException( "The type " + type + " is not supported." ); - } - - } - - /** - * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer - * for the several unmodifiable Collections that can be created via {@link Collections}, - * including {@link Map}s. - * - * @see Collections#unmodifiableCollection(Collection) - * @see Collections#unmodifiableList(List) - * @see Collections#unmodifiableSet(Set) - * @see Collections#unmodifiableSortedSet(SortedSet) - * @see Collections#unmodifiableMap(Map) - * @see Collections#unmodifiableSortedMap(SortedMap) - */ - public static void registerSerializers( Config config ) { - UnmodifiableCollection.values(); - for ( final UnmodifiableCollection item : UnmodifiableCollection.values() ) { - config.registerSerialization( item.type, UnmodifiableCollectionsSerializer.class ); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/StormPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/StormPipelineTranslator.java deleted file mode 100644 index 7eae1da..0000000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/StormPipelineTranslator.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.beam.translation; - -import com.alibaba.jstorm.beam.translation.translator.ViewTranslator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import org.apache.beam.runners.core.construction.PTransformMatchers; -import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.PTransformMatcher; -import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.beam.translation.translator.TransformTranslator; - -import java.util.List; -import java.util.Map; - -/** - * Pipleline translator of Storm - */ -public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { - private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class); - private TranslationContext context; - private int depth = 0; - - public StormPipelineTranslator(TranslationContext context) { - this.context = context; - } - - public void translate(Pipeline pipeline) { - List<PTransformOverride> transformOverrides = - ImmutableList.<PTransformOverride>builder() - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class)))) - .build(); - pipeline.replaceAll(transformOverrides); - pipeline.traverseTopologically(this); - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); - this.depth++; - - // check if current composite transforms need to be translated. - // If not, all sub transforms will be translated in visitPrimitiveTransform. - PTransform<?, ?> transform = node.getTransform(); - if (transform != null) { - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - - if (translator != null && applyCanTranslate(transform, node, translator)) { - applyStreamingTransform(transform, node, translator); - LOG.info(genSpaces(this.depth) + "translated-" + node); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node); - } - - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node); - - if (!node.isRootNode()) { - PTransform<?, ?> transform = node.getTransform(); - TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - if (translator == null || !applyCanTranslate(transform, node, translator)) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); - } - applyStreamingTransform(transform, node, translator); - } - } - - public void visitValue(PValue value, TransformHierarchy.Node node) { - LOG.info(genSpaces(this.depth) + "visiting value {}", value); - } - - private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node, - TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, context); - - // Maintain PValue to TupleTag map for side inputs translation. - context.getUserGraphContext().recordOutputTaggedPValue(); - } - - private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) { - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - @SuppressWarnings("unchecked") - TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; - - context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform()); - - return typedTranslator.canTranslate(typedTransform, context); - } - - /** - * Utility formatting method. - * - * @param n number of spaces to generate - * @return String with "|" followed by n spaces - */ - protected static String genSpaces(int n) { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < n; i++) { - builder.append("| "); - } - return builder.toString(); - } - - private static class ReflectiveOneToOneOverrideFactory< - InputT extends PValue, - OutputT extends PValue, - TransformT extends PTransform<InputT, OutputT>> - extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { - private final Class<PTransform<InputT, OutputT>> replacement; - - private ReflectiveOneToOneOverrideFactory( - Class<PTransform<InputT, OutputT>> replacement) { - this.replacement = replacement; - } - - @Override - public PTransformReplacement<InputT, OutputT> getReplacementTransform(AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) { - PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform(); - PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement) - .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), originalPTransform) - .build(); - InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values()); - return PTransformReplacement.of(inputT, replacedPTransform); - } - } -}
