Repository: samza
Updated Branches:
  refs/heads/master 164fa5f03 -> 9d904b157


SAMZA-1692: Standalone stability fixes.

- Currently, on session expiration, a processorListener with an incorrect
generationId is registered with ZooKeeper (the ZkUtils generationId is incremented
on reconnect, but the generationId in processorListener always stays zero).
When a session reconnect happens on the processor immediately following the
leader, the leader's expiration event is skipped. This prevents leader
re-election when the current leader dies and stalls the processor group. The fix
is to re-instantiate and then register processorChangeListener on session expiration.
- Add the processorId to the debounce thread name (this aids debugging when multiple
processors run within a single JVM).
- After the ScheduleAfterDebounceTime queue is shut down, don't accept new schedule
requests. The current ZkJobCoordinator shutdown sequence consists of the following
steps:
     - Shut down the ScheduleAfterDebounceTime queue.
     - Stop the zkClient and relinquish its resources.

After the ScheduleAfterDebounceTime queue is shut down and before the zkClient is
stopped, new operations can still be scheduled on the ScheduleAfterDebounceTime
queue. These fail with a RejectedExecutionException, since the executorService has
already been stopped.

```
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@23f962a8 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43408be8
```

Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #496 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d904b15
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d904b15
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d904b15

Branch: refs/heads/master
Commit: 9d904b1578827d8ffcf75f8b27d5ca1262f86118
Parents: 164fa5f
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Mon May 7 13:43:05 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon May 7 13:43:05 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     | 65 ++++++++++++--------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  2 +-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  3 +-
 .../samza/zk/TestScheduleAfterDebounceTime.java | 16 ++++-
 4 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index f6f2dc9..ec3521b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,13 +20,13 @@
 package org.apache.samza.zk;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ScheduleAfterDebounceTime {
   private static final Logger LOG = 
LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
-  private static final String DEBOUNCE_THREAD_NAME_FORMAT = 
"debounce-thread-%d";
+  private static final String DEBOUNCE_THREAD_NAME_FORMAT = "Samza Debounce 
Thread-%s";
 
   // timeout to wait for a task to complete.
   private static final int TIMEOUT_MS = 1000 * 10;
@@ -56,11 +56,14 @@ public class ScheduleAfterDebounceTime {
    * A map from actionName to {@link ScheduledFuture} of task scheduled for 
execution.
    */
   private final Map<String, ScheduledFuture> futureHandles = new 
ConcurrentHashMap<>();
-  private boolean isShuttingDown;
+  private volatile boolean isShuttingDown;
 
-  public ScheduleAfterDebounceTime() {
-    ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
+  public ScheduleAfterDebounceTime(String processorId) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(String.format(DEBOUNCE_THREAD_NAME_FORMAT, processorId))
+        .setDaemon(true).build();
     this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
+    isShuttingDown = false;
   }
 
   public void setScheduledTaskCallback(ScheduledTaskCallback 
scheduledTaskCallback) {
@@ -79,16 +82,29 @@ public class ScheduleAfterDebounceTime {
    * @param runnable the action to execute.
    */
   public synchronized void scheduleAfterDebounceTime(String actionName, long 
delayInMillis, Runnable runnable) {
-    // 1. Try to cancel any existing scheduled task associated with the action.
-    tryCancelScheduledAction(actionName);
+    if (!isShuttingDown) {
+      // 1. Try to cancel any existing scheduled task associated with the 
action.
+      tryCancelScheduledAction(actionName);
+
+      // 2. Schedule the action.
+      ScheduledFuture scheduledFuture =
+          scheduledExecutorService.schedule(getScheduleableAction(actionName, 
runnable), delayInMillis, TimeUnit.MILLISECONDS);
+
+      LOG.info("Scheduled action: {} to run after: {} milliseconds.", 
actionName, delayInMillis);
+      futureHandles.put(actionName, scheduledFuture);
+    } else {
+      LOG.info("Scheduler is stopped. Not scheduling action: {} to run.", 
actionName);
+    }
+  }
 
-    // 2. Schedule the action.
-    ScheduledFuture scheduledFuture = 
scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), 
delayInMillis, TimeUnit.MILLISECONDS);
 
-    LOG.info("Scheduled action: {} to run after: {} milliseconds.", 
actionName, delayInMillis);
-    futureHandles.put(actionName, scheduledFuture);
+  public synchronized void cancelAction(String action) {
+    if (!isShuttingDown) {
+      this.tryCancelScheduledAction(action);
+    }
   }
 
+
   /**
    * Stops the scheduler. After this invocation no further schedule calls will 
be accepted
    * and all pending enqueued tasks will be cancelled.
@@ -110,16 +126,13 @@ public class ScheduleAfterDebounceTime {
         .forEach(this::tryCancelScheduledAction);
   }
 
-  public synchronized void cancelAction(String action) {
-    this.tryCancelScheduledAction(action);
-  }
-
   /**
    * Tries to cancel the task that belongs to {@code actionName} submitted to 
the queue.
    *
    * @param actionName the name of action to cancel.
    */
   private void tryCancelScheduledAction(String actionName) {
+    LOG.info("Trying to cancel the action: {}.", actionName);
     ScheduledFuture scheduledFuture = futureHandles.get(actionName);
     if (scheduledFuture != null && !scheduledFuture.isDone()) {
       LOG.info("Attempting to cancel the future of action: {}", actionName);
@@ -146,16 +159,18 @@ public class ScheduleAfterDebounceTime {
   private Runnable getScheduleableAction(String actionName, Runnable runnable) 
{
     return () -> {
       try {
-        runnable.run();
-        /*
-         * Expects all run() implementations <b>not to swallow the 
interrupts.</b>
-         * This thread is interrupted from an external source(mostly executor 
service) to die.
-         */
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.warn("Action: {} is interrupted.", actionName);
-          doCleanUpOnTaskException(new InterruptedException());
-        } else {
-          LOG.debug("Action: {} completed successfully.", actionName);
+        if (!isShuttingDown) {
+          runnable.run();
+          /*
+           * Expects all run() implementations <b>not to swallow the 
interrupts.</b>
+           * This thread is interrupted from an external source(mostly 
executor service) to die.
+           */
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("Action: {} is interrupted.", actionName);
+            doCleanUpOnTaskException(new InterruptedException());
+          } else {
+            LOG.info("Action: {} completed successfully.", actionName);
+          }
         }
       } catch (Throwable throwable) {
         LOG.error("Execution of action: {} failed.", actionName, throwable);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1134d6f..d6f402f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -123,7 +123,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(config), processorId);
-    debounceTimer = new ScheduleAfterDebounceTime();
+    debounceTimer = new ScheduleAfterDebounceTime(processorId);
     debounceTimer.setScheduledTaskCallback(throwable -> {
         LOG.error("Received exception in debounce timer! Stopping the job 
coordinator", throwable);
         stop();

http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index f4c1e94..c9ee1f0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -51,7 +51,7 @@ public class ZkLeaderElector implements LeaderElector {
   private final String hostName;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private final IZkDataListener previousProcessorChangeListener;
+  private IZkDataListener previousProcessorChangeListener;
   private LeaderElectorListener leaderElectorListener = null;
   private String currentSubscription = null;
   private final Random random = new Random();
@@ -130,6 +130,7 @@ public class ZkLeaderElector implements LeaderElector {
         LOG.debug(zLog("Unsubscribing data change for " + 
currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + 
currentSubscription,
             previousProcessorChangeListener);
+        previousProcessorChangeListener = new 
PreviousProcessorChangeListener(zkUtils);
       }
       currentSubscription = predecessor;
       LOG.info(zLog("Subscribing data change for " + predecessor));

http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
 
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index 7f687d7..67b2d45 100644
--- 
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -34,6 +34,8 @@ public class TestScheduleAfterDebounceTime {
 
   private static final long WAIT_TIME = 500;
 
+  private static final String TEST_PROCESSOR_ID = "TEST_PROCESSOR_ID";
+
   @Rule
   public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
 
@@ -52,7 +54,7 @@ public class TestScheduleAfterDebounceTime {
 
   @Test
   public void testSchedule() throws InterruptedException {
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new 
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     final CountDownLatch latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -72,7 +74,7 @@ public class TestScheduleAfterDebounceTime {
 
   @Test
   public void testCancelAndSchedule() throws InterruptedException {
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new 
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     final CountDownLatch test1Latch = new CountDownLatch(1);
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -101,7 +103,7 @@ public class TestScheduleAfterDebounceTime {
   public void testRunnableWithExceptionInvokesCallback() throws 
InterruptedException {
     final CountDownLatch latch = new CountDownLatch(1);
     final Throwable[] taskCallbackException = new Exception[1];
-    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+    ScheduleAfterDebounceTime scheduledQueue = new 
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
     scheduledQueue.setScheduledTaskCallback(throwable -> {
         taskCallbackException[0] = throwable;
         latch.countDown();
@@ -121,4 +123,12 @@ public class TestScheduleAfterDebounceTime {
     Assert.assertEquals(RuntimeException.class, 
taskCallbackException[0].getClass());
     scheduledQueue.stopScheduler();
   }
+
+  @Test
+  public void testNewTasksScheduledAfterShutdownDoesNotThrowException() throws 
InterruptedException {
+    ScheduleAfterDebounceTime scheduledQueue = new 
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
+
+    scheduledQueue.stopScheduler();
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () -> 
Assert.fail("New event should not be scheduled"));
+  }
 }

Reply via email to