This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a29f29a SAMZA-2519: Support duplicate timer registration (#1355)
a29f29a is described below
commit a29f29a6d56a4dfb1c062a97728bcb11bd824275
Author: mynameborat <[email protected]>
AuthorDate: Fri May 8 10:20:34 2020 -0700
SAMZA-2519: Support duplicate timer registration (#1355)
---
.../apache/samza/scheduler/EpochTimeScheduler.java | 35 +++++-
.../samza/scheduler/TestEpochTimeScheduler.java | 121 +++++++++++++++++++++
2 files changed, 151 insertions(+), 5 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
index cbebbde..4b1e281 100644
---
a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
+++
b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
@@ -19,14 +19,15 @@
package org.apache.samza.scheduler;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Per-task scheduler for keyed timers.
@@ -36,7 +37,7 @@ import static com.google.common.base.Preconditions.checkState;
* 3) triggers listener whenever a timer fires.
*/
public class EpochTimeScheduler {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(EpochTimeScheduler.class);
/**
* For run loop to listen to timer firing so it can schedule the callbacks.
*/
@@ -57,9 +58,33 @@ public class EpochTimeScheduler {
this.executor = executor;
}
+ @VisibleForTesting
+ Map<Object, ScheduledFuture> getScheduledFutures() {
+ return scheduledFutures;
+ }
+
public <K> void setTimer(K key, long timestamp, ScheduledCallback<K>
callback) {
- checkState(!scheduledFutures.containsKey(key),
- String.format("Duplicate key %s registration for the same timer",
key));
+ if (scheduledFutures.containsKey(key)) {
+ LOG.warn("Registering duplicate callback for key: {}. Attempting to
cancel the previous callback", key);
+ ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);
+
+ /*
+ * We can have a race between the time we check for the presence of the
key and the time we attempt to cancel;
+ * Hence we check for non-null criteria to ensure the executor hasn't
kicked off the callback for the key which
+ * removes the future from the map before invoking onTimer.
+ * 1. In the event that callback is running then we will not attempt to
interrupt the action and
+ * cancel will return as unsuccessful.
+ * 2. In case of the callback successfully executed, we want to allow
duplicate registration to keep the
+ * behavior consistent with the scenario where the callback is
already executed or in progress even before
+ * we entered this condition.
+ */
+ if (scheduledFuture != null
+ && !scheduledFuture.cancel(false)
+ && !scheduledFuture.isDone()) {
+ LOG.warn("Failed to cancel the previous callback successfully.
Ignoring the current request to register new callback");
+ return;
+ }
+ }
final long delay = timestamp - System.currentTimeMillis();
final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
diff --git
a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
index 5db908c..4fd3dcf 100644
---
a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
+++
b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
@@ -21,6 +21,8 @@ package org.apache.samza.scheduler;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.samza.task.MessageCollector;
@@ -57,6 +59,125 @@ public class TestEpochTimeScheduler {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void testDuplicateTimerWithCancelableCallback() {
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(true);
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1)
+ .thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return mockScheduledFuture2;
+ });
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ ScheduledCallback<String> expectedScheduledCallback =
mock(ScheduledCallback.class);
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(),
anyObject());
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+
+ // verify the ready timer and its callback contents to ensure the second
invocation callback overwrites the
+ // first callback
+ Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>>
readyTimers =
+ scheduler.removeReadyTimers().entrySet();
+ assertEquals("Only one timer should be ready to be fired",
readyTimers.size(), 1);
+
+ Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry =
readyTimers.iterator().next();
+ assertEquals("Expected the scheduled callback from the second invocation",
+ timerEntry.getValue(),
+ expectedScheduledCallback);
+ assertEquals("Expected timer-1 as the key for ready timer",
+ timerEntry.getKey().getKey(),
+ timerKey);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDuplicateTimerWithUnsuccessfulCancellation() {
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
+ when(mockScheduledFuture1.isDone()).thenReturn(false);
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1);
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(1)).schedule((Runnable) anyObject(), anyLong(),
anyObject());
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+ verify(mockScheduledFuture1, times(1)).isDone();
+
+ Map<Object, ScheduledFuture> scheduledFutures =
scheduler.getScheduledFutures();
+ assertTrue("Expected the timer to be in the queue",
scheduledFutures.containsKey(timerKey));
+ assertEquals("Expected the scheduled callback from the first invocation",
+ scheduledFutures.get(timerKey),
+ mockScheduledFuture1);
+ }
+
+ @Test
+ public void testDuplicateTimerWithFinishedCallbacks() {
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
+ when(mockScheduledFuture1.isDone()).thenReturn(true);
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1)
+ .thenAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return mockScheduledFuture2;
+ });
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ ScheduledCallback<String> expectedScheduledCallback =
mock(ScheduledCallback.class);
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(),
anyObject());
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+ verify(mockScheduledFuture1, times(1)).isDone();
+
+ // verify the ready timer and its callback contents to ensure the second
invocation callback overwrites the
+ // first callback
+ Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>>
readyTimers =
+ scheduler.removeReadyTimers().entrySet();
+ assertEquals("Only one timer should be ready to be fired",
readyTimers.size(), 1);
+
+ Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry =
readyTimers.iterator().next();
+ assertEquals("Expected the scheduled callback from the second invocation",
+ timerEntry.getValue(),
+ expectedScheduledCallback);
+ assertEquals("Expected timer-1 as the key for ready timer",
+ timerEntry.getKey().getKey(),
+ timerKey);
+ }
+
+ @Test
public void testSingleTimer() {
EpochTimeScheduler scheduler =
EpochTimeScheduler.create(createExecutorService());
List<String> results = new ArrayList<>();