Repository: samza
Updated Branches:
  refs/heads/master 31a9e4aea -> 727a3c19a


SAMZA-1695: Clear events in ScheduleAfterDebounceTime on session expiration

Scenario:
Assume there are three processors in the group [P1, P2, P3], and P1 is the
leader.

1. The leader processor (P1) loses connectivity with a ZooKeeper server in the
ensemble, and its ephemeral processor node is deleted (due to session
expiration).
2. The immediate successor (P2) of the leader (P1) detects that the leader is
dead and declares itself the leader. P2 schedules onProcessorChange to
publish a new JobModel.
3. ZkClient's connection-retry logic lets the former leader (P1) reconnect to
another ZooKeeper server in the ensemble, and P1 rejoins the group as a follower.
4. P1 then processes a stale event still buffered in its debounce queue (one it
received while it was the leader) and behaves as the leader. At this point, two
processors (P1 and P2) are acting as leader. If P1 executes leader actions
before P2, P2 will fail (and, in the worst case, state can be corrupted).

Sample exception logs:
https://gist.github.com/shanthoosh/55410fe4ebf3cfb65281b35f16397cad

Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #499 from shanthoosh/remove_events_from_debounce_queue_on_session_expiry


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/727a3c19
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/727a3c19
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/727a3c19

Branch: refs/heads/master
Commit: 727a3c19a1381f35ca650fe1d0398ab2b5e142ed
Parents: 31a9e4a
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Mon May 7 19:40:03 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon May 7 19:40:03 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     | 10 ++-
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 69 ++++++++++++--------
 .../apache/samza/zk/TestZkJobCoordinator.java   | 39 +++++++++--
 3 files changed, 83 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/727a3c19/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index ec3521b..9abc26d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -122,8 +122,14 @@ public class ScheduleAfterDebounceTime {
     scheduledExecutorService.shutdown();
 
     // should clear out the future handles as well
-    futureHandles.keySet()
-        .forEach(this::tryCancelScheduledAction);
+    cancelAllScheduledActions();
+  }
+
+  public synchronized void cancelAllScheduledActions() {
+    if (!isShuttingDown) {
+      futureHandles.keySet().forEach(this::tryCancelScheduledAction);
+      futureHandles.clear();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/727a3c19/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index d6f402f..4977bff 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -33,7 +33,6 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
@@ -41,7 +40,6 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -92,10 +90,13 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   private final ZkBarrierForVersionUpgrade barrier;
   private final ZkJobCoordinatorMetrics metrics;
   private final Map<String, MetricsReporter> reporters;
+  private final ZkLeaderElector leaderElector;
 
   private StreamMetadataCache streamMetadataCache = null;
   private SystemAdmins systemAdmins = null;
-  private ScheduleAfterDebounceTime debounceTimer = null;
+
+  @VisibleForTesting
+  ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
   private int debounceTimeMs;
@@ -114,7 +115,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     // setup a listener for a session state change
     // we are mostly interested in "session closed" and "new session created" 
events
     zkUtils.getZkClient().subscribeStateChanges(new 
ZkSessionStateChangedListener());
-    LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
+    leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, 
leaderElector);
     this.barrier =  new ZkBarrierForVersionUpgrade(
@@ -218,16 +219,17 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   //////////////////////////////////////////////// LEADER stuff 
///////////////////////////
   @Override
   public void onProcessorChange(List<String> processors) {
-    LOG.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
-    debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs,
-        () -> doOnProcessorChange(processors));
+    if (leaderElector.amILeader()) {
+      LOG.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () -> doOnProcessorChange(processors));
+    }
   }
 
   void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 
'onBecomeLeader'
     // TODO: Handle empty currentProcessorIds.
-    List<String> currentProcessorIds = getActualProcessorIds(processors);
-    Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
+    List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
+    Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
 
     if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
       LOG.info("Processors: {} has duplicates. Not generating JobModel.", 
currentProcessorIds);
@@ -350,8 +352,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
      * to host mapping) is passed in as null when building the jobModel.
      */
     JobModel model = JobModelManager.readJobModel(this.config, 
changeLogPartitionMap, null, streamMetadataCache, processors);
-    // Nuke the configuration in JobModel.
-    return new JobModel(new MapConfig(), model.getContainers());
+    return model;
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
@@ -379,11 +380,9 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
       startTime = System.nanoTime();
 
       metrics.barrierCreation.inc();
-      debounceTimer.scheduleAfterDebounceTime(
-          barrierAction,
-        (new ZkConfig(config)).getZkBarrierTimeoutMs(),
-        () -> barrier.expire(version)
-      );
+      if (leaderElector.amILeader()) {
+        debounceTimer.scheduleAfterDebounceTime(barrierAction, (new 
ZkConfig(config)).getZkBarrierTimeoutMs(), () -> barrier.expire(version));
+      }
     }
 
     public void onBarrierStateChanged(final String version, 
ZkBarrierForVersionUpgrade.State state) {
@@ -397,7 +396,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
           // no-op for non-leaders
           // for leader: make sure we do not stop - so generate a new job model
           LOG.warn("Barrier for version " + version + " timed out.");
-          if (zkController.isLeader()) {
+          if (leaderElector.amILeader()) {
             LOG.info("Leader will schedule a new job model generation");
             debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () ->
               {
@@ -418,13 +417,14 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   }
 
   /// listener to handle ZK state change events
+  @VisibleForTesting
   class ZkSessionStateChangedListener implements IZkStateListener {
 
     private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
+    private static final String ZK_SESSION_EXPIRED = "ZK_SESSION_EXPIRED";
 
     @Override
-    public void handleStateChanged(Watcher.Event.KeeperState state)
-        throws Exception {
+    public void handleStateChanged(Watcher.Event.KeeperState state) {
       switch (state) {
         case Expired:
           // if the session has expired it means that all the registration's 
ephemeral nodes are gone.
@@ -433,12 +433,26 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
           // increase generation of the ZK session. All the callbacks from the 
previous generation will be ignored.
           zkUtils.incGeneration();
 
-          if (coordinatorListener != null) {
-            coordinatorListener.onJobModelExpired();
-          }
-
           // reset all the values that might have been from the previous 
session (e.g ephemeral node path)
           zkUtils.unregister();
+          if (leaderElector.amILeader()) {
+            leaderElector.resignLeadership();
+          }
+          /**
+           * After this event, one amongst the following two things could 
potentially happen:
+           * A. On successful reconnect to another zookeeper server in 
ensemble, this processor is going to
+           * join the group again as new processor. In this case, retaining 
buffered events in debounceTimer will be unnecessary.
+           * B. If zookeeper server is unreachable, 
handleSessionEstablishmentError callback will be triggered indicating
+           * a error scenario. In this case, retaining buffered events in 
debounceTimer will be unnecessary.
+           */
+          LOG.info("Cancelling all scheduled actions in session expiration for 
processorId: {}.", processorId);
+          debounceTimer.cancelAllScheduledActions();
+          debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_EXPIRED, 0, () -> 
{
+              if (coordinatorListener != null) {
+                coordinatorListener.onJobModelExpired();
+              }
+            });
+
           return;
         case Disconnected:
           // if the session has expired it means that all the registration's 
ephemeral nodes are gone.
@@ -460,21 +474,20 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
         default:
           // received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. 
NoOp
           LOG.info("Got ZK event " + state.toString() + " for processor=" + 
processorId + ". Continue");
+          return;
       }
     }
 
     @Override
-    public void handleNewSession()
-        throws Exception {
+    public void handleNewSession() {
       LOG.info("Got new session created event for processor=" + processorId);
-
+      debounceTimer.cancelAllScheduledActions();
       LOG.info("register zk controller for the new session");
       zkController.register();
     }
 
     @Override
-    public void handleSessionEstablishmentError(Throwable error)
-        throws Exception {
+    public void handleSessionEstablishmentError(Throwable error) {
       // this means we cannot connect to zookeeper to establish a session
       LOG.info("handleSessionEstablishmentError received for processor=" + 
processorId, error);
       debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> 
stop());

http://git-wip-us.apache.org/repos/asf/samza/blob/727a3c19/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 117d458..c8367fb 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -23,9 +23,14 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
+import org.apache.zookeeper.Watcher;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.Mockito.*;
+
+
 public class TestZkJobCoordinator {
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
@@ -34,16 +39,40 @@ public class TestZkJobCoordinator {
   public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() {
     ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
     ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
Mockito.when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 
     ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    Mockito.when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    Mockito.when(zkUtils.getZkClient()).thenReturn(mockZkClient);
-    Mockito.when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
     zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
 
-    Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop();
+    verify(zkJobCoordinator, Mockito.atMost(1)).stop();
+  }
+
+  @Test
+  public void 
testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
+
+    ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    final ZkSessionStateChangedListener zkSessionStateChangedListener = 
zkJobCoordinator.new ZkSessionStateChangedListener();
+
+    
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
+
+    verify(zkUtils).incGeneration();
+    verify(mockDebounceTimer).cancelAllScheduledActions();
+    
verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"),
 Mockito.eq(0L), Mockito.any(Runnable.class));
   }
 }

Reply via email to