http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java index 9df1e17..e80fb48 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import java.io.IOException; -import java.util.*; +import static com.google.common.base.Preconditions.checkNotNull; import avro.shaded.com.google.common.base.Joiner; import avro.shaded.com.google.common.collect.Sets; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; import backtype.storm.tuple.ITupleExt; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.cluster.Common; @@ -31,6 +33,14 @@ import com.alibaba.jstorm.utils.KryoSerializer; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -39,289 +49,287 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import static com.google.common.base.Preconditions.checkNotNull; - public class ExecutorsBolt extends AdaptorBasicBolt { - private static final long serialVersionUID = -7751043327801735211L; + private static final long serialVersionUID = -7751043327801735211L; - private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); + private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); - protected ExecutorContext executorContext; + protected ExecutorContext executorContext; - protected TimerService timerService; + protected TimerService timerService; - // map from input tag to executor inside bolt - protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); - // set of all output tags that will be emit outside bolt - protected final Set<TupleTag> outputTags = Sets.newHashSet(); - protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); - protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); - protected int internalDoFnExecutorId = 1; - protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); + // map from input tag to executor inside bolt + protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap(); + // set of all output tags that will be emit outside bolt + protected final Set<TupleTag> outputTags = Sets.newHashSet(); + protected final Set<TupleTag> externalOutputTags = Sets.newHashSet(); + protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet(); + protected int internalDoFnExecutorId = 1; + protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap(); - protected OutputCollector collector; + protected OutputCollector collector; - protected boolean isStatefulBolt = false; + protected boolean isStatefulBolt = false; - protected KryoSerializer<WindowedValue> serializer; + protected KryoSerializer<WindowedValue> serializer; - public ExecutorsBolt() { + public ExecutorsBolt() { - } - - public void setStatefulBolt(boolean isStateful) { - isStatefulBolt = isStateful; - } - - public void addExecutor(TupleTag inputTag, Executor executor) { - inputTagToExecutor.put( - checkNotNull(inputTag, "inputTag"), - checkNotNull(executor, "executor")); - } - - public Map<TupleTag, Executor> getExecutors() { - return inputTagToExecutor; - } - - public void registerExecutor(Executor executor) { - if (executor instanceof DoFnExecutor) { - DoFnExecutor doFnExecutor = (DoFnExecutor) executor; - idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); - doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); - internalDoFnExecutorId++; - } - } - - public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { - return idToDoFnExecutor; - } + } - public void addOutputTags(TupleTag tag) { - outputTags.add(tag); - } + public void setStatefulBolt(boolean isStateful) { + isStatefulBolt = isStateful; + } - public void addExternalOutputTag(TupleTag<?> tag) { - externalOutputTags.add(tag); - } + public void addExecutor(TupleTag inputTag, Executor executor) { + inputTagToExecutor.put( + checkNotNull(inputTag, "inputTag"), + checkNotNull(executor, "executor")); + } - public Set<TupleTag> getOutputTags() { - return outputTags; - } + public Map<TupleTag, Executor> getExecutors() { + return inputTagToExecutor; + } - public ExecutorContext getExecutorContext() { - return executorContext; + public void registerExecutor(Executor executor) { + if (executor instanceof DoFnExecutor) { + DoFnExecutor doFnExecutor = (DoFnExecutor) executor; + idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); + doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId); + internalDoFnExecutorId++; } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - LOG.info("Start to prepare for task-{}", context.getThisTaskId()); - try { - this.collector = collector; - - // init kv store manager - String storeName = String.format("task-%d", context.getThisTaskId()); - String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) : - KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt); - this.executorContext = ExecutorContext.of(context, this, kvStoreManager); - - // init time service - timerService = initTimerService(); - - // init all internal executors - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.init(executorContext); - if (executor instanceof DoFnExecutor) { - doFnExecutors.add((DoFnExecutor) executor); - } - } - - this.serializer = new KryoSerializer<WindowedValue>(stormConf); - - LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values()); - LOG.info("inputTagToExecutor={}", inputTagToExecutor); - LOG.info("outputTags={}", outputTags); - LOG.info("externalOutputTags={}", externalOutputTags); - LOG.info("doFnExecutors={}", doFnExecutors); - } catch (IOException e) { - throw new RuntimeException("Failed to prepare executors bolt", e); + } + + public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() { + return idToDoFnExecutor; + } + + public void addOutputTags(TupleTag tag) { + outputTags.add(tag); + } + + public void addExternalOutputTag(TupleTag<?> tag) { + externalOutputTags.add(tag); + } + + public Set<TupleTag> getOutputTags() { + return outputTags; + } + + public ExecutorContext getExecutorContext() { + return executorContext; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + LOG.info("Start to prepare for task-{}", context.getThisTaskId()); + try { + this.collector = collector; + + // init kv store manager + String storeName = String.format("task-%d", context.getThisTaskId()); + String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); + IKvStoreManager kvStoreManager = isStatefulBolt ? + KvStoreManagerFactory.getKvStoreManagerWithMonitor( + context, storeName, stateStorePath, isStatefulBolt) : + KvStoreManagerFactory.getKvStoreManager( + stormConf, storeName, stateStorePath, isStatefulBolt); + this.executorContext = ExecutorContext.of(context, this, kvStoreManager); + + // init time service + timerService = initTimerService(); + + // init all internal executors + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.init(executorContext); + if (executor instanceof DoFnExecutor) { + doFnExecutors.add((DoFnExecutor) executor); } - } + } - public TimerService initTimerService() { - TopologyContext context = executorContext.getTopologyContext(); - List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) - .transformAndConcat( - new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { - @Override - public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { - if (Common.isSystemComponent(value.getKey())) { - return Collections.EMPTY_LIST; - } else { - return value.getValue(); - } - } - }) - .toList(); - TimerService ret = new TimerServiceImpl(executorContext); - ret.init(tasks); - return ret; - } + this.serializer = new KryoSerializer<WindowedValue>(stormConf); - @Override - public void execute(Tuple input) { - // process a batch - String streamId = input.getSourceStreamId(); - ITupleExt tuple = (ITupleExt) input; - Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); - if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { - while (valueIterator.hasNext()) { - processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); - } - } else { - doFnStartBundle(); - while (valueIterator.hasNext()) { - processElement(valueIterator.next(), streamId); - } - doFnFinishBundle(); - } + LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values()); + LOG.info("inputTagToExecutor={}", inputTagToExecutor); + LOG.info("outputTags={}", outputTags); + LOG.info("externalOutputTags={}", externalOutputTags); + LOG.info("doFnExecutors={}", doFnExecutors); + } catch (IOException e) { + throw new RuntimeException("Failed to prepare executors bolt", e); } - - private void processWatermark(long watermarkTs, int sourceTask) { - long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); - LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", - (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime()); - if (newWaterMark != 0) { - // Some buffer windows are going to be triggered. - doFnStartBundle(); - timerService.fireTimers(newWaterMark); - - // SideInput: If receiving water mark with max timestamp, It means no more data is supposed - // to be received from now on. So we are going to process all push back data. - if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.processAllPushBackElements(); + } + + public TimerService initTimerService() { + TopologyContext context = executorContext.getTopologyContext(); + List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet()) + .transformAndConcat( + new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() { + @Override + public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) { + if (Common.isSystemComponent(value.getKey())) { + return Collections.EMPTY_LIST; + } else { + return value.getValue(); } - } - - doFnFinishBundle(); - } - - long currentWaterMark = timerService.currentOutputWatermark(); - if (!externalOutputTags.isEmpty()) { - collector.flush(); - collector.emit( - CommonInstance.BEAM_WATERMARK_STREAM_ID, - new Values(currentWaterMark)); - LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); - } - } - - private void processElement(List<Object> values, String streamId) { - TupleTag inputTag = new TupleTag(streamId); - WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); - processExecutorElem(inputTag, windowedValue); + } + }) + .toList(); + TimerService ret = new TimerServiceImpl(executorContext); + ret.init(tasks); + return ret; + } + + @Override + public void execute(Tuple input) { + // process a batch + String streamId = input.getSourceStreamId(); + ITupleExt tuple = (ITupleExt) input; + Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); + if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { + while (valueIterator.hasNext()) { + processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); + } + } else { + doFnStartBundle(); + while (valueIterator.hasNext()) { + processElement(valueIterator.next(), streamId); + } + doFnFinishBundle(); } - - public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { - LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); - if (elem != null) { - Executor executor = inputTagToExecutor.get(inputTag); - if (executor != null) { - executor.process(inputTag, elem); - } - if (externalOutputTags.contains(inputTag)) { - emitOutsideBolt(inputTag, elem); - } - } else { - LOG.info("Received null elem for tag={}", inputTag); + } + + private void processWatermark(long watermarkTs, int sourceTask) { + long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs); + LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}", + (new Instant(watermarkTs)).toDateTime(), + sourceTask, + (new Instant(newWaterMark)).toDateTime()); + if (newWaterMark != 0) { + // Some buffer windows are going to be triggered. + doFnStartBundle(); + timerService.fireTimers(newWaterMark); + + // SideInput: If receiving water mark with max timestamp, It means no more data is supposed + // to be received from now on. So we are going to process all push back data. + if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.processAllPushBackElements(); } - } + } - @Override - public void cleanup() { - for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { - executor.cleanup(); - } - executorContext.getKvStoreManager().close(); + doFnFinishBundle(); } - @Override - public Map<String, Object> getComponentConfiguration() { - return null; + long currentWaterMark = timerService.currentOutputWatermark(); + if (!externalOutputTags.isEmpty()) { + collector.flush(); + collector.emit( + CommonInstance.BEAM_WATERMARK_STREAM_ID, + new Values(currentWaterMark)); + LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime()); } - - public TimerService timerService() { - return timerService; + } + + private void processElement(List<Object> values, String streamId) { + TupleTag inputTag = new TupleTag(streamId); + WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values); + processExecutorElem(inputTag, windowedValue); + } + + public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { + LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue()); + if (elem != null) { + Executor executor = inputTagToExecutor.get(inputTag); + if (executor != null) { + executor.process(inputTag, elem); + } + if (externalOutputTags.contains(inputTag)) { + emitOutsideBolt(inputTag, elem); + } + } else { + LOG.info("Received null elem for tag={}", inputTag); } + } - public void setTimerService(TimerService service) { - timerService = service; + @Override + public void cleanup() { + for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) { + executor.cleanup(); } - - private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { - WindowedValue wv = null; - if (values.size() > 1) { - Object key = values.get(0); - WindowedValue value = serializer.deserialize((byte[]) values.get(1)); - wv = value.withValue(KV.of(key, value.getValue())); - } else { - wv = serializer.deserialize((byte[])values.get(0)); - } - return wv; + executorContext.getKvStoreManager().close(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + public TimerService timerService() { + return timerService; + } + + public void setTimerService(TimerService service) { + timerService = service; + } + + private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) { + WindowedValue wv = null; + if (values.size() > 1) { + Object key = values.get(0); + WindowedValue value = serializer.deserialize((byte[]) values.get(1)); + wv = value.withValue(KV.of(key, value.getValue())); + } else { + wv = serializer.deserialize((byte[]) values.get(0)); } - - protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { - LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); - if (keyedEmit(outputTag.getId())) { - KV kv = (KV) outputValue.getValue(); - byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); - // Convert WindowedValue<KV> to <K, WindowedValue<V>> - if (kv.getKey() == null) { - // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. - collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); - } else { - collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); - } - } else { - byte[] immutableOutputValue = serializer.serialize(outputValue); - collector.emit(outputTag.getId(), new Values(immutableOutputValue)); - } + return wv; + } + + protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) { + LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue()); + if (keyedEmit(outputTag.getId())) { + KV kv = (KV) outputValue.getValue(); + byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue())); + // Convert WindowedValue<KV> to <K, WindowedValue<V>> + if (kv.getKey() == null) { + // If key is null, emit "null" string here. Because, null value will be ignored in JStorm. + collector.emit(outputTag.getId(), new Values("null", immutableOutputValue)); + } else { + collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue)); + } + } else { + byte[] immutableOutputValue = serializer.serialize(outputValue); + collector.emit(outputTag.getId(), new Values(immutableOutputValue)); } + } - private void doFnStartBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.startBundle(); - } + private void doFnStartBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.startBundle(); } + } - private void doFnFinishBundle() { - for (DoFnExecutor doFnExecutor : doFnExecutors) { - doFnExecutor.finishBundle(); - } + private void doFnFinishBundle() { + for (DoFnExecutor doFnExecutor : doFnExecutors) { + doFnExecutor.finishBundle(); } + } - @Override - public String toString() { - // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); - List<String> ret = new ArrayList<>(); + @Override + public String toString() { + // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); + List<String> ret = new ArrayList<>(); /*ret.add("inputTags"); for (TupleTag inputTag : inputTagToExecutor.keySet()) { ret.add(inputTag.getId()); }*/ - ret.add("internalExecutors"); - for (Executor executor : inputTagToExecutor.values()) { - ret.add(executor.toString()); - } - ret.add("externalOutputTags"); - for (TupleTag output : externalOutputTags) { - ret.add(output.getId()); - } - return Joiner.on('\n').join(ret).concat("\n"); + ret.add("internalExecutors"); + for (Executor executor : inputTagToExecutor.values()) { + ret.add(executor.toString()); + } + ret.add("externalOutputTags"); + for (TupleTag output : externalOutputTags) { + ret.add(output.getId()); } + return Joiner.on('\n').join(ret).concat("\n"); + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java index 1ef28c9..5a07243 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java @@ -17,39 +17,40 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; -import static com.google.common.base.Preconditions.checkNotNull; - public class FlattenExecutor<InputT> implements Executor { - private final String description; - private TupleTag mainOutputTag; - private ExecutorContext context; - private ExecutorsBolt executorsBolt; - - public FlattenExecutor(String description, TupleTag mainTupleTag) { - this.description = checkNotNull(description, "description"); - this.mainOutputTag = mainTupleTag; - } - - @Override - public void init(ExecutorContext context) { - this.context = context; - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - executorsBolt.processExecutorElem(mainOutputTag, elem); - } - - @Override - public void cleanup() {} - - @Override - public String toString() { - return description; - } + private final String description; + private TupleTag mainOutputTag; + private ExecutorContext context; + private ExecutorsBolt executorsBolt; + + public FlattenExecutor(String description, TupleTag mainTupleTag) { + this.description = checkNotNull(description, "description"); + this.mainOutputTag = mainTupleTag; + } + + @Override + public void init(ExecutorContext context) { + this.context = context; + this.executorsBolt = context.getExecutorsBolt(); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + executorsBolt.processExecutorElem(mainOutputTag, elem); + } + + @Override + public void cleanup() { + } + + @Override + public String toString() { + return description; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java index 299ceb2..625726d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java @@ -17,18 +17,17 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import java.io.Serializable; -import java.util.List; +import static com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.List; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.StateNamespace; @@ -36,122 +35,138 @@ import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.runners.jstorm.translation.TranslationContext; +import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext; +import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; +import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; +import org.apache.beam.runners.jstorm.util.RunnerUtils; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; - -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext; -import org.apache.beam.runners.jstorm.util.RunnerUtils; +import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.base.Preconditions.checkArgument; - -public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { - private static final long serialVersionUID = -7563050475488610553L; - - private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class); - - private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable { - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - executorsBolt.processExecutorElem(tag, output); - } - } - - private KvCoder<K, V> inputKvCoder; - private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; - - public GroupByWindowExecutor( - String stepName, - String description, - TranslationContext context, - JStormPipelineOptions pipelineOptions, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) { - // The doFn will be created when runtime. Just pass "null" here - super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags); - - this.outputManager = new GroupByWindowOutputManager(); - UserGraphContext userGraphContext = context.getUserGraphContext(); - PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput(); - this.inputKvCoder = (KvCoder<K, V>) input.getCoder(); - } - - private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() { - final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() { - @Override - public StateInternals stateInternalsForKey(K key) { - return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); - } - }; - TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() { - @Override - public TimerInternals timerInternalsForKey(K key) { - return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService()); - } - }; - - reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder()); - DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn = - GroupAlsoByWindowViaWindowSetNewDoFn.create( - windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(), - (SystemReduceFn) reduceFn, outputManager, mainTupleTag); - return doFn; - } - - @Override - protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() { - doFn = getGroupByWindowDoFn(); - - DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner( - this.pipelineOptions, - this.doFn, - NullSideInputReader.empty(), - this.outputManager, - this.mainTupleTag, - this.sideOutputTags, - this.stepContext, - this.windowingStrategy); - - DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner( - simpleRunner, - this.stepContext, - this.windowingStrategy); - return new DoFnRunnerWithMetrics<>( - stepName, doFnRunner, MetricsReporter.create(metricClient)); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - /** - * For GroupByKey, KV type elem is received. We need to convert the KV elem - * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. - */ - KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); - runner.processElement(elem.withValue(keyedWorkItem)); - } +public class GroupByWindowExecutor<K, V> + extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> { + private static final long serialVersionUID = -7563050475488610553L; - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - StateNamespace namespace = timerData.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class); - runner.processElement( - WindowedValue.valueInGlobalWindow( - KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData)))); - } + private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable { @Override - public String toString() { - return super.toString(); + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + executorsBolt.processExecutorElem(tag, output); } + } + + private KvCoder<K, V> inputKvCoder; + private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; + + public GroupByWindowExecutor( + String stepName, + String description, + TranslationContext context, + JStormPipelineOptions pipelineOptions, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) { + // The doFn will be created when runtime. Just pass "null" here + super( + stepName, + description, + pipelineOptions, + null, + null, + windowingStrategy, + null, + null, + null, + mainTupleTag, + sideOutputTags); + + this.outputManager = new GroupByWindowOutputManager(); + UserGraphContext userGraphContext = context.getUserGraphContext(); + PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput(); + this.inputKvCoder = (KvCoder<K, V>) input.getCoder(); + } + + private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() { + final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() { + @Override + public StateInternals stateInternalsForKey(K key) { + return new JStormStateInternals<K>( + key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId); + } + }; + TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() { + @Override + public TimerInternals timerInternalsForKey(K key) { + return new JStormTimerInternals<>( + key, + GroupByWindowExecutor.this, + executorContext.getExecutorsBolt().timerService()); + } + }; + + reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder()); + DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn = + GroupAlsoByWindowViaWindowSetNewDoFn.create( + windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(), + (SystemReduceFn) reduceFn, outputManager, mainTupleTag); + return doFn; + } + + @Override + protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() { + doFn = getGroupByWindowDoFn(); + + DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner( + this.pipelineOptions, + this.doFn, + NullSideInputReader.empty(), + this.outputManager, + this.mainTupleTag, + this.sideOutputTags, + this.stepContext, + this.windowingStrategy); + + DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = + DoFnRunners.lateDataDroppingRunner( + simpleRunner, + this.stepContext, + this.windowingStrategy); + return new DoFnRunnerWithMetrics<>( + stepName, doFnRunner, MetricsReporter.create(metricClient)); + } + + @Override + public void process(TupleTag tag, WindowedValue elem) { + /** + * For GroupByKey, KV type elem is received. We need to convert the KV elem + * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. + */ + KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); + runner.processElement(elem.withValue(keyedWorkItem)); + } + + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + + runner.processElement( + WindowedValue.valueInGlobalWindow( + KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData)))); + } + + @Override + public String toString() { + return super.toString(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java index cb15ea2..d36d9a6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; @@ -27,49 +30,45 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.List; -import java.util.Map; - public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> { - private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); + private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); - /** - * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag - * is used in downstream consumer. So before output, we need to map this "local" tag to "external" - * tag. See PCollectionTuple for details. - */ - public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - if (localTupleTagMap.containsKey(tag)) { - executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output); - } else { - executorsBolt.processExecutorElem(tag, output); - } - } + /** + * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated + * tag is used in downstream consumer. So before output, we need to map this "local" tag to + * "external" tag. See PCollectionTuple for details. + */ + public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + if (localTupleTagMap.containsKey(tag)) { + executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output); + } else { + executorsBolt.processExecutorElem(tag, output); + } } + } - protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap; + protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap; - public MultiOutputDoFnExecutor( - String stepName, - String description, - JStormPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, - Coder<WindowedValue<InputT>> inputCoder, - WindowingStrategy<?, ?> windowingStrategy, - TupleTag<InputT> mainInputTag, - Collection<PCollectionView<?>> sideInputs, - Map<TupleTag, PCollectionView<?>> sideInputTagToView, - TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags, - Map<TupleTag<?>, TupleTag<?>> localTupleTagMap - ) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, - sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); - this.localTupleTagMap = localTupleTagMap; - this.outputManager = new MultiOutputDoFnExecutorOutputManager(); - LOG.info("localTupleTagMap: {}", localTupleTagMap); - } + public MultiOutputDoFnExecutor( + String stepName, + String description, + JStormPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Coder<WindowedValue<InputT>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, + TupleTag<InputT> mainInputTag, + Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, + TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags, + Map<TupleTag<?>, TupleTag<?>> localTupleTagMap + ) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); + this.localTupleTagMap = localTupleTagMap; + this.outputManager = new MultiOutputDoFnExecutorOutputManager(); + LOG.info("localTupleTagMap: {}", localTupleTagMap); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java index dd7921f..45ac62a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java @@ -17,10 +17,13 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; @@ -29,40 +32,37 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; -import java.util.Collection; -import java.util.List; -import java.util.Map; - public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> { - public MultiStatefulDoFnExecutor( - String stepName, String description, - JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn, - Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy, - TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs, - Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag, - List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap); - } + public MultiStatefulDoFnExecutor( + String stepName, String description, + JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn, + Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy, + TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs, + Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag, + List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, + sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap); + } - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - if (mainInputTag.equals(tag)) { - WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; - stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, - executorContext.getExecutorsBolt().timerService())); - stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - processMainInput(elem); - } else { - processSideInput(tag, elem); - } + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + if (mainInputTag.equals(tag)) { + WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; + stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, + executorContext.getExecutorsBolt().timerService())); + stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + processMainInput(elem); + } else { + processSideInput(tag, elem); } + } - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - stepContext.setStateInternals(new JStormStateInternals<>(key, - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - super.onTimer(key, timerData); - } + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + stepContext.setStateInternals(new JStormStateInternals<>(key, + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + super.onTimer(key, timerData); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java index 7d20a4c..ba0c052 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java @@ -17,10 +17,13 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.WindowedValue; @@ -29,39 +32,35 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; -import java.util.Collection; -import java.util.List; -import java.util.Map; - public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> { - public StatefulDoFnExecutor( - String stepName, String description, JStormPipelineOptions pipelineOptions, - DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder, - WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag, - Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>> - sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, - mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); - } + public StatefulDoFnExecutor( + String stepName, String description, JStormPipelineOptions pipelineOptions, + DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder, + WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag, + Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>> + sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) { + super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, + mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); + } - @Override - public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { - if (mainInputTag.equals(tag)) { - WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; - stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, - executorContext.getExecutorsBolt().timerService())); - stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - processMainInput(elem); - } else { - processSideInput(tag, elem); - } + @Override + public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) { + if (mainInputTag.equals(tag)) { + WindowedValue<KV> kvElem = (WindowedValue<KV>) elem; + stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, + executorContext.getExecutorsBolt().timerService())); + stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + processMainInput(elem); + } else { + processSideInput(tag, elem); } + } - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - stepContext.setStateInternals(new JStormStateInternals<>(key, - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - super.onTimer(key, timerData); - } + @Override + public void onTimer(Object key, TimerInternals.TimerData timerData) { + stepContext.setStateInternals(new JStormStateInternals<>(key, + kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); + super.onTimer(key, timerData); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java index 47db018..5c41bda 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,36 +17,35 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import org.apache.beam.runners.core.TimerInternals; -import org.joda.time.Instant; - import java.io.Serializable; import java.util.List; +import org.apache.beam.runners.core.TimerInternals; +import org.joda.time.Instant; /** * Interface that tracks input watermarks and manages timers in each bolt. */ public interface TimerService extends Serializable { - void init(List<Integer> upStreamTasks); + void init(List<Integer> upStreamTasks); - /** - * - * @param task - * @param inputWatermark - * @return new watermark if any timer is triggered during the update of watermark, otherwise 0 - */ - long updateInputWatermark(Integer task, long inputWatermark); + /** + * + * @param task + * @param inputWatermark + * @return new watermark if any timer is triggered during the update of watermark, otherwise 0 + */ + long updateInputWatermark(Integer task, long inputWatermark); - long currentInputWatermark(); + long currentInputWatermark(); - long currentOutputWatermark(); + long currentOutputWatermark(); - void clearWatermarkHold(String namespace); + void clearWatermarkHold(String namespace); - void addWatermarkHold(String namespace, Instant watermarkHold); + void addWatermarkHold(String namespace, Instant watermarkHold); - void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); + void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); - void fireTimers(long newWatermark); + void fireTimers(long newWatermark); } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java index 3b864d5..d2514f1 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,134 +17,139 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import avro.shaded.com.google.common.collect.Maps; import avro.shaded.com.google.common.collect.Sets; import com.alibaba.jstorm.utils.Pair; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Instant; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - /** * Default implementation of {@link TimerService}. */ public class TimerServiceImpl implements TimerService { - private transient ExecutorContext executorContext; - private transient Map<Integer, DoFnExecutor> idToDoFnExecutor; - - private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>(); - private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); - private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); - private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); - private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>(); - private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> - timerDataToKeyedExecutors = Maps.newHashMap(); - - private boolean initialized = false; - - public TimerServiceImpl() { - } - - public TimerServiceImpl(ExecutorContext executorContext) { - this.executorContext = executorContext; - this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor(); - } - - @Override - public void init(List<Integer> upStreamTasks) { - for (Integer task : upStreamTasks) { - upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); - inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); - } - initialized = true; - } - - @Override - public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) { - checkState(initialized, "TimerService has not been initialized."); - Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task); - // Make sure the input watermark don't go backward. - if (taskInputWatermark > oldTaskInputWatermark) { - upStreamTaskToInputWatermark.put(task, taskInputWatermark); - inputWatermarks.add(taskInputWatermark); - inputWatermarks.remove(oldTaskInputWatermark); - - long newLocalInputWatermark = currentInputWatermark(); - if (newLocalInputWatermark > oldTaskInputWatermark) { - return newLocalInputWatermark; - } - } - return 0; + private transient ExecutorContext executorContext; + private transient Map<Integer, DoFnExecutor> idToDoFnExecutor; + + private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = + new ConcurrentHashMap<>(); + private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>(); + private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>(); + private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); + private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = + new PriorityQueue<>(); + private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> + timerDataToKeyedExecutors = Maps.newHashMap(); + + private boolean initialized = false; + + public TimerServiceImpl() { + } + + public TimerServiceImpl(ExecutorContext executorContext) { + this.executorContext = executorContext; + this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor(); + } + + @Override + public void init(List<Integer> upStreamTasks) { + for (Integer task : upStreamTasks) { + upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); } - - @Override - public void fireTimers(long newWatermark) { - TimerInternals.TimerData timerData; - while ((timerData = eventTimeTimersQueue.peek()) != null - && timerData.getTimestamp().getMillis() <= newWatermark) { - for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { - DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); - executor.onTimer(keyedExecutor.getSecond(), timerData); - } - eventTimeTimersQueue.remove(); - timerDataToKeyedExecutors.remove(timerData); - } + initialized = true; + } + + @Override + public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) { + checkState(initialized, "TimerService has not been initialized."); + Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task); + // Make sure the input watermark don't go backward. + if (taskInputWatermark > oldTaskInputWatermark) { + upStreamTaskToInputWatermark.put(task, taskInputWatermark); + inputWatermarks.add(taskInputWatermark); + inputWatermarks.remove(oldTaskInputWatermark); + + long newLocalInputWatermark = currentInputWatermark(); + if (newLocalInputWatermark > oldTaskInputWatermark) { + return newLocalInputWatermark; + } } - - @Override - public long currentInputWatermark() { - return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + return 0; + } + + @Override + public void fireTimers(long newWatermark) { + TimerInternals.TimerData timerData; + while ((timerData = eventTimeTimersQueue.peek()) != null + && timerData.getTimestamp().getMillis() <= newWatermark) { + for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { + DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); + executor.onTimer(keyedExecutor.getSecond(), timerData); + } + eventTimeTimersQueue.remove(); + timerDataToKeyedExecutors.remove(timerData); } - - @Override - public long currentOutputWatermark() { - if (watermarkHolds.isEmpty()) { - return currentInputWatermark(); - } else { - return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis()); - } + } + + @Override + public long currentInputWatermark() { + return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + @Override + public long currentOutputWatermark() { + if (watermarkHolds.isEmpty()) { + return currentInputWatermark(); + } else { + return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis()); } - - @Override - public void clearWatermarkHold(String namespace) { - Instant currentHold = namespaceToWatermarkHold.get(namespace); - if (currentHold != null) { - watermarkHolds.remove(currentHold); - namespaceToWatermarkHold.remove(namespace); - } + } + + @Override + public void clearWatermarkHold(String namespace) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold != null) { + watermarkHolds.remove(currentHold); + namespaceToWatermarkHold.remove(namespace); } - - @Override - public void addWatermarkHold(String namespace, Instant watermarkHold) { - Instant currentHold = namespaceToWatermarkHold.get(namespace); - if (currentHold == null) { - namespaceToWatermarkHold.put(namespace, watermarkHold); - watermarkHolds.add(watermarkHold); - } else if (currentHold != null && watermarkHold.isBefore(currentHold)) { - namespaceToWatermarkHold.put(namespace, watermarkHold); - watermarkHolds.add(watermarkHold); - watermarkHolds.remove(currentHold); - } + } + + @Override + public void addWatermarkHold(String namespace, Instant watermarkHold) { + Instant currentHold = namespaceToWatermarkHold.get(namespace); + if (currentHold == null) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); + } else if (currentHold != null && watermarkHold.isBefore(currentHold)) { + namespaceToWatermarkHold.put(namespace, watermarkHold); + watermarkHolds.add(watermarkHold); + watermarkHolds.remove(currentHold); } - - @Override - public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { - checkArgument( - TimeDomain.EVENT_TIME.equals(timerData.getDomain()), - String.format("Does not support domain: %s.", timerData.getDomain())); - Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData); - if (keyedExecutors == null) { - keyedExecutors = Sets.newHashSet(); - eventTimeTimersQueue.add(timerData); - } - keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); - timerDataToKeyedExecutors.put(timerData, keyedExecutors); + } + + @Override + public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { + checkArgument( + TimeDomain.EVENT_TIME.equals(timerData.getDomain()), + String.format("Does not support domain: %s.", timerData.getDomain())); + Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData); + if (keyedExecutors == null) { + keyedExecutors = Sets.newHashSet(); + eventTimeTimersQueue.add(timerData); } + keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); + timerDataToKeyedExecutors.put(timerData, keyedExecutors); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java index 0fb88ab..2bd5f7d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java @@ -24,108 +24,107 @@ import backtype.storm.tuple.Tuple; import com.alibaba.jstorm.cache.IKvStore; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor { - private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class); - - private static final String TIME_SERVICE_STORE_ID = "timer_service_store"; - private static final String TIMER_SERVICE_KET = "timer_service_key"; - - private ExecutorsBolt executorsBolt; - private IKvStoreManager kvStoreManager; - private IKvStore<String, TimerService> timerServiceStore; - - public TxExecutorsBolt(ExecutorsBolt executorsBolt) { - this.executorsBolt = executorsBolt; - this.executorsBolt.setStatefulBolt(true); - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - try { - executorsBolt.prepare(stormConf, context, collector); - kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager(); - timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); - } catch (IOException e) { - LOG.error("Failed to prepare stateful bolt", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void execute(Tuple input) { - executorsBolt.execute(input); - } - - @Override - public void cleanup() { - executorsBolt.cleanup(); + private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class); + + private static final String TIME_SERVICE_STORE_ID = "timer_service_store"; + private static final String TIMER_SERVICE_KET = "timer_service_key"; + + private ExecutorsBolt executorsBolt; + private IKvStoreManager kvStoreManager; + private IKvStore<String, TimerService> timerServiceStore; + + public TxExecutorsBolt(ExecutorsBolt executorsBolt) { + this.executorsBolt = executorsBolt; + this.executorsBolt.setStatefulBolt(true); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + try { + executorsBolt.prepare(stormConf, context, collector); + kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager(); + timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); + } catch (IOException e) { + LOG.error("Failed to prepare stateful bolt", e); + throw new RuntimeException(e.getMessage()); } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - executorsBolt.declareOutputFields(declarer); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return executorsBolt.getComponentConfiguration(); - } - - @Override - public void initState(Object userState) { - LOG.info("Begin to init from state: {}", userState); - restore(userState); - } - - @Override - public Object finishBatch(long batchId) { - try { - timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService()); - } catch (IOException e) { - LOG.error("Failed to store current timer service status", e); - throw new RuntimeException(e.getMessage()); - } - kvStoreManager.checkpoint(batchId); - return null; + } + + @Override + public void execute(Tuple input) { + executorsBolt.execute(input); + } + + @Override + public void cleanup() { + executorsBolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + executorsBolt.declareOutputFields(declarer); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return executorsBolt.getComponentConfiguration(); + } + + @Override + public void initState(Object userState) { + LOG.info("Begin to init from state: {}", userState); + restore(userState); + } + + @Override + public Object finishBatch(long batchId) { + try { + timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService()); + } catch (IOException e) { + LOG.error("Failed to store current timer service status", e); + throw new RuntimeException(e.getMessage()); } - - @Override - public Object commit(long batchId, Object state) { - return kvStoreManager.backup(batchId); - } - - @Override - public void rollBack(Object userState) { - LOG.info("Begin to rollback from state: {}", userState); - restore(userState); - } - - @Override - public void ackCommit(long batchId, long timeStamp) { - kvStoreManager.remove(batchId); - } - - private void restore(Object userState) { - try { - // restore all states - kvStoreManager.restore(userState); - - // init timer service - timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); - TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET); - if (timerService == null) { - timerService = executorsBolt.initTimerService(); - } - executorsBolt.setTimerService(timerService); - } catch (IOException e) { - LOG.error("Failed to restore state", e); - throw new RuntimeException(e.getMessage()); - } + kvStoreManager.checkpoint(batchId); + return null; + } + + @Override + public Object commit(long batchId, Object state) { + return kvStoreManager.backup(batchId); + } + + @Override + public void rollBack(Object userState) { + LOG.info("Begin to rollback from state: {}", userState); + restore(userState); + } + + @Override + public void ackCommit(long batchId, long timeStamp) { + kvStoreManager.remove(batchId); + } + + private void restore(Object userState) { + try { + // restore all states + kvStoreManager.restore(userState); + + // init timer service + timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); + TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET); + if (timerService == null) { + timerService = executorsBolt.initTimerService(); + } + executorsBolt.setTimerService(timerService); + } catch (IOException e) { + LOG.error("Failed to restore state", e); + throw new RuntimeException(e.getMessage()); } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java index 22dd07b..16f7d99 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java @@ -24,130 +24,130 @@ import com.alibaba.jstorm.cache.IKvStore; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor; -import org.apache.beam.sdk.io.UnboundedSource; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Map; +import org.apache.beam.sdk.io.UnboundedSource; +import org.slf4j.LoggerFactory; public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class); - - private static final String SOURCE_STORE_ID = "SourceCheckpoint"; - private static final String CHECKPOINT_MARK = "CheckpointMark"; - - private UnboundedSourceSpout sourceSpout; - private UnboundedSource.UnboundedReader reader; - private IKvStoreManager kvStoreManager; - private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore; - - public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) { - this.sourceSpout = sourceSpout; - } - - private void restore(Object userState) { - try { - kvStoreManager.restore(userState); - sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID); - UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK); - sourceSpout.createSourceReader(checkpointMark); - reader = sourceSpout.getUnboundedSourceReader(); - } catch (IOException e) { - LOG.error("Failed to init state", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void initState(Object userState) { - restore(userState); - } - - @Override - public Object finishBatch(long checkpointId) { - try { - // Store check point mark from unbounded source reader - UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark(); - sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark); - - // checkpoint all kv stores in current manager - kvStoreManager.checkpoint(checkpointId); - } catch (IOException e) { - LOG.error(String.format("Failed to finish batch-%s", checkpointId), e); - throw new RuntimeException(e.getMessage()); - } - return null; - } - - @Override - public Object commit(long batchId, Object state) { - // backup kv stores to remote state backend - return kvStoreManager.backup(batchId); - } - - @Override - public void rollBack(Object userState) { - restore(userState); - } - - @Override - public void ackCommit(long batchId, long timeStamp) { - // remove obsolete state in bolt local and remote state backend - kvStoreManager.remove(batchId); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - sourceSpout.declareOutputFields(declarer); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return sourceSpout.getComponentConfiguration(); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - sourceSpout.open(conf, context, collector); - String storeName = String.format("task-%s", context.getThisTaskId()); - String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true); - - reader = sourceSpout.getUnboundedSourceReader(); - } catch (IOException e) { - LOG.error("Failed to open transactional unbounded source spout", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void close() { - sourceSpout.close(); - } - - @Override - public void activate() { - sourceSpout.activate(); - } - - @Override - public void deactivate() { - sourceSpout.deactivate(); - } - - @Override - public void nextTuple() { - sourceSpout.nextTuple(); - } - - @Override - public void ack(Object msgId) { - throw new UnsupportedOperationException(); - } - - @Override - public void fail(Object msgId) { - throw new UnsupportedOperationException(); - } + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class); + + private static final String SOURCE_STORE_ID = "SourceCheckpoint"; + private static final String CHECKPOINT_MARK = "CheckpointMark"; + + private UnboundedSourceSpout sourceSpout; + private UnboundedSource.UnboundedReader reader; + private IKvStoreManager kvStoreManager; + private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore; + + public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) { + this.sourceSpout = sourceSpout; + } + + private void restore(Object userState) { + try { + kvStoreManager.restore(userState); + sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID); + UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK); + sourceSpout.createSourceReader(checkpointMark); + reader = sourceSpout.getUnboundedSourceReader(); + } catch (IOException e) { + LOG.error("Failed to init state", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void initState(Object userState) { + restore(userState); + } + + @Override + public Object finishBatch(long checkpointId) { + try { + // Store check point mark from unbounded source reader + UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark(); + sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark); + + // checkpoint all kv stores in current manager + kvStoreManager.checkpoint(checkpointId); + } catch (IOException e) { + LOG.error(String.format("Failed to finish batch-%s", checkpointId), e); + throw new RuntimeException(e.getMessage()); + } + return null; + } + + @Override + public Object commit(long batchId, Object state) { + // backup kv stores to remote state backend + return kvStoreManager.backup(batchId); + } + + @Override + public void rollBack(Object userState) { + restore(userState); + } + + @Override + public void ackCommit(long batchId, long timeStamp) { + // remove obsolete state in bolt local and remote state backend + kvStoreManager.remove(batchId); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + sourceSpout.declareOutputFields(declarer); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return sourceSpout.getComponentConfiguration(); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + try { + sourceSpout.open(conf, context, collector); + String storeName = String.format("task-%s", context.getThisTaskId()); + String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); + kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor( + context, storeName, storePath, true); + + reader = sourceSpout.getUnboundedSourceReader(); + } catch (IOException e) { + LOG.error("Failed to open transactional unbounded source spout", e); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void close() { + sourceSpout.close(); + } + + @Override + public void activate() { + sourceSpout.activate(); + } + + @Override + public void deactivate() { + sourceSpout.deactivate(); + } + + @Override + public void nextTuple() { + sourceSpout.nextTuple(); + } + + @Override + public void ack(Object msgId) { + throw new UnsupportedOperationException(); + } + + @Override + public void fail(Object msgId) { + throw new UnsupportedOperationException(); + } } \ No newline at end of file
