This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 516a40d  SAMZA-2663: Handle job model expiration and new job model 
flows for multiple incomplete rebalances (#1528)
516a40d is described below

commit 516a40d8c4eb67cad07a379f28e92fbfb4f459d0
Author: mynameborat <[email protected]>
AuthorDate: Sat Sep 11 10:40:24 2021 -0700

    SAMZA-2663: Handle job model expiration and new job model flows for 
multiple incomplete rebalances (#1528)
    
    Problem:
    As part of SAMZA-2638, we started skipping container restarts and stops 
when a processor's work assignment does not change across rebalances. However, 
the active job model is updated with the proposed job model only when the 
container is started as part of onNewJobModel. This leads to a scenario where 
the processor has been stopped but future rebalances assume the container is 
still running. More information on the scenario is given below.
    
    Changes:
    
    Track job model expiration.
    onNewJobModel notifies the listener of the new job model only if the 
active job model has been expired.
    Apply the "no change in work assignment" optimization only during the 
checkJobModelExpired flow.
---
 .../java/org/apache/samza/zk/ZkJobCoordinator.java | 18 ++++++-
 .../org/apache/samza/zk/TestZkJobCoordinator.java  | 56 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f41ee8c..447143a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -110,6 +110,10 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final MetadataStore jobModelMetadataStore;
   private final CoordinatorStreamStore coordinatorStreamStore;
 
+  // It is sufficient for the field to be volatile as the flows that 
read/update execute on debounce timer which is single threaded
+  // Choice of atomic boolean is purely for convenience for operations like 
compareAndSet to enforce invariant checks.
+  private final AtomicBoolean jobModelExpired = new AtomicBoolean(false);
+
   private JobCoordinatorListener coordinatorListener = null;
   // denotes the most recent job model agreed by the quorum
   private JobModel activeJobModel;
@@ -441,6 +445,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     } else {
       LOG.info("Work assignment changed for the processor {}. Notifying job 
model expiration to coordinator listener", processorId);
       coordinatorListener.onJobModelExpired();
+      jobModelExpired.set(true);
     }
   }
 
@@ -455,7 +460,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   void onNewJobModel(JobModel newJobModel) {
     Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing 
onNewJobModel");
     // start the container with the new model
-    if (!JobModelUtil.compareContainerModelForProcessor(processorId, 
activeJobModel, newJobModel)) {
+    if (jobModelExpired.compareAndSet(true, false)) {
       LOG.info("Work assignment changed for the processor {}. Updating task 
locality and notifying coordinator listener", processorId);
       if (newJobModel.getContainers().containsKey(processorId)) {
         for (TaskName taskName : 
JobModelUtil.getTaskNamesForProcessor(processorId, newJobModel)) {
@@ -468,6 +473,7 @@ public class ZkJobCoordinator implements JobCoordinator {
       }
     } else {
       /*
+       * We don't expire the job model if the proposed work assignment is same 
as the current work assignment.
        * The implication of work assignment remaining the same can be 
categorized into
        *   1. Processor part of the job model
        *   2. Processor not part of the job model.
@@ -499,6 +505,16 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   @VisibleForTesting
+  boolean getJobModelExpired() {
+    return jobModelExpired.get();
+  }
+
+  @VisibleForTesting
+  void setJobModelExpired(boolean value) {
+    jobModelExpired.set(value);
+  }
+
+  @VisibleForTesting
   void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
     debounceTimer = scheduleAfterDebounceTime;
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 0ccb8d9..586dfc0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -54,7 +54,9 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
@@ -66,6 +68,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
@@ -113,6 +116,58 @@ public class TestZkJobCoordinator {
   }
 
   @Test
+  public void testCheckAndExpireWithMultipleRebalances() {
+    final TaskName taskName = new TaskName("task1");
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobCoordinatorListener mockListener = 
mock(JobCoordinatorListener.class);
+    final JobModel jobModelVersion1 = mock(JobModel.class);
+    final JobModel jobModelVersion2 = mock(JobModel.class);
+    final JobModel jobModelVersion3 = jobModelVersion1;
+
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, 
mock(TaskModel.class)));
+    
when(jobModelVersion3.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, 
mockContainerModel));
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new 
MapConfig(), new NoOpMetricsRegistry(),
+        zkUtils, zkMetadataStore, coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(jobModelVersion1);
+
+    /*
+     * The following mimics the scenario where new work assignment(V2) is 
proposed by the leader and the work assignment
+     * differs from the active work assignment(V1) and hence results in job 
model expiration
+     */
+    zkJobCoordinator.checkAndExpireJobModel(jobModelVersion2);
+
+    verify(mockListener, times(1)).onJobModelExpired();
+    assertTrue("JobModelExpired should be true for work assignment changes", 
zkJobCoordinator.getJobModelExpired());
+    assertEquals("Active job model shouldn't be updated", jobModelVersion1, 
zkJobCoordinator.getActiveJobModel());
+
+    /*
+     * The following mimics the scenario where leader kicked off another 
rebalance where the new work assignment(V3)
+     * is same as the old work assignment(V1) and doesn't trigger job model 
expiration. We check the interactions w/
+     * the listener to ensure job model expiration isn't invoked. However, the 
previous rebalance should have already
+     * triggered job model expiration and set the job model expired flag to 
true
+     */
+    zkJobCoordinator.checkAndExpireJobModel(jobModelVersion1);
+    verifyNoMoreInteractions(mockListener);
+    assertTrue("JobModelExpired should remain unchanged", 
zkJobCoordinator.getJobModelExpired());
+    assertEquals("Active job model shouldn't be updated", jobModelVersion1, 
zkJobCoordinator.getActiveJobModel());
+
+
+    /*
+     * The following mimics the scenario where the new work assignment(V3) 
proposed by the leader is accepted and
+     * on new job model is invoked. Even though the work assignment remains 
the same w/ the active job model version,
+     * onNewJobModel is invoked on the listener as an intermediate rebalance 
expired the old work assignment(V1)
+     */
+    zkJobCoordinator.onNewJobModel(jobModelVersion3);
+    verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, 
jobModelVersion3);
+    verify(zkUtils, times(1)).writeTaskLocality(any(), any());
+
+    assertEquals("Active job model should be updated to new job model", 
zkJobCoordinator.getActiveJobModel(), jobModelVersion3);
+    assertFalse("JobModelExpired should be set to false after onNewJobModel", 
zkJobCoordinator.getJobModelExpired());
+  }
+
+  @Test
   public void testCheckAndExpireWithNoChangeInWorkAssignment() {
     BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod =
       (ignored, coordinatorListener) -> 
verifyZeroInteractions(coordinatorListener);
@@ -155,6 +210,7 @@ public class TestZkJobCoordinator {
     ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new 
MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
     zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setJobModelExpired(true);
     zkJobCoordinator.onNewJobModel(mockJobModel);
 
     verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());

Reply via email to