This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 22c307178 SAMZA-2747: Standby bug fixes (#1614)
22c307178 is described below

commit 22c307178dcb0dd035d962694dfd64e18ff760f8
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Tue Jun 7 17:27:06 2022 -0700

    SAMZA-2747: Standby bug fixes (#1614)
    
    * SAMZA-2747: Standby bug fixes
    
    * Fix checkstyle
---
 .../samza/storage/ContainerStorageManager.java     |  93 ++++++++-----
 .../samza/storage/TestContainerStorageManager.java | 153 +++++++++++++++++++++
 2 files changed, 215 insertions(+), 31 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 6bd13cf01..c2e02a4d0 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -193,15 +193,15 @@ public class ContainerStorageManager {
       Clock clock) {
     this.checkpointManager = checkpointManager;
     this.containerModel = containerModel;
-    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams);
-    this.sideInputStoreNames = sideInputSystemStreams.keySet();
+    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams, changelogSystemStreams);
+    this.sideInputStoreNames = getSideInputStores(containerModel, 
sideInputSystemStreams, changelogSystemStreams);
     this.sideInputTaskLatches = new HashMap<>();
     this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
         .flatMap(m -> m.values().stream())
         .flatMap(Collection::stream)
         .findAny()
         .isPresent();
-    this.changelogSystemStreams = getChangelogSystemStreams(containerModel, 
changelogSystemStreams); // handling standby tasks
+    this.changelogSystemStreams = 
getActiveTaskChangelogSystemStreams(containerModel, changelogSystemStreams);
 
     LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs 
= {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
 
@@ -214,7 +214,7 @@ public class ContainerStorageManager {
 
     if (loggedStoreBaseDirectory != null && 
loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
       LOG.warn("Logged and non-logged store base directory are configured to 
same path: {}. It is recommended to configure"
-          + "them separately to ensure clean up of non-logged store data 
doesn't accidentally impact logged store data.",
+              + "them separately to ensure clean up of non-logged store data 
doesn't accidentally impact logged store data.",
           loggedStoreBaseDirectory);
     }
 
@@ -307,36 +307,17 @@ public class ContainerStorageManager {
   }
 
   /**
-   * Add all sideInputs to a map of maps, indexed first by taskName, then by 
sideInput store name.
-   *
-   * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to sideInput system stream
-   * @return taskSideInputSSPs map
-   */
-  private Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
getTaskSideInputSSPs(ContainerModel containerModel, Map<String, 
Set<SystemStream>> sideInputSystemStreams) {
-    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = 
new HashMap<>();
-
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      sideInputSystemStreams.keySet().forEach(storeName -> {
-        Set<SystemStreamPartition> taskSideInputs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-        taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
-      });
-    });
-    return taskSideInputSSPs;
-  }
-
-  /**
-   * For each standby task, we remove its changeLogSSPs from changelogSSP map 
and add it to the task's taskSideInputSSPs.
-   * The task's sideInputManager will consume and restore these as well.
+   * Remove changeLogSSPs that are associated with standby tasks from 
changelogSSP map and only return changelogSSPs
+   * associated with the active tasks.
+   * The standby changelogs will be consumed and restored as side inputs.
    *
    * @param containerModel the container's model
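-   * @param changelogSystemStreams the passed in set of changelogSystemStreams
-   * @return A map of changeLogSSP to storeName across all tasks, assuming no two stores have the same changelogSSP
+   * @param changelogSystemStreams the passed in map of store name to changelog system stream
+   * @return A map of store name to changelog system stream built from the active tasks' changelog partitions only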
    */
-  private Map<String, SystemStream> getChangelogSystemStreams(ContainerModel 
containerModel,
+  @VisibleForTesting
+  Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel 
containerModel,
       Map<String, SystemStream> changelogSystemStreams) {
-
     if (MapUtils.invertMap(changelogSystemStreams).size() != 
changelogSystemStreams.size()) {
       throw new SamzaException("Two stores cannot have the same changelog 
system-stream");
     }
@@ -348,11 +329,9 @@ public class ContainerStorageManager {
     );
 
     getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
-      taskSideInputStoreSSPs.putIfAbsent(taskName, new HashMap<>());
       changelogSystemStreams.forEach((storeName, systemStream) -> {
         SystemStreamPartition ssp = new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition());
         changelogSSPToStore.remove(ssp);
-        taskSideInputStoreSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
       });
     });
 
@@ -361,6 +340,58 @@ public class ContainerStorageManager {
         .collect(Collectors.toMap(Map.Entry::getKey, x -> 
x.getValue().getSystemStream()));
   }
 
+  /**
+   * Fetch the side input stores. For active containers, these are only the configured side input stores; if the
+   * container has any standby task, every store that has a changelog is included as well.
+   * @param containerModel the container's model
+   * @param sideInputSystemStreams the map of store to side input system streams
+   * @param changelogSystemStreams the map of store to changelog system streams
+   * @return A new, mutable set of side input store names
+   */
+  @VisibleForTesting
+  Set<String> getSideInputStores(ContainerModel containerModel,
+      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
+    // add all the side input stores by default regardless of active vs standby
+    Set<String> sideInputStores = new HashSet<>(sideInputSystemStreams.keySet());
+
+    // Standby tasks restore changelog-backed stores by consuming the changelogs as side inputs
+    if (!getTasks(containerModel, TaskMode.Standby).isEmpty()) {
+      sideInputStores.addAll(changelogSystemStreams.keySet());
+    }
+    return sideInputStores;
+  }
+
+  /**
+   * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name.
+   * For standby tasks, every changelog-backed store is additionally mapped to the task's changelog partition.
+   * @param containerModel the containerModel to use
+   * @param sideInputSystemStreams the map of store to sideInput system stream
+   * @param changelogSystemStreams the map of store to changelog system stream
+   * @return taskSideInputSSPs map, containing an entry (possibly empty) for every task in the container
+   */
+  @VisibleForTesting
+  Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel,
+      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = new HashMap<>();
+
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
+      Map<String, Set<SystemStreamPartition>> storeSSPs = taskSideInputSSPs.computeIfAbsent(taskName, t -> new HashMap<>());
+      sideInputSystemStreams.forEach((storeName, storeSystemStreams) -> {
+        Set<SystemStreamPartition> taskSideInputs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> storeSystemStreams.contains(ssp.getSystemStream())).collect(Collectors.toSet());
+        storeSSPs.put(storeName, taskSideInputs);
+      });
+    });
+
+    // Standby tasks restore their changelog-backed stores as side inputs, one changelog SSP per store
+    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+      Map<String, Set<SystemStreamPartition>> storeSSPs = taskSideInputSSPs.computeIfAbsent(taskName, t -> new HashMap<>());
+      changelogSystemStreams.forEach((storeName, systemStream) -> {
+        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+        storeSSPs.put(storeName, Collections.singleton(ssp));
+      });
+    });
+    return taskSideInputSSPs;
+  }
+  }
 
   /**
    *  Creates SystemConsumer objects for store restoration, creating one 
consumer per system.
@@ -488,7 +519,7 @@ public class ContainerStorageManager {
   // Helper method to filter active Tasks from the container model
   private static Map<TaskName, TaskModel> getTasks(ContainerModel 
containerModel, TaskMode taskMode) {
     return containerModel.getTasks().entrySet().stream()
-            .filter(x -> 
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+        .filter(x -> 
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
   }
 
   /**
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 3e3e01dec..eed7d5442 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -46,6 +46,7 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.serializers.Serde;
@@ -68,6 +69,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.collection.JavaConverters;
 
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 
@@ -85,6 +87,7 @@ public class TestContainerStorageManager {
   private Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
   private SamzaContainerMetrics samzaContainerMetrics;
   private Map<TaskName, TaskModel> tasks;
+  private StandbyTestContext testContext;
 
   private volatile int systemConsumerCreationCount;
   private volatile int systemConsumerStartCount;
@@ -266,6 +269,7 @@ public class TestContainerStorageManager {
         DEFAULT_STORE_BASE_DIR,
         null,
         new SystemClock());
+    this.testContext = new StandbyTestContext();
   }
 
   @Test
@@ -504,4 +508,153 @@ public class TestContainerStorageManager {
     Assert.assertEquals(ImmutableSet.of("storeName0"),
         factoriesToStores.get("factory2"));
   }
+
+  @Test
+  public void getActiveTaskChangelogSystemStreamsForStandbyContainer() {
+    Map<String, SystemStream> storeToChangelogSystemStreams =
+        containerStorageManager.getActiveTaskChangelogSystemStreams(testContext.standbyContainerModel,
+            testContext.storesToSystemStreams);
+
+    assertEquals("Standby container should have no active change log", Collections.emptyMap(),
+        storeToChangelogSystemStreams);
+  }
+
+  @Test
+  public void getActiveTaskChangelogSystemStreamsForActiveAndStandbyContainer() {
+    Map<String, SystemStream> expectedStoreToChangelogSystemStreams =
+        testContext.storesToSystemStreams;
+    Map<String, SystemStream> storeToChangelogSystemStreams = containerStorageManager.getActiveTaskChangelogSystemStreams(
+        testContext.activeAndStandbyContainerModel, testContext.storesToSystemStreams);
+
+    assertEquals("Active and standby container model should have non empty store to changelog mapping",
+        expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams);
+  }
+
+  @Test
+  public void getActiveTaskChangelogSystemStreamsForActiveContainer() {
+    Map<String, SystemStream> expectedStoreToChangelogSystemStreams =
+        testContext.storesToSystemStreams;
+    Map<String, SystemStream> storeToChangelogSystemStreams = containerStorageManager.getActiveTaskChangelogSystemStreams(
+        testContext.activeContainerModel, testContext.storesToSystemStreams);
+
+    assertEquals("Active container model should have non empty store to changelog mapping",
+        expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams);
+  }
+
+  @Test
+  public void getSideInputStoresForActiveContainer() {
+    Set<String> expectedSideInputStores = testContext.activeStores;
+    Set<String> actualSideInputStores =
+        
containerStorageManager.getSideInputStores(testContext.activeContainerModel,
+            testContext.sideInputStoresToSystemStreams, 
testContext.storesToSystemStreams);
+
+    assertEquals("Mismatch in stores", expectedSideInputStores, 
actualSideInputStores);
+  }
+
+  @Test
+  public void getSideInputStoresForStandbyContainer() {
+    final Set<String> expectedSideInputStores = testContext.standbyStores;
+    Set<String> actualSideInputStores =
+        
containerStorageManager.getSideInputStores(testContext.standbyContainerModel,
+            testContext.sideInputStoresToSystemStreams, 
testContext.storesToSystemStreams);
+
+    assertEquals("Mismatch in side input stores", expectedSideInputStores, 
actualSideInputStores);
+  }
+
+  @Test
+  public void getTaskSideInputSSPsForActiveContainer() {
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
expectedSideInputSSPs = testContext.activeSideInputSSPs;
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs 
=
+        
containerStorageManager.getTaskSideInputSSPs(testContext.activeContainerModel,
+            Collections.emptyMap(), testContext.storesToSystemStreams);
+
+    assertEquals("Mismatch in task name --> store --> SSP mapping", 
expectedSideInputSSPs, actualSideInputSSPs);
+  }
+
+  @Test
+  public void getTaskSideInputSSPsForStandbyContainerWithSideInput() {
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
expectedSideInputSSPs = testContext.standbyWithSideInputSSPs;
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs 
=
+        
containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModelWithSideInputs,
+            testContext.sideInputStoresToSystemStreams, 
testContext.storesToSystemStreams);
+
+    assertEquals("Mismatch in task name --> store --> SSP mapping", 
expectedSideInputSSPs, actualSideInputSSPs);
+  }
+
+  @Test
+  public void getTaskSideInputSSPsForStandbyContainerWithoutSideInputs() {
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
expectedSideInputSSPs = testContext.standbyChangelogSSPs;
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs 
=
+        
containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModel,
+            Collections.emptyMap(), testContext.storesToSystemStreams);
+
+    assertEquals("Mismatch in task name --> store --> SSP mapping", 
expectedSideInputSSPs, actualSideInputSSPs);
+  }
+
+  /**
+   * A container class to hold test fields and expected state for standby and 
side input related tests
+   */
+  private static class StandbyTestContext {
+    private static final String ACTIVE_TASK = "active-task-1";
+    private static final TaskName ACTIVE_TASK_NAME = new TaskName(ACTIVE_TASK);
+    private static final TaskModel ACTIVE_TASK_MODEL =
+        new TaskModel(ACTIVE_TASK_NAME, Collections.emptySet(), new 
Partition(1));
+
+    private static final String STANDBY_TASK_2 = "standby-task-2";
+    private static final TaskName STANDBY_TASK_NAME_2 = new 
TaskName(STANDBY_TASK_2);
+    private static final Set<SystemStreamPartition> STANDBY_TASK_INPUT_SSP =
+        ImmutableSet.of(new SystemStreamPartition("test", "side-input-stream", 
new Partition(2)));
+    private static final TaskModel STANDBY_TASK_MODEL_WITH_SIDE_INPUT =
+        new TaskModel(STANDBY_TASK_NAME_2, STANDBY_TASK_INPUT_SSP, new 
Partition(0), TaskMode.Standby);
+
+    private static final String STANDBY_TASK = "standby-task";
+    private static final TaskName STANDBY_TASK_NAME = new 
TaskName(STANDBY_TASK);
+    private static final SystemStreamPartition STANDBY_CHANGELOG_SSP =
+        new SystemStreamPartition("test", "stream", new Partition(0));
+    private static final TaskModel STANDBY_TASK_MODEL =
+        new TaskModel(STANDBY_TASK_NAME, Collections.emptySet(), new 
Partition(0), TaskMode.Standby);
+
+    private static final String SIDE_INPUT_STORE = "side-input-store";
+    private static final SystemStream SIDE_INPUT_SYSTEM_STREAM = new 
SystemStream("test", "side-input-stream");
+    private static final String TEST_STORE = "test-store";
+    private static final SystemStream TEST_SYSTEM_STREAM = new 
SystemStream("test", "stream");
+
+    private final ContainerModel activeContainerModel;
+    private final ContainerModel activeAndStandbyContainerModel;
+    private final ContainerModel standbyContainerModel;
+    private final ContainerModel standbyContainerModelWithSideInputs;
+
+    private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
activeSideInputSSPs;
+    private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
standbyChangelogSSPs;
+    private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
standbyWithSideInputSSPs;
+
+    private final Map<String, SystemStream> storesToSystemStreams;
+    private final Map<String, Set<SystemStream>> 
sideInputStoresToSystemStreams;
+    private final Set<String> activeStores;
+    private final Set<String> standbyStores;
+
+    public StandbyTestContext() {
+      Map<TaskName, TaskModel> activeTasks = ImmutableMap.of(ACTIVE_TASK_NAME, 
ACTIVE_TASK_MODEL);
+      Map<TaskName, TaskModel> standbyTasks = 
ImmutableMap.of(STANDBY_TASK_NAME, STANDBY_TASK_MODEL);
+
+      activeContainerModel = new ContainerModel("active-container-model", 
activeTasks);
+      activeAndStandbyContainerModel = new 
ContainerModel("active-standby-container-model",
+          ImmutableMap.<TaskName, 
TaskModel>builder().putAll(activeTasks).putAll(standbyTasks).build());
+      standbyContainerModel = new ContainerModel("standby-container-model", 
standbyTasks);
+      standbyContainerModelWithSideInputs = new 
ContainerModel("standby-container-with-side-input",
+          ImmutableMap.of(STANDBY_TASK_NAME_2, 
STANDBY_TASK_MODEL_WITH_SIDE_INPUT));
+
+      activeStores = ImmutableSet.of(SIDE_INPUT_STORE);
+      standbyStores = ImmutableSet.of(SIDE_INPUT_STORE, TEST_STORE);
+
+      sideInputStoresToSystemStreams = ImmutableMap.of(SIDE_INPUT_STORE, 
ImmutableSet.of(SIDE_INPUT_SYSTEM_STREAM));
+      storesToSystemStreams = ImmutableMap.of(TEST_STORE, TEST_SYSTEM_STREAM);
+
+      activeSideInputSSPs = ImmutableMap.of(ACTIVE_TASK_NAME, 
Collections.emptyMap());
+      standbyChangelogSSPs =
+          ImmutableMap.of(STANDBY_TASK_NAME, ImmutableMap.of(TEST_STORE, 
ImmutableSet.of(STANDBY_CHANGELOG_SSP)));
+      standbyWithSideInputSSPs = ImmutableMap.of(STANDBY_TASK_NAME_2,
+          ImmutableMap.of(TEST_STORE, ImmutableSet.of(STANDBY_CHANGELOG_SSP), 
SIDE_INPUT_STORE, STANDBY_TASK_INPUT_SSP));
+    }
+  }
 }

Reply via email to