Repository: samza
Updated Branches:
refs/heads/master 164fa5f03 -> 9d904b157
SAMZA-1692: Standalone stability fixes.
- Currently, on session expiration, a processorListener with an incorrect
generationId is registered with ZooKeeper (the ZkUtils generationId is
incremented on reconnect, but the generationId in processorListener is always
zero). When a session reconnect happens on the processor that immediately
succeeds the leader, the leader expiration event will be skipped. This
prevents leader re-election when the current leader dies and stalls the
processor group. The fix is to reinstantiate and then register
processorChangeListener on session expiration.
- Add the processorId to the debounce thread name (this aids debugging when
multiple processors are running within a single JVM).
- After the ScheduleAfterDebounceTime queue is shut down, don't accept new
schedule requests. The current ZkJobCoordinator shutdown sequence consists of
the following steps:
  1. Shut down the ScheduleAfterDebounceTime queue.
  2. Stop the zkClient and relinquish its resources.
After ScheduleAfterDebounceTime is shut down and before the zkClient is
stopped, new operations can still be scheduled in the
ScheduleAfterDebounceTime queue. This results in a RejectedExecutionException,
since the executorService is already stopped.
```
Caused by: java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@23f962a8
rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43408be8
```
Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>
Reviewers: Jagadish <[email protected]>
Closes #496 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d904b15
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d904b15
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d904b15
Branch: refs/heads/master
Commit: 9d904b1578827d8ffcf75f8b27d5ca1262f86118
Parents: 164fa5f
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Mon May 7 13:43:05 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon May 7 13:43:05 2018 -0700
----------------------------------------------------------------------
.../samza/zk/ScheduleAfterDebounceTime.java | 65 ++++++++++++--------
.../org/apache/samza/zk/ZkJobCoordinator.java | 2 +-
.../org/apache/samza/zk/ZkLeaderElector.java | 3 +-
.../samza/zk/TestScheduleAfterDebounceTime.java | 16 ++++-
4 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index f6f2dc9..ec3521b 100644
---
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -20,13 +20,13 @@
package org.apache.samza.zk;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
*/
public class ScheduleAfterDebounceTime {
private static final Logger LOG =
LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
- private static final String DEBOUNCE_THREAD_NAME_FORMAT =
"debounce-thread-%d";
+ private static final String DEBOUNCE_THREAD_NAME_FORMAT = "Samza Debounce
Thread-%s";
// timeout to wait for a task to complete.
private static final int TIMEOUT_MS = 1000 * 10;
@@ -56,11 +56,14 @@ public class ScheduleAfterDebounceTime {
* A map from actionName to {@link ScheduledFuture} of task scheduled for
execution.
*/
private final Map<String, ScheduledFuture> futureHandles = new
ConcurrentHashMap<>();
- private boolean isShuttingDown;
+ private volatile boolean isShuttingDown;
- public ScheduleAfterDebounceTime() {
- ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build();
+ public ScheduleAfterDebounceTime(String processorId) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(String.format(DEBOUNCE_THREAD_NAME_FORMAT, processorId))
+ .setDaemon(true).build();
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(threadFactory);
+ isShuttingDown = false;
}
public void setScheduledTaskCallback(ScheduledTaskCallback
scheduledTaskCallback) {
@@ -79,16 +82,29 @@ public class ScheduleAfterDebounceTime {
* @param runnable the action to execute.
*/
public synchronized void scheduleAfterDebounceTime(String actionName, long
delayInMillis, Runnable runnable) {
- // 1. Try to cancel any existing scheduled task associated with the action.
- tryCancelScheduledAction(actionName);
+ if (!isShuttingDown) {
+ // 1. Try to cancel any existing scheduled task associated with the
action.
+ tryCancelScheduledAction(actionName);
+
+ // 2. Schedule the action.
+ ScheduledFuture scheduledFuture =
+ scheduledExecutorService.schedule(getScheduleableAction(actionName,
runnable), delayInMillis, TimeUnit.MILLISECONDS);
+
+ LOG.info("Scheduled action: {} to run after: {} milliseconds.",
actionName, delayInMillis);
+ futureHandles.put(actionName, scheduledFuture);
+ } else {
+ LOG.info("Scheduler is stopped. Not scheduling action: {} to run.",
actionName);
+ }
+ }
- // 2. Schedule the action.
- ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable),
delayInMillis, TimeUnit.MILLISECONDS);
- LOG.info("Scheduled action: {} to run after: {} milliseconds.",
actionName, delayInMillis);
- futureHandles.put(actionName, scheduledFuture);
+ public synchronized void cancelAction(String action) {
+ if (!isShuttingDown) {
+ this.tryCancelScheduledAction(action);
+ }
}
+
/**
* Stops the scheduler. After this invocation no further schedule calls will
be accepted
* and all pending enqueued tasks will be cancelled.
@@ -110,16 +126,13 @@ public class ScheduleAfterDebounceTime {
.forEach(this::tryCancelScheduledAction);
}
- public synchronized void cancelAction(String action) {
- this.tryCancelScheduledAction(action);
- }
-
/**
* Tries to cancel the task that belongs to {@code actionName} submitted to
the queue.
*
* @param actionName the name of action to cancel.
*/
private void tryCancelScheduledAction(String actionName) {
+ LOG.info("Trying to cancel the action: {}.", actionName);
ScheduledFuture scheduledFuture = futureHandles.get(actionName);
if (scheduledFuture != null && !scheduledFuture.isDone()) {
LOG.info("Attempting to cancel the future of action: {}", actionName);
@@ -146,16 +159,18 @@ public class ScheduleAfterDebounceTime {
private Runnable getScheduleableAction(String actionName, Runnable runnable)
{
return () -> {
try {
- runnable.run();
- /*
- * Expects all run() implementations <b>not to swallow the
interrupts.</b>
- * This thread is interrupted from an external source(mostly executor
service) to die.
- */
- if (Thread.currentThread().isInterrupted()) {
- LOG.warn("Action: {} is interrupted.", actionName);
- doCleanUpOnTaskException(new InterruptedException());
- } else {
- LOG.debug("Action: {} completed successfully.", actionName);
+ if (!isShuttingDown) {
+ runnable.run();
+ /*
+ * Expects all run() implementations <b>not to swallow the
interrupts.</b>
+ * This thread is interrupted from an external source(mostly
executor service) to die.
+ */
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Action: {} is interrupted.", actionName);
+ doCleanUpOnTaskException(new InterruptedException());
+ } else {
+ LOG.info("Action: {} completed successfully.", actionName);
+ }
}
} catch (Throwable throwable) {
LOG.error("Execution of action: {} failed.", actionName, throwable);
http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1134d6f..d6f402f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -123,7 +123,7 @@ public class ZkJobCoordinator implements JobCoordinator,
ZkControllerListener {
new ZkBarrierListenerImpl());
this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
this.reporters = MetricsReporterLoader.getMetricsReporters(new
MetricsConfig(config), processorId);
- debounceTimer = new ScheduleAfterDebounceTime();
+ debounceTimer = new ScheduleAfterDebounceTime(processorId);
debounceTimer.setScheduledTaskCallback(throwable -> {
LOG.error("Received exception in debounce timer! Stopping the job
coordinator", throwable);
stop();
http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index f4c1e94..c9ee1f0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -51,7 +51,7 @@ public class ZkLeaderElector implements LeaderElector {
private final String hostName;
private AtomicBoolean isLeader = new AtomicBoolean(false);
- private final IZkDataListener previousProcessorChangeListener;
+ private IZkDataListener previousProcessorChangeListener;
private LeaderElectorListener leaderElectorListener = null;
private String currentSubscription = null;
private final Random random = new Random();
@@ -130,6 +130,7 @@ public class ZkLeaderElector implements LeaderElector {
LOG.debug(zLog("Unsubscribing data change for " +
currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" +
currentSubscription,
previousProcessorChangeListener);
+ previousProcessorChangeListener = new
PreviousProcessorChangeListener(zkUtils);
}
currentSubscription = predecessor;
LOG.info(zLog("Subscribing data change for " + predecessor));
http://git-wip-us.apache.org/repos/asf/samza/blob/9d904b15/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index 7f687d7..67b2d45 100644
---
a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++
b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -34,6 +34,8 @@ public class TestScheduleAfterDebounceTime {
private static final long WAIT_TIME = 500;
+ private static final String TEST_PROCESSOR_ID = "TEST_PROCESSOR_ID";
+
@Rule
public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
@@ -52,7 +54,7 @@ public class TestScheduleAfterDebounceTime {
@Test
public void testSchedule() throws InterruptedException {
- ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+ ScheduleAfterDebounceTime scheduledQueue = new
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
final CountDownLatch latch = new CountDownLatch(1);
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -72,7 +74,7 @@ public class TestScheduleAfterDebounceTime {
@Test
public void testCancelAndSchedule() throws InterruptedException {
- ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+ ScheduleAfterDebounceTime scheduledQueue = new
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
final CountDownLatch test1Latch = new CountDownLatch(1);
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
@@ -101,7 +103,7 @@ public class TestScheduleAfterDebounceTime {
public void testRunnableWithExceptionInvokesCallback() throws
InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final Throwable[] taskCallbackException = new Exception[1];
- ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
+ ScheduleAfterDebounceTime scheduledQueue = new
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
scheduledQueue.setScheduledTaskCallback(throwable -> {
taskCallbackException[0] = throwable;
latch.countDown();
@@ -121,4 +123,12 @@ public class TestScheduleAfterDebounceTime {
Assert.assertEquals(RuntimeException.class,
taskCallbackException[0].getClass());
scheduledQueue.stopScheduler();
}
+
+ @Test
+ public void testNewTasksScheduledAfterShutdownDoesNotThrowException() throws
InterruptedException {
+ ScheduleAfterDebounceTime scheduledQueue = new
ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
+
+ scheduledQueue.stopScheduler();
+ scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
Assert.fail("New event should not be scheduled"));
+ }
}