http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java new file mode 100644 index 0000000..1de881f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import backtype.storm.task.TopologyContext; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class ExecutorContext { + public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) { + return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager); + } + + public abstract TopologyContext getTopologyContext(); + + public abstract ExecutorsBolt getExecutorsBolt(); + + public abstract IKvStoreManager getKvStoreManager(); +}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java new file mode 100644 index 0000000..9df1e17 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.io.IOException; +import java.util.*; + +import avro.shaded.com.google.common.base.Joiner; +import avro.shaded.com.google.common.collect.Sets; +import backtype.storm.tuple.ITupleExt; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.KvStoreManagerFactory; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.utils.KryoSerializer; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Maps; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ExecutorsBolt extends AdaptorBasicBolt { + private static final long serialVersionUID = -7751043327801735211L; + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); + + protected ExecutorContext executorContext; + + protected TimerService timerService; + + // map from input tag to executor inside bolt + protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); + // set of all output tags that will be emit outside bolt + protected final Set<TupleTag> outputTags = Sets.newHashSet(); + protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); + protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); + protected int internalDoFnExecutorId = 1; + protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = 
Maps.newHashMap(); + + protected OutputCollector collector; + + protected boolean isStatefulBolt = false; + + protected KryoSerializer<WindowedValue> serializer; + + public ExecutorsBolt() { + + } + + public void setStatefulBolt(boolean isStateful) { + isStatefulBolt = isStateful; + } + + public void addExecutor(TupleTag inputTag, Executor executor) { + inputTagToExecutor.put( + checkNotNull(inputTag, "inputTag"), + checkNotNull(executor, "executor")); + } + + public Map<TupleTag, Executor> getExecutors() { + return inputTagToExecutor; + } + + public void registerExecutor(Executor executor) { + if (executor instanceof DoFnExecutor) { + DoFnExecutor doFnExecutor = (DoFnExecutor) executor; + idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); + doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); + internalDoFnExecutorId++; + } + } + + public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { + return idToDoFnExecutor; + } + + public void addOutputTags(TupleTag tag) { + outputTags.add(tag); + } + + public void addExternalOutputTag(TupleTag<?> tag) { + externalOutputTags.add(tag); + } + + public Set<TupleTag> getOutputTags() { + return outputTags; + } + + public ExecutorContext getExecutorContext() { + return executorContext; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + LOG.info("Start to prepare for task-{}", context.getThisTaskId()); + try { + this.collector = collector; + + // init kv store manager + String storeName = String.format("task-%d", context.getThisTaskId()); + String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); + IKvStoreManager kvStoreManager = isStatefulBolt ? 
KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) : + KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt); + this.executorContext = ExecutorContext.of(context, this, kvStoreManager); + + // init time service + timerService = initTimerService(); + + // init all internal executors + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.init(executorContext); + if (executor instanceof DoFnExecutor) { + doFnExecutors.add((DoFnExecutor) executor); + } + } + + this.serializer = new KryoSerializer<WindowedValue>(stormConf); + + LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values()); + LOG.info("inputTagToExecutor={}", inputTagToExecutor); + LOG.info("outputTags={}", outputTags); + LOG.info("externalOutputTags={}", externalOutputTags); + LOG.info("doFnExecutors={}", doFnExecutors); + } catch (IOException e) { + throw new RuntimeException("Failed to prepare executors bolt", e); + } + } + + public TimerService initTimerService() { + TopologyContext context = executorContext.getTopologyContext(); + List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) + .transformAndConcat( + new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { + @Override + public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { + if (Common.isSystemComponent(value.getKey())) { + return Collections.EMPTY_LIST; + } else { + return value.getValue(); + } + } + }) + .toList(); + TimerService ret = new TimerServiceImpl(executorContext); + ret.init(tasks); + return ret; + } + + @Override + public void execute(Tuple input) { + // process a batch + String streamId = input.getSourceStreamId(); + ITupleExt tuple = (ITupleExt) input; + Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); + if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { + while 
(valueIterator.hasNext()) { + processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); + } + } else { + doFnStartBundle(); + while (valueIterator.hasNext()) { + processElement(valueIterator.next(), streamId); + } + doFnFinishBundle(); + } + } + + private void processWatermark(long watermarkTs, int sourceTask) { + long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); + LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", + (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime()); + if (newWaterMark != 0) { + // Some buffer windows are going to be triggered. + doFnStartBundle(); + timerService.fireTimers(newWaterMark); + + // SideInput: If receiving water mark with max timestamp, It means no more data is supposed + // to be received from now on. So we are going to process all push back data. + if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.processAllPushBackElements(); + } + } + + doFnFinishBundle(); + } + + long currentWaterMark = timerService.currentOutputWatermark(); + if (!externalOutputTags.isEmpty()) { + collector.flush(); + collector.emit( + CommonInstance.BEAM_WATERMARK_STREAM_ID, + new Values(currentWaterMark)); + LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); + } + } + + private void processElement(List<Object> values, String streamId) { + TupleTag inputTag = new TupleTag(streamId); + WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); + processExecutorElem(inputTag, windowedValue); + } + + public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { + LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); + if (elem != null) { + Executor executor = inputTagToExecutor.get(inputTag); + if (executor != null) { + executor.process(inputTag, elem); + } + if 
(externalOutputTags.contains(inputTag)) { + emitOutsideBolt(inputTag, elem); + } + } else { + LOG.info("Received null elem for tag={}", inputTag); + } + } + + @Override + public void cleanup() { + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.cleanup(); + } + executorContext.getKvStoreManager().close(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + public TimerService timerService() { + return timerService; + } + + public void setTimerService(TimerService service) { + timerService = service; + } + + private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { + WindowedValue wv = null; + if (values.size() > 1) { + Object key = values.get(0); + WindowedValue value = serializer.deserialize((byte[]) values.get(1)); + wv = value.withValue(KV.of(key, value.getValue())); + } else { + wv = serializer.deserialize((byte[])values.get(0)); + } + return wv; + } + + protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { + LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); + if (keyedEmit(outputTag.getId())) { + KV kv = (KV) outputValue.getValue(); + byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); + // Convert WindowedValue<KV> to <K, WindowedValue<V>> + if (kv.getKey() == null) { + // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. 
+ collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); + } else { + collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); + } + } else { + byte[] immutableOutputValue = serializer.serialize(outputValue); + collector.emit(outputTag.getId(), new Values(immutableOutputValue)); + } + } + + private void doFnStartBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.startBundle(); + } + } + + private void doFnFinishBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.finishBundle(); + } + } + + @Override + public String toString() { + // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); + List<String> ret = new ArrayList<>(); + /*ret.add("inputTags"); + for (TupleTag inputTag : inputTagToExecutor.keySet()) { + ret.add(inputTag.getId()); + }*/ + ret.add("internalExecutors"); + for (Executor executor : inputTagToExecutor.values()) { + ret.add(executor.toString()); + } + ret.add("externalOutputTags"); + for (TupleTag output : externalOutputTags) { + ret.add(output.getId()); + } + return Joiner.on('\n').join(ret).concat("\n"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java new file mode 100644 index 0000000..1ef28c9 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class FlattenExecutor<InputT> implements Executor { + + private final String description; + private TupleTag mainOutputTag; + private ExecutorContext context; + private ExecutorsBolt executorsBolt; + + public FlattenExecutor(String description, TupleTag mainTupleTag) { + this.description = checkNotNull(description, "description"); + this.mainOutputTag = mainTupleTag; + } + + @Override + public void init(ExecutorContext context) { + this.context = context; + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + executorsBolt.processExecutorElem(mainOutputTag, elem); + } + + @Override + public void cleanup() {} + + @Override + public String toString() { + return description; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java new file mode 100644 index 0000000..419a4a0 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.io.Serializable; +import java.util.List; + +import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; +import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; +import com.google.common.collect.ImmutableList; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext; +import org.apache.beam.runners.jstorm.util.RunnerUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkArgument; + +public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { + private static final long 
serialVersionUID = -7563050475488610553L; + + private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class); + + private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable { + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + executorsBolt.processExecutorElem(tag, output); + } + } + + private KvCoder<K, V> inputKvCoder; + private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; + + public GroupByWindowExecutor( + String stepName, + String description, + TranslationContext context, + StormPipelineOptions pipelineOptions, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) { + // The doFn will be created when runtime. Just pass "null" here + super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags); + + this.outputManager = new GroupByWindowOutputManager(); + UserGraphContext userGraphContext = context.getUserGraphContext(); + PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput(); + this.inputKvCoder = (KvCoder<K, V>) input.getCoder(); + } + + private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() { + final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() { + @Override + public StateInternals stateInternalsForKey(K key) { + return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); + } + }; + TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() { + @Override + public TimerInternals timerInternalsForKey(K key) { + return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService()); + } + }; + + reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder()); + DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn = + 
GroupAlsoByWindowViaWindowSetNewDoFn.create( + windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(), + (SystemReduceFn) reduceFn, outputManager, mainTupleTag); + return doFn; + } + + @Override + protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() { + doFn = getGroupByWindowDoFn(); + + DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner( + this.pipelineOptions, + this.doFn, + NullSideInputReader.empty(), + this.outputManager, + this.mainTupleTag, + this.sideOutputTags, + this.stepContext, + this.windowingStrategy); + + DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner( + simpleRunner, + this.stepContext, + this.windowingStrategy); + return new DoFnRunnerWithMetrics<>( + stepName, doFnRunner, MetricsReporter.create(metricClient)); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + /** + * For GroupByKey, KV type elem is received. We need to convert the KV elem + * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. 
+ */ + KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); + runner.processElement(elem.withValue(keyedWorkItem)); + } + + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + + runner.processElement( + WindowedValue.valueInGlobalWindow( + KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData)))); + } + + @Override + public String toString() { + return super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java new file mode 100644 index 0000000..a022440 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + +import com.alibaba.jstorm.common.metric.AsmCounter; +import com.alibaba.jstorm.metric.MetricClient; +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsFilter; + +/** + * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine. + */ +public class MetricsReporter { + + private static final String METRIC_KEY_SEPARATOR = "__"; + private static final String COUNTER_PREFIX = "__counter"; + + private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); + private final Map<String, Long> reportedCounters = Maps.newHashMap(); + private final MetricClient metricClient; + + public static MetricsReporter create(MetricClient metricClient) { + return new MetricsReporter(metricClient); + } + + private MetricsReporter(MetricClient metricClient) { + this.metricClient = checkNotNull(metricClient, "metricClient"); + } + + public MetricsContainer getMetricsContainer(String stepName) { + return metricsContainers.getContainer(stepName); + } + + public void updateMetrics() { + MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers); + MetricQueryResults metricQueryResults = + metricResults.queryMetrics(MetricsFilter.builder().build()); + updateCounters(metricQueryResults.counters()); + } + + private void 
updateCounters(Iterable<MetricResult<Long>> counters) { + System.out.print("updateCounters"); + for (MetricResult<Long> metricResult : counters) { + String metricName = getMetricNameString(COUNTER_PREFIX, metricResult); + System.out.print("metricName: " + metricName); + Long updateValue = metricResult.attempted(); + Long oldValue = reportedCounters.get(metricName); + + if (oldValue == null || oldValue < updateValue) { + AsmCounter counter = metricClient.registerCounter(metricName); + Long incValue = (oldValue == null ? updateValue : updateValue - oldValue); + counter.update(incValue); + } + } + } + + private String getMetricNameString(String prefix, MetricResult<?> metricResult) { + return prefix + + METRIC_KEY_SEPARATOR + metricResult.step() + + METRIC_KEY_SEPARATOR + metricResult.name().namespace() + + METRIC_KEY_SEPARATOR + metricResult.name().name(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java new file mode 100644 index 0000000..28dc234 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> { + private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); + + /** + * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag + * is used in downstream consumer. So before output, we need to map this "local" tag to "external" + * tag. See PCollectionTuple for details. 
+ */ + public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + if (localTupleTagMap.containsKey(tag)) { + executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output); + } else { + executorsBolt.processExecutorElem(tag, output); + } + } + } + + protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap; + + public MultiOutputDoFnExecutor( + String stepName, + String description, + StormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags, + Map<TupleTag<?>, TupleTag<?>> localTupleTagMap + ) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); + this.localTupleTagMap = localTupleTagMap; + this.outputManager = new MultiOutputDoFnExecutorOutputManager(); + LOG.info("localTupleTagMap: {}", localTupleTagMap); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java new file mode 100644 index 0000000..a58a818 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.runners.jstorm.translation.runtime;

import org.apache.beam.runners.jstorm.StormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;

import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Multi-output executor for a {@link DoFn} over {@link KV} input that uses per-key state and
 * timers.
 *
 * <p>Before a main-input element or a timer is handed to the superclass, the step context is
 * pointed at internals bound to the relevant key.
 */
public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {

  /** Forwards all arguments unchanged to {@link MultiOutputDoFnExecutor}. */
  public MultiStatefulDoFnExecutor(
      String stepName,
      String description,
      StormPipelineOptions pipelineOptions,
      DoFn<KV, OutputT> doFn,
      Coder<WindowedValue<KV>> inputCoder,
      WindowingStrategy<?, ?> windowingStrategy,
      TupleTag<KV> mainInputTag,
      Collection<PCollectionView<?>> sideInputs,
      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
      TupleTag<OutputT> mainTupleTag,
      List<TupleTag<?>> sideOutputTags,
      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
    super(
        stepName,
        description,
        pipelineOptions,
        doFn,
        inputCoder,
        windowingStrategy,
        mainInputTag,
        sideInputs,
        sideInputTagToView,
        mainTupleTag,
        sideOutputTags,
        localTupleTagMap);
  }

  /**
   * Routes {@code elem} to side-input handling unless {@code tag} is the main input tag; for
   * main input, timer and state internals keyed by the element's key are installed first.
   */
  @Override
  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
    if (!mainInputTag.equals(tag)) {
      processSideInput(tag, elem);
      return;
    }

    // Main input is a KV; its key scopes both the timers and the state for this element.
    WindowedValue<KV> keyedElem = (WindowedValue<KV>) elem;
    Object elemKey = keyedElem.getValue().getKey();
    stepContext.setTimerInternals(
        new JStormTimerInternals(elemKey, this, executorContext.getExecutorsBolt().timerService()));
    stepContext.setStateInternals(
        new JStormStateInternals<>(
            elemKey, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
    processMainInput(elem);
  }

  /**
   * Installs state internals for {@code key}, then delegates the timer to the superclass.
   *
   * <p>NOTE(review): only the state internals are rebound here; the timer internals stay
   * whatever the last {@link #process} call installed. Confirm that is intended.
   */
  @Override
  public void onTimer(Object key, TimerInternals.TimerData timerData) {
    JStormStateInternals<?> keyedState =
        new JStormStateInternals<>(
            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
    stepContext.setStateInternals(keyedState);
    super.onTimer(key, timerData);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java new file mode 100644 index 0000000..269f03c --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.runners.jstorm.translation.runtime;

import org.apache.beam.runners.jstorm.StormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;

import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Executor for a {@link DoFn} over {@link KV} input that uses per-key state and timers.
 *
 * <p>Before a main-input element or a timer is handed to {@link DoFnExecutor}, the step context
 * is pointed at internals bound to the relevant key.
 */
public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {

  /** Forwards all arguments unchanged to {@link DoFnExecutor}. */
  public StatefulDoFnExecutor(
      String stepName,
      String description,
      StormPipelineOptions pipelineOptions,
      DoFn<KV, OutputT> doFn,
      Coder<WindowedValue<KV>> inputCoder,
      WindowingStrategy<?, ?> windowingStrategy,
      TupleTag<KV> mainInputTag,
      Collection<PCollectionView<?>> sideInputs,
      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
      TupleTag<OutputT> mainTupleTag,
      List<TupleTag<?>> sideOutputTags) {
    super(
        stepName,
        description,
        pipelineOptions,
        doFn,
        inputCoder,
        windowingStrategy,
        mainInputTag,
        sideInputs,
        sideInputTagToView,
        mainTupleTag,
        sideOutputTags);
  }

  /**
   * Routes {@code elem} to side-input handling unless {@code tag} is the main input tag; for
   * main input, timer and state internals keyed by the element's key are installed first.
   */
  @Override
  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
    if (!mainInputTag.equals(tag)) {
      processSideInput(tag, elem);
      return;
    }

    // Main input is a KV; its key scopes both the timers and the state for this element.
    WindowedValue<KV> keyedElem = (WindowedValue<KV>) elem;
    Object elemKey = keyedElem.getValue().getKey();
    stepContext.setTimerInternals(
        new JStormTimerInternals(elemKey, this, executorContext.getExecutorsBolt().timerService()));
    stepContext.setStateInternals(
        new JStormStateInternals<>(
            elemKey, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
    processMainInput(elem);
  }

  /**
   * Installs state internals for {@code key}, then delegates the timer to the superclass.
   *
   * <p>NOTE(review): only the state internals are rebound here; the timer internals stay
   * whatever the last {@link #process} call installed. Confirm that is intended.
   */
  @Override
  public void onTimer(Object key, TimerInternals.TimerData timerData) {
    JStormStateInternals<?> keyedState =
        new JStormStateInternals<>(
            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
    stepContext.setStateInternals(keyedState);
    super.onTimer(key, timerData);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java new file mode 100644 index 0000000..47db018 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.runners.jstorm.translation.runtime;

import org.apache.beam.runners.core.TimerInternals;
import org.joda.time.Instant;

import java.io.Serializable;
import java.util.List;

/**
 * Interface that tracks input watermarks and manages timers in each bolt.
 *
 * <p>All watermark values are expressed as {@code long} milliseconds.
 */
public interface TimerService extends Serializable {

  /**
   * Registers the upstream tasks whose input watermarks this service tracks. Must be called
   * before {@link #updateInputWatermark}.
   *
   * @param upStreamTasks ids of the upstream tasks
   */
  void init(List<Integer> upStreamTasks);

  /**
   * Records a new input watermark reported by one upstream task.
   *
   * @param task id of the upstream task that reported the watermark
   * @param inputWatermark watermark reported by {@code task}, in milliseconds
   * @return the new input watermark of this service if the update advanced it, otherwise 0
   */
  long updateInputWatermark(Integer task, long inputWatermark);

  /**
   * Returns the current input watermark in milliseconds.
   *
   * @return the current input watermark
   */
  long currentInputWatermark();

  /**
   * Returns the current output watermark in milliseconds, which takes registered watermark
   * holds into account.
   *
   * @return the current output watermark
   */
  long currentOutputWatermark();

  /**
   * Removes the watermark hold registered for {@code namespace}, if any.
   *
   * @param namespace namespace whose hold is released
   */
  void clearWatermarkHold(String namespace);

  /**
   * Registers a watermark hold for {@code namespace}.
   *
   * @param namespace namespace the hold belongs to
   * @param watermarkHold instant at which the output watermark should be held
   */
  void addWatermarkHold(String namespace, Instant watermarkHold);

  /**
   * Registers a timer on behalf of {@code key} in {@code doFnExecutor}.
   *
   * @param key key the timer is scoped to
   * @param timerData the timer to register
   * @param doFnExecutor executor whose {@code onTimer} is invoked when the timer fires
   */
  void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);

  /**
   * Fires the registered timers that are due at {@code newWatermark}.
   *
   * @param newWatermark watermark, in milliseconds, up to which timers are fired
   */
  void fireTimers(long newWatermark);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java new file mode 100644 index 0000000..3b864d5 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java @@ -0,0 +1,150 @@ +/** + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import avro.shaded.com.google.common.collect.Maps; +import avro.shaded.com.google.common.collect.Sets; +import com.alibaba.jstorm.utils.Pair; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.state.TimeDomain; +import org.joda.time.Instant; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +/** + * Default implementation of {@link TimerService}. 
+ */ +public class TimerServiceImpl implements TimerService { + private transient ExecutorContext executorContext; + private transient Map<Integer, DoFnExecutor> idToDoFnExecutor; + + private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>(); + private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); + private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); + private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); + private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>(); + private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> + timerDataToKeyedExecutors = Maps.newHashMap(); + + private boolean initialized = false; + + public TimerServiceImpl() { + } + + public TimerServiceImpl(ExecutorContext executorContext) { + this.executorContext = executorContext; + this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor(); + } + + @Override + public void init(List<Integer> upStreamTasks) { + for (Integer task : upStreamTasks) { + upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + } + initialized = true; + } + + @Override + public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) { + checkState(initialized, "TimerService has not been initialized."); + Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task); + // Make sure the input watermark don't go backward. 
+ if (taskInputWatermark > oldTaskInputWatermark) { + upStreamTaskToInputWatermark.put(task, taskInputWatermark); + inputWatermarks.add(taskInputWatermark); + inputWatermarks.remove(oldTaskInputWatermark); + + long newLocalInputWatermark = currentInputWatermark(); + if (newLocalInputWatermark > oldTaskInputWatermark) { + return newLocalInputWatermark; + } + } + return 0; + } + + @Override + public void fireTimers(long newWatermark) { + TimerInternals.TimerData timerData; + while ((timerData = eventTimeTimersQueue.peek()) != null + && timerData.getTimestamp().getMillis() <= newWatermark) { + for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { + DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); + executor.onTimer(keyedExecutor.getSecond(), timerData); + } + eventTimeTimersQueue.remove(); + timerDataToKeyedExecutors.remove(timerData); + } + } + + @Override + public long currentInputWatermark() { + return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + @Override + public long currentOutputWatermark() { + if (watermarkHolds.isEmpty()) { + return currentInputWatermark(); + } else { + return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis()); + } + } + + @Override + public void clearWatermarkHold(String namespace) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold != null) { + watermarkHolds.remove(currentHold); + namespaceToWatermarkHold.remove(namespace); + } + } + + @Override + public void addWatermarkHold(String namespace, Instant watermarkHold) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold == null) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); + } else if (currentHold != null && watermarkHold.isBefore(currentHold)) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); + 
watermarkHolds.remove(currentHold); + } + } + + @Override + public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { + checkArgument( + TimeDomain.EVENT_TIME.equals(timerData.getDomain()), + String.format("Does not support domain: %s.", timerData.getDomain())); + Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData); + if (keyedExecutors == null) { + keyedExecutors = Sets.newHashSet(); + eventTimeTimersQueue.add(timerData); + } + keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); + timerDataToKeyedExecutors.put(timerData, keyedExecutors); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java new file mode 100644 index 0000000..0fb88ab --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import com.alibaba.jstorm.cache.IKvStore; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor { + private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class); + + private static final String TIME_SERVICE_STORE_ID = "timer_service_store"; + private static final String TIMER_SERVICE_KET = "timer_service_key"; + + private ExecutorsBolt executorsBolt; + private IKvStoreManager kvStoreManager; + private IKvStore<String, TimerService> timerServiceStore; + + public TxExecutorsBolt(ExecutorsBolt executorsBolt) { + this.executorsBolt = executorsBolt; + this.executorsBolt.setStatefulBolt(true); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + try { + executorsBolt.prepare(stormConf, context, collector); + kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager(); + timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); + } catch (IOException e) { + LOG.error("Failed to prepare stateful bolt", e); + throw new RuntimeException(e.getMessage()); + } 
+ } + + @Override + public void execute(Tuple input) { + executorsBolt.execute(input); + } + + @Override + public void cleanup() { + executorsBolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + executorsBolt.declareOutputFields(declarer); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return executorsBolt.getComponentConfiguration(); + } + + @Override + public void initState(Object userState) { + LOG.info("Begin to init from state: {}", userState); + restore(userState); + } + + @Override + public Object finishBatch(long batchId) { + try { + timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService()); + } catch (IOException e) { + LOG.error("Failed to store current timer service status", e); + throw new RuntimeException(e.getMessage()); + } + kvStoreManager.checkpoint(batchId); + return null; + } + + @Override + public Object commit(long batchId, Object state) { + return kvStoreManager.backup(batchId); + } + + @Override + public void rollBack(Object userState) { + LOG.info("Begin to rollback from state: {}", userState); + restore(userState); + } + + @Override + public void ackCommit(long batchId, long timeStamp) { + kvStoreManager.remove(batchId); + } + + private void restore(Object userState) { + try { + // restore all states + kvStoreManager.restore(userState); + + // init timer service + timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); + TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET); + if (timerService == null) { + timerService = executorsBolt.initTimerService(); + } + executorsBolt.setTimerService(timerService); + } catch (IOException e) { + LOG.error("Failed to restore state", e); + throw new RuntimeException(e.getMessage()); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java new file mode 100644 index 0000000..22dd07b --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import com.alibaba.jstorm.cache.IKvStore; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.KvStoreManagerFactory; +import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor; +import org.apache.beam.sdk.io.UnboundedSource; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class); + + private static final String SOURCE_STORE_ID = "SourceCheckpoint"; + private static final String CHECKPOINT_MARK = "CheckpointMark"; + + private UnboundedSourceSpout sourceSpout; + private UnboundedSource.UnboundedReader reader; + private IKvStoreManager kvStoreManager; + private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore; + + public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) { + this.sourceSpout = sourceSpout; + } + + private void restore(Object userState) { + try { + kvStoreManager.restore(userState); + sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID); + UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK); + sourceSpout.createSourceReader(checkpointMark); + reader = sourceSpout.getUnboundedSourceReader(); + } catch (IOException e) { + LOG.error("Failed to init state", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void initState(Object userState) { + restore(userState); + } + + @Override + public Object finishBatch(long checkpointId) { + try { + // Store check point mark from unbounded source reader + UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark(); + 
sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark); + + // checkpoint all kv stores in current manager + kvStoreManager.checkpoint(checkpointId); + } catch (IOException e) { + LOG.error(String.format("Failed to finish batch-%s", checkpointId), e); + throw new RuntimeException(e.getMessage()); + } + return null; + } + + @Override + public Object commit(long batchId, Object state) { + // backup kv stores to remote state backend + return kvStoreManager.backup(batchId); + } + + @Override + public void rollBack(Object userState) { + restore(userState); + } + + @Override + public void ackCommit(long batchId, long timeStamp) { + // remove obsolete state in bolt local and remote state backend + kvStoreManager.remove(batchId); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + sourceSpout.declareOutputFields(declarer); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return sourceSpout.getComponentConfiguration(); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + try { + sourceSpout.open(conf, context, collector); + String storeName = String.format("task-%s", context.getThisTaskId()); + String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); + kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true); + + reader = sourceSpout.getUnboundedSourceReader(); + } catch (IOException e) { + LOG.error("Failed to open transactional unbounded source spout", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void close() { + sourceSpout.close(); + } + + @Override + public void activate() { + sourceSpout.activate(); + } + + @Override + public void deactivate() { + sourceSpout.deactivate(); + } + + @Override + public void nextTuple() { + sourceSpout.nextTuple(); + } + + @Override + public void ack(Object msgId) { + throw new 
UnsupportedOperationException(); + } + + @Override + public void fail(Object msgId) { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java new file mode 100644 index 0000000..973f703 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.runners.jstorm.translation.runtime;

import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
import com.alibaba.jstorm.utils.KryoSerializer;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

import org.apache.beam.runners.jstorm.StormPipelineOptions;
import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Values;

import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Spout implementation that wraps a Beam UnboundedSource.
 *
 * <p>While activated, {@link #nextTuple} drains the reader, emits each record in the global
 * window on the output tag's stream, and emits the reader's watermark on the watermark stream
 * whenever it has advanced.
 *
 * <p>TODO: add wrapper to support metrics in UnboundedSource.
 */
public class UnboundedSourceSpout extends AdaptorBasicSpout {
  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);

  private final String description;
  private final UnboundedSource source;
  private final SerializedPipelineOptions serializedOptions;
  private final TupleTag<?> outputTag;

  private transient StormPipelineOptions pipelineOptions;
  private transient UnboundedSource.UnboundedReader reader;
  private transient SpoutOutputCollector collector;

  private volatile boolean hasNextRecord;
  private AtomicBoolean activated = new AtomicBoolean();

  private KryoSerializer<WindowedValue> serializer;

  /** Millis of the last watermark emitted downstream; only larger watermarks are emitted. */
  private long lastWaterMark = 0L;

  /**
   * @param description text returned by {@link #toString}
   * @param source the Beam source to read from
   * @param options pipeline options, kept in serialized form
   * @param outputTag tag whose id names the stream records are emitted on
   * @throws NullPointerException if any argument is null
   */
  public UnboundedSourceSpout(
      String description,
      UnboundedSource source,
      StormPipelineOptions options,
      TupleTag<?> outputTag) {
    this.description = checkNotNull(description, "description");
    this.source = checkNotNull(source, "source");
    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
    this.outputTag = checkNotNull(outputTag, "outputTag");
  }

  /**
   * Deactivates the spout and closes the reader if one was created. A failure to close is
   * logged rather than propagated.
   */
  @Override
  public synchronized void close() {
    activated.set(false);
    if (reader == null) {
      // Nothing to release when the spout is closed before a reader was created.
      return;
    }
    try {
      reader.close();
    } catch (IOException e) {
      LOG.error("Failed to close unbounded source reader", e);
    }
  }

  @Override
  public void activate() {
    activated.set(true);
  }

  @Override
  public void deactivate() {
    activated.set(false);
  }

  /** Not supported; always throws {@link UnsupportedOperationException}. */
  @Override
  public void ack(Object msgId) {
    throw new UnsupportedOperationException();
  }

  /** Not supported; always throws {@link UnsupportedOperationException}. */
  @Override
  public void fail(Object msgId) {
    throw new UnsupportedOperationException();
  }

  /** Returns {@code null}: this spout contributes no component configuration. */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }

  /**
   * Deserializes the pipeline options, creates and starts a reader without a checkpoint mark,
   * and creates the value serializer.
   *
   * @throws RuntimeException wrapping the {@link IOException} if the reader cannot be created
   */
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    try {
      this.collector = collector;
      this.pipelineOptions =
          this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);

      createSourceReader(null);

      this.serializer = new KryoSerializer<>(conf);
    } catch (IOException e) {
      throw new RuntimeException("Unable to create unbounded reader.", e);
    }
  }

  /**
   * Closes the current reader, if any, and creates and starts a new one.
   *
   * @param checkpointMark mark to resume from, or {@code null} to start without one
   * @throws IOException if closing the old reader or starting the new one fails
   */
  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
    if (reader != null) {
      reader.close();
    }
    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
    hasNextRecord = this.reader.start();
  }

  /**
   * Emits every record currently available from the reader while the spout stays activated,
   * then emits the reader's watermark if it is greater than the last one emitted.
   *
   * @throws RuntimeException wrapping the {@link IOException} if reading fails
   */
  @Override
  public synchronized void nextTuple() {
    if (!activated.get()) {
      return;
    }
    try {
      if (!hasNextRecord) {
        hasNextRecord = reader.advance();
      }

      while (hasNextRecord && activated.get()) {
        Object value = reader.getCurrent();
        Instant timestamp = reader.getCurrentTimestamp();

        WindowedValue wv =
            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        LOG.debug("Source output: {}", wv.getValue());
        if (keyedEmit(outputTag.getId())) {
          KV kv = (KV) wv.getValue();
          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
        } else {
          byte[] immutableValue = serializer.serialize(wv);
          collector.emit(outputTag.getId(), new Values(immutableValue));
        }

        // move to next record
        hasNextRecord = reader.advance();
      }

      Instant waterMark = reader.getWatermark();
      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
        lastWaterMark = waterMark.getMillis();
        // Flush before emitting the watermark, as the original code does.
        collector.flush();
        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
      }
    } catch (IOException e) {
      throw new RuntimeException("Exception reading values from source.", e);
    }
  }

  public UnboundedSource getUnboundedSource() {
    return source;
  }

  /** Returns the current reader, or {@code null} if none has been created yet. */
  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
    return reader;
  }

  @Override
  public String toString() {
    return description;
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java new file mode 100644 index 0000000..7b0e8db --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * JStorm {@link Executor} for {@link View}.
+ */ +public class ViewExecutor implements Executor { + + private final String description; + private final TupleTag outputTag; + private ExecutorsBolt executorsBolt; + + public ViewExecutor(String description, TupleTag outputTag) { + this.description = description; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + executorsBolt.processExecutorElem(outputTag, elem); + } + + @Override + public void cleanup() {} + + @Override + public String toString() { + return description; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java new file mode 100644 index 0000000..a6c3c16 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +import static com.google.common.base.Preconditions.checkArgument; + +public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { + private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); + + private final String description; + private WindowFn<T, W> windowFn; + private ExecutorsBolt executorsBolt; + private TupleTag outputTag; + + class JStormAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + 
return Iterables.getOnlyElement(value.getWindows()); + } + } + + public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) { + this.description = description; + this.windowFn = windowFn; + this.outputTag = outputTag; + } + + @Override + public void init(ExecutorContext context) { + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + Collection<W> windows = null; + try { + windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem)); + for (W window: windows) { + executorsBolt.processExecutorElem( + outputTag, + WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane())); + } + } catch (Exception e) { + LOG.warn("Failed to assign windows for elem=" + elem, e); + } + } + + @Override + public void cleanup() {} + + + @Override + public String toString() { + return description; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java new file mode 100644 index 0000000..eaf0549 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStore; +import com.alibaba.jstorm.cache.KvStoreIterable; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.ReadableState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * JStorm implementation of {@link BagState}. 
+ */ +class JStormBagState<K, T> implements BagState<T> { + private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); + + @Nullable + private final K key; + private final StateNamespace namespace; + private final IKvStore<ComposedKey, T> kvState; + private final IKvStore<ComposedKey, Object> stateInfoKvState; + private int elemIndex; + + public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState, + IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException { + this.key = key; + this.namespace = checkNotNull(namespace, "namespace"); + this.kvState = checkNotNull(kvState, "kvState"); + this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); + + Integer index = (Integer) stateInfoKvState.get(getComposedKey()); + this.elemIndex = index != null ? ++index : 0; + } + + @Override + public void add(T input) { + try { + kvState.put(getComposedKey(elemIndex), input); + stateInfoKvState.put(getComposedKey(), elemIndex); + elemIndex++; + } catch (IOException e) { + throw new RuntimeException(e.getCause()); + } + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public Boolean read() { + return elemIndex <= 0; + } + + @Override + public ReadableState<Boolean> readLater() { + // TODO: support prefetch. + return this; + } + }; + } + + @Override + public Iterable<T> read() { + return new BagStateIterable(elemIndex); + } + + @Override + public BagState readLater() { + // TODO: support prefetch. 
+ return this; + } + + @Override + public void clear() { + try { + for (int i = 0; i < elemIndex; i++) { + kvState.remove(getComposedKey(i)); + } + stateInfoKvState.remove(getComposedKey()); + elemIndex = 0; + } catch (IOException e) { + throw new RuntimeException(e.getCause()); + } + } + + private ComposedKey getComposedKey() { + return ComposedKey.of(key, namespace); + } + + private ComposedKey getComposedKey(int elemIndex) { + return ComposedKey.of(key, namespace, elemIndex); + } + + private class BagStateIterable implements KvStoreIterable<T> { + + private class BagStateIterator implements Iterator<T> { + private final int size; + private int cursor = 0; + + BagStateIterator() { + Integer s = null; + try { + s = (Integer) stateInfoKvState.get(getComposedKey()); + } catch (IOException e) { + LOG.error("Failed to get elemIndex for key={}", getComposedKey()); + } + this.size = s != null ? ++s : 0; + } + + @Override + public boolean hasNext() { + return cursor < size; + } + + @Override + public T next() { + if (cursor >= size) { + throw new NoSuchElementException(); + } + + T value = null; + try { + value = kvState.get(getComposedKey(cursor)); + } catch (IOException e) { + LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor)); + } + cursor++; + return value; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private final int size; + + BagStateIterable(int size) { + this.size = size; + } + + @Override + public Iterator<T> iterator() { + return new BagStateIterator(); + } + + @Override + public String toString() { + return String.format("BagStateIterable: composedKey=%s", getComposedKey()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java ---------------------------------------------------------------------- diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java new file mode 100644 index 0000000..b0fe29b --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.transforms.Combine; + +/** + * JStorm implementation of {@link CombiningState}. 
+ */ +public class JStormCombiningState<InputT, AccumT, OutputT> + implements CombiningState<InputT, AccumT, OutputT> { + + @Nullable + private final BagState<AccumT> accumBagState; + private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; + JStormCombiningState( + BagState<AccumT> accumBagState, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn) { + this.accumBagState = checkNotNull(accumBagState, "accumBagState"); + this.combineFn = checkNotNull(combineFn, "combineFn"); + } + + @Override + public AccumT getAccum() { + // TODO: replacing the accumBagState with the merged accum. + return combineFn.mergeAccumulators(accumBagState.read()); + } + + @Override + public void addAccum(AccumT accumT) { + accumBagState.add(accumT); + } + + @Override + public AccumT mergeAccumulators(Iterable<AccumT> iterable) { + return combineFn.mergeAccumulators(iterable); + } + + @Override + public void add(InputT input) { + accumBagState.add( + combineFn.addInput(combineFn.createAccumulator(), input)); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return accumBagState.isEmpty(); + } + + @Override + public OutputT read() { + return combineFn.extractOutput( + combineFn.mergeAccumulators(accumBagState.read())); + } + + @Override + public CombiningState<InputT, AccumT, OutputT> readLater() { + // TODO: support prefetch. + return this; + } + + @Override + public void clear() { + accumBagState.clear(); + } +}
