This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aea5f7  SAMZA-2632: Processors should only write their own task 
locality to zookeeper (#1473)
2aea5f7 is described below

commit 2aea5f71f9742bf995e88608750a071c44ceade5
Author: mynameborat <[email protected]>
AuthorDate: Fri Mar 12 07:07:27 2021 -0800

    SAMZA-2632: Processors should only write their own task locality to 
zookeeper (#1473)
    
    Problem:
    Processors update the task locality for all the tasks in the job model with
    their own locality, regardless of whether the tasks belong to their
    container model.
    Description:
    As part of SEP 11, host affinity for standalone was introduced. For this
    feature, we persist the task locality in zookeeper so that subsequent
    rebalances take this locality into account when generating the job model.
    During job model consensus, each processor updates the task locality for
    all the tasks, which results in incorrect locality depending on the order
    of writes.
    Changes:
    Only update the locality for the tasks that belong to the processor.
---
 .../org/apache/samza/job/model/JobModelUtil.java   | 19 ++++++++++
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |  6 +--
 .../apache/samza/job/model/TestJobModelUtil.java   | 43 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 4 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java 
b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
index f9438c3..d356baf 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
@@ -19,11 +19,14 @@
 package org.apache.samza.job.model;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -44,6 +47,22 @@ public class JobModelUtil {
   private static final String JOB_MODEL_GENERATION_KEY = 
"jobModelGeneration/jobModels";
 
   /**
+   * A helper method to fetch the task names associated with the processor 
from the job model.
+   * @param processorId processor for which task names are fetched
+   * @param jobModel job model
+   * @return a set of {@code TaskName} associated with the processor from the 
job model.
+   */
+  public static Set<TaskName> getTaskNamesForProcessor(String processorId, 
JobModel jobModel) {
+    Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), 
"ProcessorId cannot be empty or null");
+
+    return Optional.ofNullable(jobModel.getContainers().get(processorId))
+        .map(ContainerModel::getTasks)
+        .map(Map::keySet)
+        .orElse(Collections.emptySet());
+  }
+
+  /**
    * Extracts the map of {@link SystemStreamPartition}s to {@link TaskName} 
from the {@link JobModel}
    *
    * @return the extracted map
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1489f67..6526705 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -472,10 +472,8 @@ public class ZkJobCoordinator implements JobCoordinator {
           JobModel jobModel = getJobModel();
           // start the container with the new model
           if (coordinatorListener != null) {
-            for (ContainerModel containerModel : 
jobModel.getContainers().values()) {
-              for (TaskName taskName : containerModel.getTasks().keySet()) {
-                zkUtils.writeTaskLocality(taskName, locationId);
-              }
+            for (TaskName taskName : 
JobModelUtil.getTaskNamesForProcessor(processorId, jobModel)) {
+              zkUtils.writeTaskLocality(taskName, locationId);
             }
             coordinatorListener.onNewJobModel(processorId, jobModel);
           }
diff --git 
a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java 
b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
index 960bdf9..0e8baae 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModelUtil.java
@@ -30,9 +30,52 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 public class TestJobModelUtil {
 
+  @Test
+  public void testGetTaskNamesForProcessorAbsentInJobModel() {
+    JobModel mockJobModel = mock(JobModel.class);
+    when(mockJobModel.getContainers()).thenReturn(mock(Map.class));
+
+    Set<TaskName> taskNames = 
JobModelUtil.getTaskNamesForProcessor("testProcessor", mockJobModel);
+    assertTrue("TaskNames should be empty", taskNames.isEmpty());
+  }
+
+  @Test
+  public void testGetTaskNamesForProcessorPresentInJobModel() {
+    TaskName expectedTaskName = new TaskName("testTaskName");
+    String processorId = "testProcessor";
+    JobModel mockJobModel = mock(JobModel.class);
+    ContainerModel mockContainerModel = mock(ContainerModel.class);
+    Map<String, ContainerModel> mockContainers = mock(Map.class);
+
+    when(mockContainers.get(processorId)).thenReturn(mockContainerModel);
+    
when(mockContainerModel.getTasks()).thenReturn(ImmutableMap.of(expectedTaskName,
 mock(TaskModel.class)));
+    when(mockJobModel.getContainers()).thenReturn(mockContainers);
+
+    Set<TaskName> actualTaskNames = 
JobModelUtil.getTaskNamesForProcessor(processorId, mockJobModel);
+    assertEquals("Expecting TaskNames size = 1", 1, actualTaskNames.size());
+    assertTrue("Expecting testTaskName to be returned", 
actualTaskNames.contains(expectedTaskName));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetTaskNamesForProcessorWithNullJobModel() {
+    JobModelUtil.getTaskNamesForProcessor("processor", null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithEmptyProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor("", mock(JobModel.class));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskNamesForProcessorWithNullProcessorId() {
+    JobModelUtil.getTaskNamesForProcessor(null, mock(JobModel.class));
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testTaskToSystemStreamPartitionsWithNullJobModel() {
     JobModelUtil.getTaskToSystemStreamPartitions(null);

Reply via email to