This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 516a40d SAMZA-2663: Handle job model expiration and new job model
flows for multiple incomplete rebalances (#1528)
516a40d is described below
commit 516a40d8c4eb67cad07a379f28e92fbfb4f459d0
Author: mynameborat <[email protected]>
AuthorDate: Sat Sep 11 10:40:24 2021 -0700
SAMZA-2663: Handle job model expiration and new job model flows for
multiple incomplete rebalances (#1528)
Problem:
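As part of SAMZA-2638, we introduced an optimization that skips stopping and
restarting the container when a processor's work assignment does not change
across rebalances. However, we only update the active job model with the
proposed job model when the container is started as part of onNewJobModel. This
leads to a scenario where the processor is stopped but future rebalances assume
the container is still running. For example: rebalance V2 changes the work
assignment relative to the active job model V1, so the job model is expired and
the processor is stopped. Before V2 is accepted, another rebalance proposes V3,
whose work assignment is the same as V1. Because the active job model is still
V1, onNewJobModel(V3) sees no change in work assignment and never restarts the
stopped container.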
Changes:
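- Track job model expiration explicitly with a jobModelExpired flag.
- onNewJobModel notifies the coordinator listener of the new job model whenever
  the active job model has been expired, even if the new work assignment matches
  the active one.
- Apply the no-change-in-work-assignment optimization only in the
  checkAndExpireJobModel flow.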
---
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 18 ++++++-
.../org/apache/samza/zk/TestZkJobCoordinator.java | 56 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index f41ee8c..447143a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -110,6 +110,10 @@ public class ZkJobCoordinator implements JobCoordinator {
private final MetadataStore jobModelMetadataStore;
private final CoordinatorStreamStore coordinatorStreamStore;
+ // It is sufficient for the field to be volatile as the flows that
read/update execute on debounce timer which is single threaded
+ // Choice of atomic boolean is purely for convenience for operations like
compareAndSet to enforce invariant checks.
+ private final AtomicBoolean jobModelExpired = new AtomicBoolean(false);
+
private JobCoordinatorListener coordinatorListener = null;
// denotes the most recent job model agreed by the quorum
private JobModel activeJobModel;
@@ -441,6 +445,7 @@ public class ZkJobCoordinator implements JobCoordinator {
} else {
LOG.info("Work assignment changed for the processor {}. Notifying job
model expiration to coordinator listener", processorId);
coordinatorListener.onJobModelExpired();
+ jobModelExpired.set(true);
}
}
@@ -455,7 +460,7 @@ public class ZkJobCoordinator implements JobCoordinator {
void onNewJobModel(JobModel newJobModel) {
Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing
onNewJobModel");
// start the container with the new model
- if (!JobModelUtil.compareContainerModelForProcessor(processorId,
activeJobModel, newJobModel)) {
+ if (jobModelExpired.compareAndSet(true, false)) {
LOG.info("Work assignment changed for the processor {}. Updating task
locality and notifying coordinator listener", processorId);
if (newJobModel.getContainers().containsKey(processorId)) {
for (TaskName taskName :
JobModelUtil.getTaskNamesForProcessor(processorId, newJobModel)) {
@@ -468,6 +473,7 @@ public class ZkJobCoordinator implements JobCoordinator {
}
} else {
/*
+ * We don't expire the job model if the proposed work assignment is same
as the current work assignment.
* The implication of work assignment remaining the same can be
categorized into
* 1. Processor part of the job model
* 2. Processor not part of the job model.
@@ -499,6 +505,16 @@ public class ZkJobCoordinator implements JobCoordinator {
}
@VisibleForTesting
+ boolean getJobModelExpired() {
+ return jobModelExpired.get();
+ }
+
+ @VisibleForTesting
+ void setJobModelExpired(boolean value) {
+ jobModelExpired.set(value);
+ }
+
+ @VisibleForTesting
void setDebounceTimer(ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
debounceTimer = scheduleAfterDebounceTime;
}
diff --git
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 0ccb8d9..586dfc0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -54,7 +54,9 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
@@ -66,6 +68,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@@ -113,6 +116,58 @@ public class TestZkJobCoordinator {
}
@Test
+ public void testCheckAndExpireWithMultipleRebalances() {
+ final TaskName taskName = new TaskName("task1");
+ final ContainerModel mockContainerModel = mock(ContainerModel.class);
+ final JobCoordinatorListener mockListener =
mock(JobCoordinatorListener.class);
+ final JobModel jobModelVersion1 = mock(JobModel.class);
+ final JobModel jobModelVersion2 = mock(JobModel.class);
+ final JobModel jobModelVersion3 = jobModelVersion1;
+
+ when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(taskName,
mock(TaskModel.class)));
+
when(jobModelVersion3.getContainers()).thenReturn(ImmutableMap.of(PROCESSOR_ID,
mockContainerModel));
+
+ ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new
MapConfig(), new NoOpMetricsRegistry(),
+ zkUtils, zkMetadataStore, coordinatorStreamStore);
+ zkJobCoordinator.setListener(mockListener);
+ zkJobCoordinator.setActiveJobModel(jobModelVersion1);
+
+ /*
+ * The following mimics the scenario where new work assignment(V2) is
proposed by the leader and the work assignment
+ * differs from the active work assignment(V1) and hence results in job
model expiration
+ */
+ zkJobCoordinator.checkAndExpireJobModel(jobModelVersion2);
+
+ verify(mockListener, times(1)).onJobModelExpired();
+ assertTrue("JobModelExpired should be true for work assignment changes",
zkJobCoordinator.getJobModelExpired());
+ assertEquals("Active job model shouldn't be updated", jobModelVersion1,
zkJobCoordinator.getActiveJobModel());
+
+ /*
+ * The following mimics the scenario where leader kicked off another
rebalance where the new work assignment(V3)
+ * is same as the old work assignment(V1) and doesn't trigger job model
expiration. We check the interactions w/
+ * the listener to ensure job model expiration isn't invoked. However, the
previous rebalance should have already
+ * triggered job model expiration and set the job model expired flag to
true
+ */
+ zkJobCoordinator.checkAndExpireJobModel(jobModelVersion1);
+ verifyNoMoreInteractions(mockListener);
+ assertTrue("JobModelExpired should remain unchanged",
zkJobCoordinator.getJobModelExpired());
+ assertEquals("Active job model shouldn't be updated", jobModelVersion1,
zkJobCoordinator.getActiveJobModel());
+
+
+ /*
+ * The following mimics the scenario where the new work assignment(V3)
proposed by the leader is accepted and
+ * on new job model is invoked. Even though the work assignment remains
the same w/ the active job model version,
+ * onNewJobModel is invoked on the listener as an intermediate rebalance
expired the old work assignment(V1)
+ */
+ zkJobCoordinator.onNewJobModel(jobModelVersion3);
+ verify(mockListener, times(1)).onNewJobModel(PROCESSOR_ID,
jobModelVersion3);
+ verify(zkUtils, times(1)).writeTaskLocality(any(), any());
+
+ assertEquals("Active job model should be updated to new job model",
zkJobCoordinator.getActiveJobModel(), jobModelVersion3);
+ assertFalse("JobModelExpired should be set to false after onNewJobModel",
zkJobCoordinator.getJobModelExpired());
+ }
+
+ @Test
public void testCheckAndExpireWithNoChangeInWorkAssignment() {
BiConsumer<ZkUtils, JobCoordinatorListener> verificationMethod =
(ignored, coordinatorListener) ->
verifyZeroInteractions(coordinatorListener);
@@ -155,6 +210,7 @@ public class TestZkJobCoordinator {
ZkJobCoordinator zkJobCoordinator = new ZkJobCoordinator(PROCESSOR_ID, new
MapConfig(),
new NoOpMetricsRegistry(), zkUtils, zkMetadataStore,
coordinatorStreamStore);
zkJobCoordinator.setListener(mockListener);
+ zkJobCoordinator.setJobModelExpired(true);
zkJobCoordinator.onNewJobModel(mockJobModel);
verify(zkUtils, times(1)).writeTaskLocality(eq(taskName), any());