Repository: incubator-beam
Updated Branches:
  refs/heads/master 0d0a5e287 -> 4843dc59c


Require TimeDomain to delete a timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740

Branch: refs/heads/master
Commit: 35a02740748182ee52729d8bfb621a3c342b8312
Parents: 0d0a5e2
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 20 20:09:25 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 08:20:28 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java          |  8 ++++++++
 .../beam/runners/core/InMemoryTimerInternals.java  |  8 ++++++++
 .../beam/runners/direct/DirectTimerInternals.java  |  8 ++++++++
 .../wrappers/streaming/WindowDoFnOperator.java     |  9 +++++++++
 .../org/apache/beam/sdk/util/TimerInternals.java   | 17 +++++++++++++++--
 5 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 48ac177..49ec1c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
    */
   public class ApexTimerInternals implements TimerInternals {
 
+    @Deprecated
     @Override
     public void setTimer(TimerData timerData) {
       registerActiveTimer(context.element().key(), timerData);
     }
 
     @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+    }
+
+    @Deprecated
+    @Override
     public void deleteTimer(TimerData timerKey) {
       unregisterActiveTimer(context.element().key(), timerKey);
     }
@@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
       throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
     }
 
+    @Deprecated
     @Override
     public void deleteTimer(StateNamespace namespace, String timerId) {
       throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..5ddd5a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
     WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
@@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements TimerInternals {
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+  }
+
+  @Deprecated
+  @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
     throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
     WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 8970b4b..5ca276d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -52,16 +52,24 @@ class DirectTimerInternals implements TimerInternals {
     throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
     timerUpdateBuilder.setTimer(timerData);
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+  }
+
+  @Deprecated
+  @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
     throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void deleteTimer(TimerData timerKey) {
     timerUpdateBuilder.deletedTimer(timerKey);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9cea529..5398d7b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -455,6 +455,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
           throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
         }
 
+        @Deprecated
         @Override
         public void setTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
@@ -468,11 +469,19 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         }
 
         @Override
+        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+          throw new UnsupportedOperationException(
+              "Canceling of a timer by ID is not yet supported.");
+        }
+
+        @Deprecated
+        @Override
         public void deleteTimer(StateNamespace namespace, String timerId) {
           throw new UnsupportedOperationException(
               "Canceling of a timer by ID is not yet supported.");
         }
 
+        @Deprecated
         @Override
         public void deleteTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index c3e498e..0bfcddc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -61,18 +62,30 @@ public interface TimerInternals {
   void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
 
   /**
-   * Sets the timer described by {@code timerData}.
+   * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}.
    */
+  @Deprecated
   void setTimer(TimerData timerData);
 
   /**
    * Deletes the given timer.
+   *
+   * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners
+   * often manage timers for different time domains in very different ways, thus the
+   * {@link TimeDomain} is a required parameter.
+   */
+  void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain);
+
+  /**
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
    */
+  @Deprecated
   void deleteTimer(StateNamespace namespace, String timerId);
 
   /**
-   * Deletes the timer with the ID contained in the provided {@link TimerData}.
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
    */
+  @Deprecated
   void deleteTimer(TimerData timerKey);
 
   /**

Reply via email to