jstorm-runner: support deleteTimer in JStormTimerInternals.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/18198330 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/18198330 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/18198330 Branch: refs/heads/jstorm-runner Commit: 18198330d42a13d3d8dd96cccdbd07ba077b9408 Parents: af5221c Author: Pei He <[email protected]> Authored: Tue Jul 18 20:07:19 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:58 2017 +0800 ---------------------------------------------------------------------- .../runners/jstorm/translation/JStormTimerInternals.java | 3 +-- .../beam/runners/jstorm/translation/TimerService.java | 2 ++ .../beam/runners/jstorm/translation/TimerServiceImpl.java | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java index 4c96541..0e9ee35 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java @@ -69,8 +69,7 @@ class JStormTimerInternals<K> implements TimerInternals { @Override @Deprecated public void deleteTimer(TimerData timerData) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); + timerService.deleteTimer(timerData); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java index 29345aa..24a9050 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java @@ -48,4 +48,6 @@ interface TimerService extends Serializable { void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); void fireTimers(long newWatermark); + + void deleteTimer(TimerInternals.TimerData timerData); } http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java index c2600e5..6b463db 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java @@ -152,4 +152,13 @@ class TimerServiceImpl implements TimerService { keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); timerDataToKeyedExecutors.put(timerData, keyedExecutors); } + + @Override + public void deleteTimer(TimerInternals.TimerData timerData) { + checkArgument( + TimeDomain.EVENT_TIME.equals(timerData.getDomain()), + String.format("Does not support domain: %s.", timerData.getDomain())); + eventTimeTimersQueue.remove(timerData); + timerDataToKeyedExecutors.remove(timerData); + } }
