This closes #1160: Support set and delete of timer by ID in InMemoryTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ee8c86d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ee8c86d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ee8c86d Branch: refs/heads/python-sdk Commit: 7ee8c86d3b0553d8cb7de60b0dc1a03103dfbbc5 Parents: a9447a2 df2e540 Author: Kenneth Knowles <k...@google.com> Authored: Wed Dec 21 11:02:02 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 11:02:02 2016 -0800 ---------------------------------------------------------------------- .../runners/core/InMemoryTimerInternals.java | 65 +++++++---- .../core/InMemoryTimerInternalsTest.java | 112 +++++++++++++------ 2 files changed, 120 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ee8c86d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --cc runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 5ddd5a7,292ac23..2c3d78a --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@@ -104,10 -106,9 +106,10 @@@ public class InMemoryTimerInternals imp @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } + @Deprecated @Override public void setTimer(TimerData timerData) { WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); @@@ -117,17 -133,13 +134,20 @@@ } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + TimerData existing = existingTimers.get(namespace, timerId); + if (existing != null) { + deleteTimer(existing); + } } + @Deprecated @Override public void deleteTimer(TimerData timer) { WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);