This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new da8d158  SAMZA-2638: Skip container stop/restart for processors that 
have same work assignment across rebalances (#1478)
da8d158 is described below

commit da8d1582cc4cff22c23d80dc117ea4f076ca3827
Author: mynameborat <[email protected]>
AuthorDate: Fri Mar 26 16:25:48 2021 -0700

    SAMZA-2638: Skip container stop/restart for processors that have same work 
assignment across rebalances (#1478)
    
    Problem:
    As part of rebalance, we always expire the current work assignment and 
proceed to signal consensus and start the container with new work assignment. 
It is inefficient for the processors to do the above when there is no changes 
in the work assignment between the active job model & the proposed job model.
    
    Changes:
    
    - Processors perform onJobModelExpired and onNewJobModel only if there are 
changes to their work assignment across the active and proposed job model.
    - Processors no longer shutdown if they are not part of job model. Refer to 
the SAMZA-2638 for explanations
    - Add helper methods in JobModelUtil for work assignment comparison
---
 .../org/apache/samza/job/model/JobModelUtil.java   |  42 ++++
 .../java/org/apache/samza/zk/ZkJobCoordinator.java | 130 ++++++++---
 .../apache/samza/job/model/TestJobModelUtil.java   |  64 ++++++
 .../org/apache/samza/zk/TestZkJobCoordinator.java  | 251 ++++++++++++++-------
 4 files changed, 374 insertions(+), 113 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java 
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index d356baf..f1d6073 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.job.model;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
@@ -119,6 +120,47 @@ public class JobModelUtil {
     }
   }
 
+  /**
+   * Compares the {@link ContainerModel} for a given <i>processorId</i> across 
two {@link JobModel}.
+   * @param processorId processor id for which work assignments are compared
+   * @param first first job model
+   * @param second second job model
+   * @return true - if {@link ContainerModel} for the processor is same across 
the {@link JobModel}
+   *         false - otherwise
+   */
+  public static boolean compareContainerModelForProcessor(String processorId, 
JobModel first, JobModel second) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), 
"Processor id cannot be blank");
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return compareContainerModel(first.getContainers().get(processorId), 
second.getContainers().get(processorId));
+  }
+
+  /**
+   * Helper method to compare the two input {@link ContainerModel}s.
+   * @param first first container model
+   * @param second second container model
+   * @return true - if two input {@link ContainerModel} are equal
+   *         false - otherwise
+   */
+  @VisibleForTesting
+  static boolean compareContainerModel(ContainerModel first, ContainerModel 
second) {
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return first.equals(second);
+  }
+
   private static String getJobModelKey(String version) {
     return String.format("%s/%s", JOB_MODEL_GENERATION_KEY, version);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 6526705..102f357 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -98,7 +98,6 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final String processorId;
 
   private final Config config;
-  private final ZkBarrierForVersionUpgrade barrier;
   private final ZkJobCoordinatorMetrics metrics;
   private final ZkLeaderElector leaderElector;
   private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
@@ -111,9 +110,13 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final CoordinatorStreamStore coordinatorStreamStore;
 
   private JobCoordinatorListener coordinatorListener = null;
-  private JobModel newJobModel;
+  // denotes the most recent job model agreed by the quorum
+  private JobModel activeJobModel;
+  // denotes job model that is latest but may have not reached consensus
+  private JobModel latestJobModel;
   private boolean hasLoadedMetadataResources = false;
   private String cachedJobModelVersion = null;
+  private ZkBarrierForVersionUpgrade barrier;
 
   @VisibleForTesting
   ZkSessionMetrics zkSessionMetrics;
@@ -232,7 +235,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
   @Override
   public JobModel getJobModel() {
-    return newJobModel;
+    return latestJobModel;
   }
 
   @Override
@@ -269,11 +272,11 @@ public class ZkJobCoordinator implements JobCoordinator {
 
     // Generate the JobModel
     LOG.info("Generating new JobModel with processors: {}.", 
currentProcessorIds);
-    JobModel jobModel = generateNewJobModel(processorNodes);
+    JobModel newJobModel = generateNewJobModel(processorNodes);
 
     // Create checkpoint and changelog streams if they don't exist
     if (!hasLoadedMetadataResources) {
-      loadMetadataResources(jobModel);
+      loadMetadataResources(newJobModel);
       hasLoadedMetadataResources = true;
     }
 
@@ -283,7 +286,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     LOG.info("pid=" + processorId + "Generated new JobModel with version: " + 
nextJMVersion + " and processors: " + currentProcessorIds);
 
     // Publish the new job model
-    publishJobModelToMetadataStore(jobModel, nextJMVersion);
+    publishJobModelToMetadataStore(newJobModel, nextJMVersion);
 
     // Start the barrier for the job model update
     barrier.create(nextJMVersion, currentProcessorIds);
@@ -393,6 +396,90 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   /**
+   * Check if the new job model contains a different work assignment for the 
processor compared the last active job
+   * model. In case of different work assignment, expire the current job model 
by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, 
this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active 
listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, 
activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no 
change in work assignment.", processorId);
+    } else {
+      LOG.info("Work assignment changed for the processor {}. Notifying job 
model expiration to coordinator listener", processorId);
+      coordinatorListener.onJobModelExpired();
+    }
+  }
+
+  /**
+   * Checks if the new job model contains a different work assignment for the 
processor compared to the last active
+   * job model. In case of different work assignment, update the task locality 
of the tasks associated with the
+   * processor and notify new job model to the registered {@link 
JobCoordinatorListener}.
+   *
+   * @param newJobModel new job model agreed by the quorum
+   */
+  @VisibleForTesting
+  void onNewJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing 
onNewJobModel");
+    // start the container with the new model
+    if (!JobModelUtil.compareContainerModelForProcessor(processorId, 
activeJobModel, newJobModel)) {
+      LOG.info("Work assignment changed for the processor {}. Updating task 
locality and notifying coordinator listener", processorId);
+      if (newJobModel.getContainers().containsKey(processorId)) {
+        for (TaskName taskName : 
JobModelUtil.getTaskNamesForProcessor(processorId, newJobModel)) {
+          zkUtils.writeTaskLocality(taskName, locationId);
+        }
+
+        if (coordinatorListener != null) {
+          coordinatorListener.onNewJobModel(processorId, newJobModel);
+        }
+      }
+    } else {
+      /*
+       * The implication of work assignment remaining the same can be 
categorized into
+       *   1. Processor part of the job model
+       *   2. Processor not part of the job model.
+       * For both the state of the processor remains what it was when the 
rebalance started. e.g.,
+       *   [1] should continue to process its work assignment without any 
interruption as part of the rebalance. i.e.,
+       *       there will be no expiration of the existing work (a.k.a samza 
container won't be stopped) and also no
+       *       notification to StreamProcessor about the rebalance since work 
assignment didn't change.
+       *   [2] should have no work and be idle processor and will continue to 
be idle.
+       */
+      LOG.info("Skipping onNewJobModel since there are no changes in work 
assignment.");
+    }
+
+    /*
+     * Update the last active job model to new job model regardless of whether 
the work assignment for the processor
+     * has changed or not. It is important to do it so that all the processors 
has a consistent view what the latest
+     * active job model is.
+     */
+    activeJobModel = newJobModel;
+  }
+
+  @VisibleForTesting
+  JobModel getActiveJobModel() {
+    return activeJobModel;
+  }
+
+  @VisibleForTesting
+  void setActiveJobModel(JobModel jobModel) {
+    activeJobModel = jobModel;
+  }
+
+  @VisibleForTesting
+  void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade 
barrierUpgradeForVersion) {
+    barrier = barrierUpgradeForVersion;
+  }
+
+  /**
    * Builds the {@link GrouperMetadataImpl} based upon provided {@param 
jobModelVersion}
    * and {@param processorNodes}.
    * @param jobModelVersion the most recent jobModelVersion available in the 
zookeeper.
@@ -467,16 +554,7 @@ public class ZkJobCoordinator implements JobCoordinator {
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
           LOG.info("pid=" + processorId + "new version " + version + " of the 
job model got confirmed");
-
-          // read the new Model
-          JobModel jobModel = getJobModel();
-          // start the container with the new model
-          if (coordinatorListener != null) {
-            for (TaskName taskName : 
JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
-              zkUtils.writeTaskLocality(taskName, locationId);
-            }
-            coordinatorListener.onNewJobModel(processorId, jobModel);
-          }
+          onNewJobModel(getJobModel());
         });
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
@@ -541,21 +619,11 @@ public class ZkJobCoordinator implements JobCoordinator {
 
         LOG.info("Got a notification for new JobModel version. Path = {} 
Version = {}", dataPath, data);
 
-        newJobModel = readJobModelFromMetadataStore(jobModelVersion);
-        LOG.info("pid=" + processorId + ": new JobModel is available. Version 
=" + jobModelVersion + "; JobModel = " + newJobModel);
-
-        if (!newJobModel.getContainers().containsKey(processorId)) {
-          LOG.info("New JobModel does not contain pid={}. Stopping this 
processor. New JobModel: {}",
-              processorId, newJobModel);
-          stop();
-        } else {
-          // stop current work
-          if (coordinatorListener != null) {
-            coordinatorListener.onJobModelExpired();
-          }
-          // update ZK and wait for all the processors to get this new version
-          barrier.join(jobModelVersion, processorId);
-        }
+        latestJobModel = readJobModelFromMetadataStore(jobModelVersion);
+        LOG.info("pid=" + processorId + ": new JobModel is available. Version 
=" + jobModelVersion + "; JobModel = " + latestJobModel);
+        checkAndExpireJobModel(latestJobModel);
+        // update ZK and wait for all the processors to get this new version
+        barrier.join(jobModelVersion, processorId);
       });
     }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java 
b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 0e8baae..856b2f8 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -29,11 +29,75 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 public class TestJobModelUtil {
+  private static final String PROCESSOR_ID = "testProcessor";
+
+  @Test
+  public void testCompareContainerModel() {
+    assertTrue("Expecting null container models to return true", 
JobModelUtil.compareContainerModel(null, null));
+
+    assertFalse("Expecting false for two different container model",
+        JobModelUtil.compareContainerModel(mock(ContainerModel.class), 
mock(ContainerModel.class)));
+
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    assertTrue("Expecting true for same container model",
+        JobModelUtil.compareContainerModel(mockContainerModel, 
mockContainerModel));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCompareContainerModelForNullProcessor() {
+    JobModelUtil.compareContainerModelForProcessor(null, mock(JobModel.class), 
mock(JobModel.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCompareContainerModelForBlankProcessor() {
+    JobModelUtil.compareContainerModelForProcessor("", mock(JobModel.class), 
mock(JobModel.class));
+  }
+
+  /**
+   * Tests the following scenarios as part of the same tests to reduce boiler 
plate and potentially allow
+   * parallel executions of tests for the class.
+   *  1. Test null job models for processors
+   *  2. Test same container models across job models for processors
+   *  3. Test different container models across job models for processors
+   *  4. Test absence of container model vs presence across job models for 
processors
+   *
+   * The approach below leans towards performance (parallel execution) as 
opposed to readability; i.e sharing setup of
+   * the tests and having multiple tests that test individual scenarios. 
Additionally, the individual scenarios being
+   * tested are self explanatory.
+   */
+  @Test
+  public void testCompareContainerModelForProcessor() {
+    final JobModel firstJobModel = mock(JobModel.class);
+    final JobModel secondJobModel = mock(JobModel.class);
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    Map<String, ContainerModel> mockContainerModels = mock(Map.class);
+
+    assertTrue("Null job models should return true for comparison",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, null, 
null));
+
+    when(firstJobModel.getContainers()).thenReturn(mockContainerModels);
+    when(secondJobModel.getContainers()).thenReturn(mockContainerModels);
+    when(mockContainerModels.get(PROCESSOR_ID)).thenReturn(mockContainerModel);
+    assertTrue("Expecting both job model to have same container model for the 
processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, 
firstJobModel, secondJobModel));
+
+    when(mockContainerModels.get(PROCESSOR_ID))
+        .thenReturn(mockContainerModel)
+        .thenReturn(mock(ContainerModel.class));
+    assertFalse("Expecting container models to be different across job models 
for the processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, 
firstJobModel, secondJobModel));
+
+    when(mockContainerModels.get(PROCESSOR_ID)).thenReturn(null)
+        .thenReturn(mockContainerModel);
+    assertFalse("Expecting container models to be different across job models 
for the processor",
+        JobModelUtil.compareContainerModelForProcessor(PROCESSOR_ID, 
firstJobModel, secondJobModel));
+  }
 
   @Test
   public void testGetTaskNamesForProcessorAbsentInJobModel() {
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index d9d5379..bf681cb 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -26,12 +26,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
@@ -46,32 +48,49 @@ import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
 import org.apache.zookeeper.Watcher;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyObject;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class TestZkJobCoordinator {
+  private static final String PROCESSOR_ID = "testProcessor";
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
 
+
   private final Config config;
   private final JobModel jobModel;
   private final MetadataStore zkMetadataStore;
   private final CoordinatorStreamStore coordinatorStreamStore;
 
+  private ZkUtils zkUtils;
+
+  @Before
+  public void setup() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+  }
+
   public TestZkJobCoordinator() {
     Map<String, String> configMap = ImmutableMap.of(
         "job.coordinator.system", "kafka",
@@ -91,46 +110,111 @@ public class TestZkJobCoordinator {
   }
 
   @Test
-  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws 
Exception {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    CountDownLatch jcShutdownLatch = new CountDownLatch(1);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+  public void testCheckAndExpireWithNoChangeInWorkAssignment() {
+    BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod =
+      (ignored, coordinatorListener) -> 
verifyZeroInteractions(coordinatorListener);
 
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    
testNoChangesInWorkAssignmentHelper(ZkJobCoordinator::checkAndExpireJobModel, 
verificationMethod);
+  }
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
-        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
-    doReturn(new JobModel(new MapConfig(), new 
HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
-    doAnswer(new Answer<Void>() {
-      public Void answer(InvocationOnMock invocation) {
-        jcShutdownLatch.countDown();
-        return null;
-      }
-    }).when(zkJobCoordinator).stop();
-
-    final ZkJobCoordinator.ZkJobModelVersionChangeHandler 
zkJobModelVersionChangeHandler = zkJobCoordinator.new 
ZkJobModelVersionChangeHandler(zkUtils);
-    zkJobModelVersionChangeHandler.doHandleDataChange("path", 
TEST_JOB_MODEL_VERSION);
-    verify(zkJobCoordinator, Mockito.atMost(1)).stop();
-    assertTrue("Timed out waiting for JobCoordinator to stop", 
jcShutdownLatch.await(1, TimeUnit.MINUTES));
+  @Test
+  public void testCheckAndExpireWithChangeInWorkAssignment() {
+    final String processorId = "testProcessor";
+    JobCoordinatorListener mockListener = mock(JobCoordinatorListener.class);
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(processorId, new 
MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
+
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.checkAndExpireJobModel(mock(JobModel.class));
+    verify(mockListener, times(1)).onJobModelExpired();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAndExpireJobModelWithNullJobModel() {
+    final String processorId = "testProcessor";
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(processorId, new 
MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
+    zkJobCoordinator.checkAndExpireJobModel(null);
   }
 
   @Test
-  public void 
testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+  public void testOnNewJobModelWithChangeInWorkAssignment() {
+    final TaskName taskName = new TaskName("task1");
+    final ContainerModel mockContainerModel = mock(ContainerModel.class);
+    final JobCoordinatorListener mockListener = 
mock(JobCoordinatorListener.class);
+    final JobModel mockJobModel = mock(JobModel.class);
+
+    when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, 
mock(TaskModel.class)));
+    
when(mockJobModel.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID, 
mockContainerModel));
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new 
MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.onNewJobModel(mockJobModel);
+
+    verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());
+    verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, mockJobModel);
+    assertEquals("Active job model should be updated with the new job model", 
mockJobModel,
+        zkJobCoordinator.getActiveJobModel());
+  }
 
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+  @Test
+  public void testOnNewJobModelWithNoChangesInWorkAssignment() {
+    BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod = (zkUtils, 
coordinatorListener) -> {
+      verify(zkUtils, times(0)).writeTaskLocality(any(), any());
+      verifyZeroInteractions(coordinatorListener);
+    };
+
+    testNoChangesInWorkAssignmentHelper(ZkJobCoordinator::onNewJobModel, 
verificationMethod);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testOnNewJobModelWithNullJobModel() {
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new 
MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
+    zkJobCoordinator.onNewJobModel(null);
+  }
+
+  /**
+   * Test job model version changed changes to work assignment. In this 
scenario, existing work should
+   * be stopped a.k.a processor should stop the container through the 
listener. The processor then proceeds to join
+   * the barrier to notify its acceptance on the proposed job model.
+   */
+  @Test
+  public void testJobModelVersionChangeWithChangeInWorkAssignment() throws 
Exception {
+    BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> 
verificationMethod =
+      (barrier, listener) -> {
+        verify(listener, times(1)).onJobModelExpired();
+        verify(barrier, times(1)).join(TEST_JOB_MODEL_VERSION, PROCESSOR_ID);
+      };
+    testJobModelVersionChangeHelper(null, mock(JobModel.class), 
verificationMethod);
+  }
+
+  /**
+   * Test job model version changed without any changes to work assignment. In 
this scenario, existing work should
+   * not be stopped a.k.a processor shouldn't stop the container. However, the 
processor proceeds to join the barrier
+   * to notify its acceptance on the proposed job model.
+   */
+  @Test
+  public void testJobModelVersionChangeWithNoChangeInWorkAssignment() throws 
Exception {
+    final JobModel jobModel = mock(JobModel.class);
+    BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> 
verificationMethod =
+      (barrier, listener) -> {
+        verifyZeroInteractions(listener);
+        verify(barrier, times(1)).join(TEST_JOB_MODEL_VERSION, PROCESSOR_ID);
+      };
+    testJobModelVersionChangeHelper(jobModel, jobModel, verificationMethod);
+  }
+
+  @Test
+  public void 
testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new 
MetricsRegistryMap());
@@ -145,19 +229,12 @@ public class TestZkJobCoordinator {
   }
 
   @Test
-  public void testZookeeperSessionMetricsAreUpdatedCoorrectly() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+  public void testZookeeperSessionMetricsAreUpdatedCorrectly() {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new 
MetricsRegistryMap());
@@ -178,18 +255,11 @@ public class TestZkJobCoordinator {
 
   @Test
   public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -202,18 +272,11 @@ public class TestZkJobCoordinator {
 
   @Test
   public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
@@ -230,18 +293,11 @@ public class TestZkJobCoordinator {
 
   @Test
   public void 
testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
@@ -255,18 +311,11 @@ public class TestZkJobCoordinator {
 
   @Test
   public void testLoadMetadataResources() throws IOException {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = 
Mockito.mock(StartpointManager.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), 
zkUtils,
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, config, new NoOpMetricsRegistry(), zkUtils,
         zkMetadataStore, coordinatorStreamStore));
     
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
@@ -285,18 +334,11 @@ public class TestZkJobCoordinator {
 
   @Test
   public void testDoOnProcessorChange() {
-    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
-    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
-    
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
-
-    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
-    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
-    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
 
     StartpointManager mockStartpointManager = 
Mockito.mock(StartpointManager.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", config,
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, config,
         new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
     
doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
 
@@ -308,4 +350,49 @@ public class TestZkJobCoordinator {
     verify(zkUtils).publishJobModelVersion(anyString(), anyString());
     verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
   }
+
+  private void 
testNoChangesInWorkAssignmentHelper(BiConsumer<ZkJobCoordinator, JobModel> 
testMethod,
+      BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod) {
+    final JobCoordinatorListener mockListener = 
mock(JobCoordinatorListener.class);
+    final JobModel mockJobModel = mock(JobModel.class);
+
+    ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new 
MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore);
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(mockJobModel);
+
+    testMethod.accept(zkJobCoordinator, mockJobModel);
+    verificationMethod.accept(zkUtils, mockListener);
+  }
+
+  private void testJobModelVersionChangeHelper(JobModel activeJobModel, 
JobModel newJobModel,
+      BiConsumer<ZkBarrierForVersionUpgrade, JobCoordinatorListener> 
verificationMethod) throws InterruptedException {
+    final CountDownLatch completionLatch = new CountDownLatch(1);
+    final JobCoordinatorListener mockListener = 
mock(JobCoordinatorListener.class);
+    final ScheduleAfterDebounceTime mockDebounceTimer = 
mock(ScheduleAfterDebounceTime.class);
+    final ZkBarrierForVersionUpgrade mockBarrier = 
mock(ZkBarrierForVersionUpgrade.class);
+
+    doAnswer(ctx -> {
+      Object[] args = ctx.getArguments();
+      ((Runnable) args[2]).run();
+      completionLatch.countDown();
+      return null;
+    }).when(mockDebounceTimer).scheduleAfterDebounceTime(anyString(), 
anyLong(), anyObject());
+
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+        new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, 
coordinatorStreamStore));
+    zkJobCoordinator.setListener(mockListener);
+    zkJobCoordinator.setActiveJobModel(activeJobModel);
+    zkJobCoordinator.setZkBarrierUpgradeForVersion(mockBarrier);
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    
doReturn(newJobModel).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
+
+    final ZkJobCoordinator.ZkJobModelVersionChangeHandler 
zkJobModelVersionChangeHandler =
+        zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
+    zkJobModelVersionChangeHandler.doHandleDataChange("path", 
TEST_JOB_MODEL_VERSION);
+    completionLatch.await(1, TimeUnit.SECONDS);
+
+    verificationMethod.accept(mockBarrier, mockListener);
+  }
 }

Reply via email to