Repository: beam Updated Branches: refs/heads/jstorm-runner 80bd7f8be -> d24d2831d
JStorm-runner: Implementation of processing timer Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a0d3896 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a0d3896 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a0d3896 Branch: refs/heads/jstorm-runner Commit: 6a0d389667369ec7d4f85469e6954d47097b7b68 Parents: 80bd7f8 Author: basti.lj <[email protected]> Authored: Thu Sep 7 18:42:10 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Sep 7 19:08:20 2017 +0800 ---------------------------------------------------------------------- .../jstorm/translation/ExecutorsBolt.java | 39 +++++++++----- .../jstorm/translation/TimerService.java | 3 +- .../jstorm/translation/TimerServiceImpl.java | 54 +++++++++++--------- 3 files changed, 58 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index 3d58a37..aca2ca4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -19,12 +19,14 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkNotNull; +import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBatchBolt; import backtype.storm.tuple.ITupleExt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.cluster.Common; @@ -44,6 +46,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -230,20 +233,25 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public void execute(Tuple input) { - // process a batch - String streamId = input.getSourceStreamId(); - ITupleExt tuple = (ITupleExt) input; - Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); - if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { - while (valueIterator.hasNext()) { - processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); - } + if (TupleUtils.isTick(input)) { + // tick to trigger processing timer + timerService.fireTimers(Instant.now().getMillis(), TimeDomain.PROCESSING_TIME); } else { - doFnStartBundle(); - while (valueIterator.hasNext()) { - processElement(valueIterator.next(), streamId); + // process a batch + String streamId = input.getSourceStreamId(); + ITupleExt tuple = (ITupleExt) input; + Iterator<List<Object>> valueIterator = tuple.batchValues().iterator(); + if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { + while (valueIterator.hasNext()) { + processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); + } + } else { + doFnStartBundle(); + while (valueIterator.hasNext()) { + processElement(valueIterator.next(), streamId); + } + doFnFinishBundle(); } - doFnFinishBundle(); } } @@ -256,7 +264,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { if (newWaterMark != 0) { // Some buffer windows are going to be triggered. doFnStartBundle(); - timerService.fireTimers(newWaterMark); + timerService.fireTimers(newWaterMark, TimeDomain.EVENT_TIME); // SideInput: If receiving water mark with max timestamp, It means no more data is supposed // to be received from now on. So we are going to process all push back data. @@ -310,7 +318,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public Map<String, Object> getComponentConfiguration() { - return null; + Map conf = Maps.newHashMap(); + // Add tick tuple for triggering processing timer + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); + return conf; } public TimerService timerService() { http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java index 159fe70..1265143 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.jstorm.translation; import java.io.Serializable; import java.util.List; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.TimeDomain; import org.joda.time.Instant; /** @@ -47,7 +48,7 @@ interface TimerService extends Serializable { void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); - void fireTimers(long newWatermark); + void fireTimers(long currentTime, TimeDomain timeDomain); void deleteTimer(TimerInternals.TimerData timerData); } http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java index 027fc14..ea4b1bb 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.jstorm.translation; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.alibaba.jstorm.utils.Pair; @@ -49,6 +48,8 @@ class TimerServiceImpl implements TimerService { private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>(); private final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>(); + private final PriorityQueue<TimerInternals.TimerData> processTimeTimersQueue = + new PriorityQueue<>(); private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>> timerDataToKeyedExecutors = Maps.newHashMap(); @@ -90,20 +91,6 @@ class TimerServiceImpl implements TimerService { } @Override - public void fireTimers(long newWatermark) { - TimerInternals.TimerData timerData; - while ((timerData = eventTimeTimersQueue.peek()) != null - && timerData.getTimestamp().getMillis() <= newWatermark) { - for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { - DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); - executor.onTimer(keyedExecutor.getSecond(), timerData); - } - eventTimeTimersQueue.remove(); - timerDataToKeyedExecutors.remove(timerData); - } - } - - @Override public long currentInputWatermark() { return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); } @@ -141,24 +128,45 @@ class TimerServiceImpl implements TimerService { @Override public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { - checkArgument( - TimeDomain.EVENT_TIME.equals(timerData.getDomain()), - String.format("Does not support domain: %s.", timerData.getDomain())); Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData); if (keyedExecutors == null) { keyedExecutors = Sets.newHashSet(); - eventTimeTimersQueue.add(timerData); + getTimerQueue(timerData.getDomain()).add(timerData); } keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); timerDataToKeyedExecutors.put(timerData, keyedExecutors); } @Override + public void fireTimers(long currentTime, TimeDomain timeDomain) { + TimerInternals.TimerData timerData; + PriorityQueue<TimerInternals.TimerData> timerQueue = getTimerQueue(timeDomain); + while ((timerData = timerQueue.peek()) != null + && timerData.getTimestamp().getMillis() <= currentTime) { + for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { + DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); + executor.onTimer(keyedExecutor.getSecond(), timerData); + } + timerQueue.remove(); + timerDataToKeyedExecutors.remove(timerData); + } + } + + @Override public void deleteTimer(TimerInternals.TimerData timerData) { - checkArgument( - TimeDomain.EVENT_TIME.equals(timerData.getDomain()), - String.format("Does not support domain: %s.", timerData.getDomain())); - eventTimeTimersQueue.remove(timerData); + getTimerQueue(timerData.getDomain()).remove(timerData); timerDataToKeyedExecutors.remove(timerData); } + + private PriorityQueue<TimerInternals.TimerData> getTimerQueue(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME : + return eventTimeTimersQueue; + case PROCESSING_TIME: + return processTimeTimersQueue; + default: + throw new IllegalArgumentException( + String.format("Does not support domain: %s.", timeDomain)); + } + } }
