This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 86932b9 SAMZA-2633: Rolling upgrades cause downtime to upgraded
processors for the entire deployment window (#1484)
86932b9 is described below
commit 86932b9c36a7e69cd0189e522835bb2af6bab634
Author: mynameborat <[email protected]>
AuthorDate: Fri Apr 2 17:17:06 2021 -0700
SAMZA-2633: Rolling upgrades cause downtime to upgraded processors for the
entire deployment window (#1484)
Description:
During rolling upgrades, the current debounce timer gets extended every
time there is a quorum change notification. As a result, processors that
were upgraded earlier in the deployment window remain unavailable while waiting
for work assignment. In some scenarios, this causes processors to be unavailable
for 20 minutes or so, depending on the size of the quorum and the debounce time
configuration. Refer to SAMZA-2633 for more information.
Changes:
1. Optimize the leader workflow to skip rebalance if there are no changes to
the work assignment
2. Make processors start with the most recent agreed job model on startup
3. Leader persists the active job model version in ZK to enable change [2]
4. Introduce a config for applications to opt in to the optimization
Usage Instructions:
Set job.coordinator.zk.enable-startup-with-active-job-model to true as part
of the application configuration to enable processors to use the most recent
active job model during startup and to enable the leader to skip rebalances if
the work assignment remains the same.
---
.../versioned/jobs/samza-configurations.md | 1 +
.../java/org/apache/samza/config/ZkConfig.java | 5 +
.../org/apache/samza/job/model/JobModelUtil.java | 17 ++--
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 93 ++++++++++++++++-
.../java/org/apache/samza/zk/ZkKeyBuilder.java | 12 ++-
.../src/main/java/org/apache/samza/zk/ZkUtils.java | 36 +++++++
.../apache/samza/job/model/TestJobModelUtil.java | 27 +++--
.../org/apache/samza/zk/TestZkJobCoordinator.java | 111 ++++++++++++++++++++-
8 files changed, 281 insertions(+), 21 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 96e994a..596dc52 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -373,6 +373,7 @@ Samza supports both standalone and clustered
([YARN](yarn-jobs.html)) [deploymen
|job.coordinator.zk.connection.timeout.ms|60000|Zookeeper connection timeout
in milliseconds. Zk connection timeout controls how long client tries to
connect to ZK server before giving up.|
|job.coordinator.zk.consensus.timeout.ms|40000|Zookeeper-based coordination.
How long each processor will wait for all the processors to report acceptance
of the new job model before rolling back.|
|job.debounce.time.ms|20000|Zookeeper-based coordination. How long the Leader
processor will wait before recalculating the JobModel on change of registered
processors.|
+|job.coordinator.zk.enable-startup-with-active-job-model|false|Enable stream
processors to run with the active job model version on startup without waiting
for the leader to trigger a rebalance. It is useful in scenarios where processors
leave the quorum and come back within the debounce time and the work assignment
for the new quorum remains unchanged. If disabled, processors will wait for the
leader to generate and notify work assignment.|
### <a name="metrics"></a>[6. Metrics](#metrics)
|Name|Default|Description|
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index 21bfe76..a8788fe 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -20,6 +20,7 @@
package org.apache.samza.config;
public class ZkConfig extends MapConfig {
+ public static final String STARTUP_WITH_ACTIVE_JOB_MODEL =
"job.coordinator.zk.enable-startup-with-active-job-model";
// Connection string for ZK, format: :<hostname>:<port>,..."
public static final String ZK_CONNECT = "job.coordinator.zk.connect";
public static final String ZK_SESSION_TIMEOUT_MS =
"job.coordinator.zk.session.timeout.ms";
@@ -34,6 +35,10 @@ public class ZkConfig extends MapConfig {
super(config);
}
+ public boolean getEnableStartupWithActiveJobModel() {
+ return getBoolean(STARTUP_WITH_ACTIVE_JOB_MODEL, false);
+ }
+
public String getZkConnect() {
if (!containsKey(ZK_CONNECT)) {
throw new ConfigException("Missing " + ZK_CONNECT + " config!");
diff --git
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index f1d6073..b529f7f 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -18,12 +18,12 @@
*/
package org.apache.samza.job.model;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -138,18 +138,17 @@ public class JobModelUtil {
return false;
}
- return compareContainerModel(first.getContainers().get(processorId),
second.getContainers().get(processorId));
+ return Objects.equals(first.getContainers().get(processorId),
second.getContainers().get(processorId));
}
/**
- * Helper method to compare the two input {@link ContainerModel}s.
- * @param first first container model
- * @param second second container model
- * @return true - if two input {@link ContainerModel} are equal
+ * Compares the {@link ContainerModel}s across the two {@link JobModel}s.
+ * @param first first job model
+ * @param second second job model
+ * @return true - if {@link ContainerModel}s are the same across {@link
JobModel}s
* false - otherwise
*/
- @VisibleForTesting
- static boolean compareContainerModel(ContainerModel first, ContainerModel
second) {
+ public static boolean compareContainerModels(JobModel first, JobModel
second) {
if (first == second) {
return true;
}
@@ -158,7 +157,7 @@ public class JobModelUtil {
return false;
}
- return first.equals(second);
+ return Objects.equals(first.getContainers(), second.getContainers());
}
private static String getJobModelKey(String version) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 102f357..bc24752 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -94,12 +94,14 @@ public class ZkJobCoordinator implements JobCoordinator {
**/
private static final String ON_ZK_CLEANUP = "OnCleanUp";
+ // Action name when the processor starts with last agreed job model upon
start
+ static final String START_WORK_WITH_LAST_ACTIVE_JOB_MODEL =
"StartWorkWithLastActiveJobModel";
+
private final ZkUtils zkUtils;
private final String processorId;
private final Config config;
private final ZkJobCoordinatorMetrics metrics;
- private final ZkLeaderElector leaderElector;
private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
private final StreamMetadataCache streamMetadataCache;
private final SystemAdmins systemAdmins;
@@ -117,6 +119,7 @@ public class ZkJobCoordinator implements JobCoordinator {
private boolean hasLoadedMetadataResources = false;
private String cachedJobModelVersion = null;
private ZkBarrierForVersionUpgrade barrier;
+ private ZkLeaderElector leaderElector;
@VisibleForTesting
ZkSessionMetrics zkSessionMetrics;
@@ -163,12 +166,22 @@ public class ZkJobCoordinator implements JobCoordinator {
public void start() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.validateZkVersion();
- zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(),
keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(),
keyBuilder.getTaskLocalityPath()});
+ zkUtils.validatePaths(new String[]{
+ keyBuilder.getProcessorsPath(),
+ keyBuilder.getJobModelVersionPath(),
+ keyBuilder.getActiveJobModelVersionPath(),
+ keyBuilder.getJobModelPathPrefix(),
+ keyBuilder.getTaskLocalityPath()});
this.jobModelMetadataStore.init();
systemAdmins.start();
leaderElector.tryBecomeLeader();
zkUtils.subscribeToJobModelVersionChange(new
ZkJobModelVersionChangeHandler(zkUtils));
+
+ if (new ZkConfig(config).getEnableStartupWithActiveJobModel()) {
+
debounceTimer.scheduleAfterDebounceTime(START_WORK_WITH_LAST_ACTIVE_JOB_MODEL,
0,
+ this::startWorkWithLastActiveJobModel);
+ }
}
@Override
@@ -274,6 +287,21 @@ public class ZkJobCoordinator implements JobCoordinator {
LOG.info("Generating new JobModel with processors: {}.",
currentProcessorIds);
JobModel newJobModel = generateNewJobModel(processorNodes);
+ /*
+ * Leader skips the rebalance even if there are changes in the quorum as
long as the work assignment remains the same
+ * across all the processors. The optimization is useful in the following
scenarios
+ * 1. The processor in the quorum restarts within the debounce window.
Originally, this would trigger rebalance
+ * across the processors stopping and starting their work assignment
which is detrimental to availability of
+ * the system. e.g. common scenario during rolling upgrades
+ * 2. Processors in the quorum which don't have work assignment and
their failures/restarts don't impact the
+ * quorum.
+ */
+ if (new ZkConfig(config).getEnableStartupWithActiveJobModel() &&
+ JobModelUtil.compareContainerModels(newJobModel, activeJobModel)) {
+ LOG.info("Skipping rebalance since there are no changes in work
assignment");
+ return;
+ }
+
// Create checkpoint and changelog streams if they don't exist
if (!hasLoadedMetadataResources) {
loadMetadataResources(newJobModel);
@@ -412,6 +440,8 @@ public class ZkJobCoordinator implements JobCoordinator {
return;
}
+ LOG.info("Checking for work assignment changes for processor {} between
active job model {} and new job model {}",
+ processorId, activeJobModel, newJobModel);
if (JobModelUtil.compareContainerModelForProcessor(processorId,
activeJobModel, newJobModel)) {
LOG.info("Skipping job model expiration for processor {} due to no
change in work assignment.", processorId);
} else {
@@ -475,11 +505,60 @@ public class ZkJobCoordinator implements JobCoordinator {
}
@VisibleForTesting
+ void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
+ debounceTimer = scheduleAfterDebounceTime;
+ }
+
+ @VisibleForTesting
+ void setLeaderElector(ZkLeaderElector zkLeaderElector) {
+ leaderElector = zkLeaderElector;
+ }
+
+ @VisibleForTesting
void setZkBarrierUpgradeForVersion(ZkBarrierForVersionUpgrade
barrierUpgradeForVersion) {
barrier = barrierUpgradeForVersion;
}
/**
+ * Start the processor with the last known active job model. It is safe to
start with last active job model
+ * version in all scenarios except in the event of a concurrent rebalance.
We define safe as a way to ensure that no
+ * two processors in the quorum have overlapping work assignments.
+ * In case of a concurrent rebalance there are two scenarios
+ * 1. Job model version update happens before processor registration
+ * 2. Job model version update happens after processor registration
+ * ZK guarantees FIFO order for client operations, the processor is
guaranteed to see all the state up until its
+ * own registration.
+ * For scenario 1, due to above guarantee, the processor will not start with
old assignment due to mismatch in
+ * latest vs last active. (If there is no mismatch, the scenario reduces to
one of the safe scenarios)
+ *
+ * For scenario 2, it is possible for the processor to not see the writes by
the leader about job model version change
+ * but will eventually receive a notification on the job model version
change and act on it (potentially stop
+ * the work assignment if its not part of the job model).
+ *
+ * In the scenario where the processor doesn't start with last active job
model version, it will continue to follow
+ * the old protocol where leader should get notified about the processor
registration and potentially trigger
+ * rebalance and notify about changes in work assignment after consensus.
+ * TODO: SAMZA-2635: Rebalances in standalone doesn't handle DAG changes for
restarted processor
+ */
+ @VisibleForTesting
+ void startWorkWithLastActiveJobModel() {
+ LOG.info("Starting the processor with the recent active job model");
+ String lastActiveJobModelVersion = zkUtils.getLastActiveJobModelVersion();
+ String latestJobModelVersion = zkUtils.getJobModelVersion();
+
+ if (lastActiveJobModelVersion != null &&
lastActiveJobModelVersion.equals(latestJobModelVersion)) {
+ final JobModel lastActiveJobModel =
readJobModelFromMetadataStore(lastActiveJobModelVersion);
+
+ /*
+ * TODO: SAMZA-2645: Allow onNewJobModel as a valid state transition.
Due to this limitation, we are forced
+ * to invoke onJobModelExpired even if there is nothing to expire.
+ */
+ checkAndExpireJobModel(lastActiveJobModel);
+ onNewJobModel(lastActiveJobModel);
+ }
+ }
+
+ /**
* Builds the {@link GrouperMetadataImpl} based upon provided {@param
jobModelVersion}
* and {@param processorNodes}.
* @param jobModelVersion the most recent jobModelVersion available in the
zookeeper.
@@ -554,6 +633,16 @@ public class ZkJobCoordinator implements JobCoordinator {
if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
LOG.info("pid=" + processorId + "new version " + version + " of the
job model got confirmed");
+ /*
+ * Publish the active job model version separately to denote that
the job model version is agreed by
+ * the quorum. The active job model version is used by processors as
an optimization during their startup
+ * so that processors can start with the work assignment that was
agreed by the quorum and allows the
+ * leader to skip the rebalance if there is no change in the work
assignment for the quorum across
+ * quorum changes (processors leaving or joining)
+ */
+ if (leaderElector.amILeader()) {
+ zkUtils.publishActiveJobModelVersion(version);
+ }
onNewJobModel(getJobModel());
});
} else {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 60a1e63..fea54ce 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -27,6 +27,7 @@ import com.google.common.base.Strings;
* - /
* |- groupId/
* |- JobModelGeneration/
+ * |- activeJobModelVersion (data contains the most recent active
job model version)
* |- jobModelVersion (data contains the version)
* |- jobModelUpgradeBarrier/ (contains barrier related data)
* |- jobModels/
@@ -42,7 +43,6 @@ import com.google.common.base.Strings;
* This class provides helper methods to easily generate/parse the path in the
ZK hierarchy.
*/
public class ZkKeyBuilder {
-
static final String PROCESSORS_PATH = "processors";
static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration";
static final String JOB_MODEL_UPGRADE_BARRIER_PATH =
"jobModelUpgradeBarrier";
@@ -89,6 +89,16 @@ public class ZkKeyBuilder {
return String.format("%s/%s/jobModelVersion", getRootPath(),
JOBMODEL_GENERATION_PATH);
}
+ /**
+ * Denotes the path where the most recent active job model version is
stored. The version of the job model is the
+ * most recent agreed upon version by the quorum. It differs from the
<i>jobModelVersion</i> path which may
+ * have a newer version of the job model published by the leader during
rebalance before consensus is achieved.
+ * @return the path where most recent active job model is stored
+ */
+ String getActiveJobModelVersionPath() {
+ return String.format("%s/%s/activeJobModelVersion", getRootPath(),
JOBMODEL_GENERATION_PATH);
+ }
+
String getJobModelPathPrefix() {
return String.format("%s/%s/jobModels", getRootPath(),
JOBMODEL_GENERATION_PATH);
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 72e8f8d..3084be8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -503,6 +503,16 @@ public class ZkUtils {
}
/**
+ * Read the version of the job model that is the most recent agreed version
by the quorum.
+ * @return most recent active job model version
+ */
+ public String getLastActiveJobModelVersion() {
+ String lastActiveJobModelVersion =
zkClient.readData(keyBuilder.getActiveJobModelVersionPath(), true);
+ metrics.reads.inc();
+ return lastActiveJobModelVersion;
+ }
+
+ /**
* Generates the next JobModel version that should be used by a processor
group in a rebalancing phase
* for coordination.
* @param currentJobModelVersion the current version of JobModel.
@@ -555,6 +565,32 @@ public class ZkUtils {
"(actual data version after update = " + stat.getVersion() + ")");
}
+ /**
+ * Publish the most recent job model version that is agreed upon by the quorum.
+ * @param version active job model version
+ */
+ public void publishActiveJobModelVersion(String version) {
+ try {
+ zkClient.writeData(keyBuilder.getActiveJobModelVersionPath(), version);
+ metrics.writes.inc();
+ LOG.info("Published the active job model version = {} to zookeeper
successfully.", version);
+ } catch (Exception e) {
+ /*
+ * Failure to write the most recent job model version has the following
implications
+ * 1. Processors no longer benefit from the optimization to start the
container upon restarts based
+ * on the most recent active job model. It is useful in scenarios
where processors leave the quorum and
+ * come back within debounce time and the work assignment for the new
quorum remains unchanged.
+ * 2. During rolling upgrades, processors that are upgraded initially
will wait for the min(deployment
+ * window, T + debounce time) where T is the time at which the last
change notification of the quorum was received
+ * by the leader.
+ *
+ * That said, failures don't impact correctness, and it is better to
continue processing than to bring down
+ * the processor with a fatal error.
+ */
+ LOG.warn("Failed to persist the active job model version = {} due to
{}", version, e);
+ }
+ }
+
// validate that Zk protocol currently used by the job is the same as in
this participant
public void validateZkVersion() {
diff --git
a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 856b2f8..2e9b9e4 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -38,15 +38,28 @@ public class TestJobModelUtil {
private static final String PROCESSOR_ID = "testProcessor";
@Test
- public void testCompareContainerModel() {
- assertTrue("Expecting null container models to return true",
JobModelUtil.compareContainerModel(null, null));
+ public void testCompareContainerModels() {
+ final ContainerModel mockContainerModel = mock(ContainerModel.class);
+ final JobModel first = mock(JobModel.class);
+ final JobModel second = mock(JobModel.class);
+ final String testProcessor2 = "testProcessor2";
- assertFalse("Expecting false for two different container model",
- JobModelUtil.compareContainerModel(mock(ContainerModel.class),
mock(ContainerModel.class)));
+ when(first.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mockContainerModel));
+ when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mockContainerModel));
+
+ assertTrue("Expecting null job models to return true",
JobModelUtil.compareContainerModels(null, null));
+ assertTrue("Expecting true for job model with same container model",
+ JobModelUtil.compareContainerModels(first, second));
+
+ when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mock(ContainerModel.class)));
+ assertFalse("Expecting false for two different job model",
+ JobModelUtil.compareContainerModels(first, second));
+
+ when(second.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mockContainerModel, testProcessor2,
+ mock(ContainerModel.class)));
+ assertFalse("Expecting false for two different job model",
+ JobModelUtil.compareContainerModels(first, second));
- final ContainerModel mockContainerModel = mock(ContainerModel.class);
- assertTrue("Expecting true for same container model",
- JobModelUtil.compareContainerModel(mockContainerModel,
mockContainerModel));
}
@Test(expected = IllegalArgumentException.class)
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index bf681cb..ce81070 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -32,6 +32,7 @@ import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.MetadataResourceUtil;
@@ -53,6 +54,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
@@ -68,11 +70,11 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TestZkJobCoordinator {
+ private static final String LATEST_JOB_MODEL_VERSION = "2";
private static final String PROCESSOR_ID = "testProcessor";
private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
private static final String TEST_JOB_MODEL_VERSION = "1";
-
private final Config config;
private final JobModel jobModel;
private final MetadataStore zkMetadataStore;
@@ -95,7 +97,8 @@ public class TestZkJobCoordinator {
Map<String, String> configMap = ImmutableMap.of(
"job.coordinator.system", "kafka",
"job.name", "test-job",
- "systems.kafka.samza.factory",
"org.apache.samza.system.MockSystemFactory");
+ "systems.kafka.samza.factory",
"org.apache.samza.system.MockSystemFactory",
+ ZkConfig.STARTUP_WITH_ACTIVE_JOB_MODEL, "true");
config = new MapConfig(configMap);
Set<SystemStreamPartition> ssps = ImmutableSet.of(
@@ -310,6 +313,91 @@ public class TestZkJobCoordinator {
}
@Test
+ public void testStartWithActiveJobModelDisabled() {
+ final ScheduleAfterDebounceTime mockDebounceTimer =
mock(ScheduleAfterDebounceTime.class);
+ ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new
MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore);
+ zkJobCoordinator.setLeaderElector(mock(ZkLeaderElector.class));
+ zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+
+ zkJobCoordinator.start();
+
+ verifyZeroInteractions(mockDebounceTimer);
+ }
+
+ @Test
+ public void testStartWithActiveJobModelEnabled() {
+ final ScheduleAfterDebounceTime mockDebounceTimer =
mock(ScheduleAfterDebounceTime.class);
+ ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID,
config, new NoOpMetricsRegistry(), zkUtils,
+ zkMetadataStore, coordinatorStreamStore);
+ zkJobCoordinator.setLeaderElector(mock(ZkLeaderElector.class));
+ zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+
+ zkJobCoordinator.start();
+
+ verify(mockDebounceTimer, times(1)).scheduleAfterDebounceTime(
+ eq(ZkJobCoordinator.START_WORK_WITH_LAST_ACTIVE_JOB_MODEL),
+ anyLong(),
+ any());
+ }
+
+ @Test
+ public void testStartWorkWithLastActiveJobModel() {
+ final TaskName taskName = new TaskName("task1");
+ final ContainerModel mockContainerModel = mock(ContainerModel.class);
+ final JobCoordinatorListener mockListener =
mock(JobCoordinatorListener.class);
+ final JobModel mockJobModel = mock(JobModel.class);
+
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator(PROCESSOR_ID, new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore));
+
+ when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName,
mock(TaskModel.class)));
+
when(mockJobModel.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mockContainerModel));
+
when(zkUtils.getLastActiveJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+ when(zkUtils.getJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+
doReturn(mockJobModel).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
+
+ zkJobCoordinator.setListener(mockListener);
+ zkJobCoordinator.startWorkWithLastActiveJobModel();
+ verify(mockListener, times(1)).onJobModelExpired();
+ verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());
+ verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID, mockJobModel);
+ assertEquals("Active job model should be updated with the new job model",
mockJobModel,
+ zkJobCoordinator.getActiveJobModel());
+ }
+
+ @Test
+ public void testStartWorkWithLastActiveJobModelShouldNotStartContainer() {
+ final JobCoordinatorListener mockListener =
mock(JobCoordinatorListener.class);
+ ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new
MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore);
+
+ zkJobCoordinator.setListener(mockListener);
+
+
when(zkUtils.getLastActiveJobModelVersion()).thenReturn(TEST_JOB_MODEL_VERSION);
+ when(zkUtils.getJobModelVersion()).thenReturn(LATEST_JOB_MODEL_VERSION);
+
+ zkJobCoordinator.startWorkWithLastActiveJobModel();
+
+ verifyZeroInteractions(mockListener);
+ assertNull("Expected active job model to be null",
zkJobCoordinator.getActiveJobModel());
+ }
+
+ @Test
+ public void
testStartWorkWithLastActiveJobModelWithNullActiveJobModelVersion() {
+ final JobCoordinatorListener mockListener =
mock(JobCoordinatorListener.class);
+ ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new
MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore);
+
+ zkJobCoordinator.setListener(mockListener);
+
+ zkJobCoordinator.startWorkWithLastActiveJobModel();
+
+ verifyZeroInteractions(mockListener);
+ assertNull("Expected active job model to be null",
zkJobCoordinator.getActiveJobModel());
+ }
+
+ @Test
public void testLoadMetadataResources() throws IOException {
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
@@ -351,6 +439,25 @@ public class TestZkJobCoordinator {
verify(zkJobCoordinator).loadMetadataResources(eq(jobModel));
}
+ @Test
+ public void testDoOnProcessorChangeWithNoChangesToWorkAssignment() {
+ ZkBarrierForVersionUpgrade mockBarrier =
mock(ZkBarrierForVersionUpgrade.class);
+ ScheduleAfterDebounceTime mockDebounceTimer =
mock(ScheduleAfterDebounceTime.class);
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new
ZkJobCoordinator(PROCESSOR_ID, config,
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore));
+ zkJobCoordinator.setActiveJobModel(jobModel);
+ zkJobCoordinator.setDebounceTimer(mockDebounceTimer);
+ zkJobCoordinator.setZkBarrierUpgradeForVersion(mockBarrier);
+
+ doReturn(jobModel).when(zkJobCoordinator).generateNewJobModel(any());
+ zkJobCoordinator.doOnProcessorChange();
+
+ verify(zkUtils, times(0)).publishJobModelVersion(anyString(), anyString());
+ verifyZeroInteractions(mockBarrier);
+ verifyZeroInteractions(mockDebounceTimer);
+ verify(zkJobCoordinator, times(0)).loadMetadataResources(any());
+ }
+
private void
testNoChangesInWorkAssignmentHelper(BiConsumer<ZkJobCoordinator, JobModel>
testMethod,
BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod) {
final JobCoordinatorListener mockListener =
mock(JobCoordinatorListener.class);