This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 22c307178 SAMZA-2747: Standby bug fixes (#1614)
22c307178 is described below
commit 22c307178dcb0dd035d962694dfd64e18ff760f8
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Tue Jun 7 17:27:06 2022 -0700
SAMZA-2747: Standby bug fixes (#1614)
* SAMZA-2747: Standby bug fixes
* Fix checkstyle
---
.../samza/storage/ContainerStorageManager.java | 93 ++++++++-----
.../samza/storage/TestContainerStorageManager.java | 153 +++++++++++++++++++++
2 files changed, 215 insertions(+), 31 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 6bd13cf01..c2e02a4d0 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -193,15 +193,15 @@ public class ContainerStorageManager {
Clock clock) {
this.checkpointManager = checkpointManager;
this.containerModel = containerModel;
- this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel,
sideInputSystemStreams);
- this.sideInputStoreNames = sideInputSystemStreams.keySet();
+ this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel,
sideInputSystemStreams, changelogSystemStreams);
+ this.sideInputStoreNames = getSideInputStores(containerModel,
sideInputSystemStreams, changelogSystemStreams);
this.sideInputTaskLatches = new HashMap<>();
this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
.flatMap(m -> m.values().stream())
.flatMap(Collection::stream)
.findAny()
.isPresent();
- this.changelogSystemStreams = getChangelogSystemStreams(containerModel,
changelogSystemStreams); // handling standby tasks
+ this.changelogSystemStreams =
getActiveTaskChangelogSystemStreams(containerModel, changelogSystemStreams);
LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs
= {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
@@ -214,7 +214,7 @@ public class ContainerStorageManager {
if (loggedStoreBaseDirectory != null &&
loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
LOG.warn("Logged and non-logged store base directory are configured to
same path: {}. It is recommended to configure"
- + "them separately to ensure clean up of non-logged store data
doesn't accidentally impact logged store data.",
+ + "them separately to ensure clean up of non-logged store data
doesn't accidentally impact logged store data.",
loggedStoreBaseDirectory);
}
@@ -307,36 +307,17 @@ public class ContainerStorageManager {
}
/**
- * Add all sideInputs to a map of maps, indexed first by taskName, then by
sideInput store name.
- *
- * @param containerModel the containerModel to use
- * @param sideInputSystemStreams the map of store to sideInput system stream
- * @return taskSideInputSSPs map
- */
- private Map<TaskName, Map<String, Set<SystemStreamPartition>>>
getTaskSideInputSSPs(ContainerModel containerModel, Map<String,
Set<SystemStream>> sideInputSystemStreams) {
- Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs =
new HashMap<>();
-
- containerModel.getTasks().forEach((taskName, taskModel) -> {
- taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
- sideInputSystemStreams.keySet().forEach(storeName -> {
- Set<SystemStreamPartition> taskSideInputs =
taskModel.getSystemStreamPartitions().stream().filter(ssp ->
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
- taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
- });
- });
- return taskSideInputSSPs;
- }
-
- /**
- * For each standby task, we remove its changeLogSSPs from changelogSSP map
and add it to the task's taskSideInputSSPs.
- * The task's sideInputManager will consume and restore these as well.
+ * Remove changeLogSSPs that are associated with standby tasks from
changelogSSP map and only return changelogSSPs
+ * associated with the active tasks.
+ * The standby changelogs will be consumed and restored as side inputs.
*
* @param containerModel the container's model
* @param changelogSystemStreams the passed in set of changelogSystemStreams
* @return A map of storeName to changelog SystemStream for the active tasks, assuming no
two stores have the same changelog system-stream
*/
- private Map<String, SystemStream> getChangelogSystemStreams(ContainerModel
containerModel,
+ @VisibleForTesting
+ Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel
containerModel,
Map<String, SystemStream> changelogSystemStreams) {
-
if (MapUtils.invertMap(changelogSystemStreams).size() !=
changelogSystemStreams.size()) {
throw new SamzaException("Two stores cannot have the same changelog
system-stream");
}
@@ -348,11 +329,9 @@ public class ContainerStorageManager {
);
getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
- taskSideInputStoreSSPs.putIfAbsent(taskName, new HashMap<>());
changelogSystemStreams.forEach((storeName, systemStream) -> {
SystemStreamPartition ssp = new SystemStreamPartition(systemStream,
taskModel.getChangelogPartition());
changelogSSPToStore.remove(ssp);
- taskSideInputStoreSSPs.get(taskName).put(storeName,
Collections.singleton(ssp));
});
});
@@ -361,6 +340,58 @@ public class ContainerStorageManager {
.collect(Collectors.toMap(Map.Entry::getKey, x ->
x.getValue().getSystemStream()));
}
+ /**
+ * Fetch the side input stores. For containers with only active tasks, these are the stores configured
with side inputs; for containers hosting any standby task, they
+ * also include the stores that have changelogs.
+ * @param containerModel the container's model
+ * @param sideInputSystemStreams the map of store to side input system
streams
+ * @param changelogSystemStreams the map of store to changelog system streams
+ * @return A set of side input stores
+ */
+ @VisibleForTesting
+ Set<String> getSideInputStores(ContainerModel containerModel,
+ Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String,
SystemStream> changelogSystemStreams) {
+ // add all the side input stores by default regardless of active vs standby
+ Set<String> sideInputStores = new
HashSet<>(sideInputSystemStreams.keySet());
+
+ // In case of standby tasks, we treat the stores that have changelogs as
side input stores for bootstrapping state
+ if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
+ sideInputStores.addAll(changelogSystemStreams.keySet());
+ }
+ return sideInputStores;
+ }
+
+ /**
+ * Add all sideInputs to a map of maps, indexed first by taskName, then by
sideInput store name.
+ *
+ * @param containerModel the containerModel to use
+ * @param sideInputSystemStreams the map of store to sideInput system stream
+ * @param changelogSystemStreams the map of store to changelog system stream
+ * @return taskSideInputSSPs map
+ */
+ @VisibleForTesting
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>>
getTaskSideInputSSPs(ContainerModel containerModel,
+ Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String,
SystemStream> changelogSystemStreams) {
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs =
new HashMap<>();
+
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
+ taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+ sideInputSystemStreams.keySet().forEach(storeName -> {
+ Set<SystemStreamPartition> taskSideInputs =
taskModel.getSystemStreamPartitions().stream().filter(ssp ->
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
+ taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
+ });
+ });
+
+ getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
+ taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+ changelogSystemStreams.forEach((storeName, systemStream) -> {
+ SystemStreamPartition ssp = new SystemStreamPartition(systemStream,
taskModel.getChangelogPartition());
+ taskSideInputSSPs.get(taskName).put(storeName,
Collections.singleton(ssp));
+ });
+ });
+
+ return taskSideInputSSPs;
+ }
/**
* Creates SystemConsumer objects for store restoration, creating one
consumer per system.
@@ -488,7 +519,7 @@ public class ContainerStorageManager {
// Helper method to filter tasks of the given TaskMode from the container model
private static Map<TaskName, TaskModel> getTasks(ContainerModel
containerModel, TaskMode taskMode) {
return containerModel.getTasks().entrySet().stream()
- .filter(x ->
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ .filter(x ->
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
/**
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 3e3e01dec..eed7d5442 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -46,6 +46,7 @@ import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.serializers.Serde;
@@ -68,6 +69,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.collection.JavaConverters;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -85,6 +87,7 @@ public class TestContainerStorageManager {
private Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
private SamzaContainerMetrics samzaContainerMetrics;
private Map<TaskName, TaskModel> tasks;
+ private StandbyTestContext testContext;
private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
@@ -266,6 +269,7 @@ public class TestContainerStorageManager {
DEFAULT_STORE_BASE_DIR,
null,
new SystemClock());
+ this.testContext = new StandbyTestContext();
}
@Test
@@ -504,4 +508,153 @@ public class TestContainerStorageManager {
Assert.assertEquals(ImmutableSet.of("storeName0"),
factoriesToStores.get("factory2"));
}
+
+ @Test
+ public void getActiveTaskChangelogSystemStreams() {
+ Map<String, SystemStream> storeToChangelogSystemStreams =
+
containerStorageManager.getActiveTaskChangelogSystemStreams(testContext.standbyContainerModel,
+ testContext.storesToSystemStreams);
+
+ assertEquals("Standby container should have no active change log",
Collections.emptyMap(),
+ storeToChangelogSystemStreams);
+ }
+
+ @Test
+ public void
getActiveTaskChangelogSystemStreamsForActiveAndStandbyContainer() {
+ Map<String, SystemStream> expectedStoreToChangelogSystemStreams =
+ testContext.storesToSystemStreams;
+ Map<String, SystemStream> storeToChangelogSystemStreams =
containerStorageManager.getActiveTaskChangelogSystemStreams(
+ testContext.activeAndStandbyContainerModel,
testContext.storesToSystemStreams);
+
+ assertEquals("Active and standby container model should have non empty
store to changelog mapping",
+ expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams);
+ }
+
+ @Test
+ public void getActiveTaskChangelogSystemStreamsForStandbyContainer() {
+ Map<String, SystemStream> expectedStoreToChangelogSystemStreams =
+ testContext.storesToSystemStreams;
+ Map<String, SystemStream> storeToChangelogSystemStreams =
containerStorageManager.getActiveTaskChangelogSystemStreams(
+ testContext.activeContainerModel, testContext.storesToSystemStreams);
+
+ assertEquals("Active container model should have non empty store to
changelog mapping",
+ expectedStoreToChangelogSystemStreams, storeToChangelogSystemStreams);
+ }
+
+ @Test
+ public void getSideInputStoresForActiveContainer() {
+ Set<String> expectedSideInputStores = testContext.activeStores;
+ Set<String> actualSideInputStores =
+
containerStorageManager.getSideInputStores(testContext.activeContainerModel,
+ testContext.sideInputStoresToSystemStreams,
testContext.storesToSystemStreams);
+
+ assertEquals("Mismatch in stores", expectedSideInputStores,
actualSideInputStores);
+ }
+
+ @Test
+ public void getSideInputStoresForStandbyContainer() {
+ final Set<String> expectedSideInputStores = testContext.standbyStores;
+ Set<String> actualSideInputStores =
+
containerStorageManager.getSideInputStores(testContext.standbyContainerModel,
+ testContext.sideInputStoresToSystemStreams,
testContext.storesToSystemStreams);
+
+ assertEquals("Mismatch in side input stores", expectedSideInputStores,
actualSideInputStores);
+ }
+
+ @Test
+ public void getTaskSideInputSSPsForActiveContainer() {
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>>
expectedSideInputSSPs = testContext.activeSideInputSSPs;
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs
=
+
containerStorageManager.getTaskSideInputSSPs(testContext.activeContainerModel,
+ Collections.emptyMap(), testContext.storesToSystemStreams);
+
+ assertEquals("Mismatch in task name --> store --> SSP mapping",
expectedSideInputSSPs, actualSideInputSSPs);
+ }
+
+ @Test
+ public void getTaskSideInputSSPsForStandbyContainerWithSideInput() {
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>>
expectedSideInputSSPs = testContext.standbyWithSideInputSSPs;
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs
=
+
containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModelWithSideInputs,
+ testContext.sideInputStoresToSystemStreams,
testContext.storesToSystemStreams);
+
+ assertEquals("Mismatch in task name --> store --> SSP mapping",
expectedSideInputSSPs, actualSideInputSSPs);
+ }
+
+ @Test
+ public void getTaskSideInputSSPsForStandbyContainerWithoutSideInputs() {
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>>
expectedSideInputSSPs = testContext.standbyChangelogSSPs;
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>> actualSideInputSSPs
=
+
containerStorageManager.getTaskSideInputSSPs(testContext.standbyContainerModel,
+ Collections.emptyMap(), testContext.storesToSystemStreams);
+
+ assertEquals("Mismatch in task name --> store --> SSP mapping",
expectedSideInputSSPs, actualSideInputSSPs);
+ }
+
+ /**
+ * A container class to hold test fields and expected state for standby and
side input related tests
+ */
+ private static class StandbyTestContext {
+ private static final String ACTIVE_TASK = "active-task-1";
+ private static final TaskName ACTIVE_TASK_NAME = new TaskName(ACTIVE_TASK);
+ private static final TaskModel ACTIVE_TASK_MODEL =
+ new TaskModel(ACTIVE_TASK_NAME, Collections.emptySet(), new
Partition(1));
+
+ private static final String STANDBY_TASK_2 = "standby-task-2";
+ private static final TaskName STANDBY_TASK_NAME_2 = new
TaskName(STANDBY_TASK_2);
+ private static final Set<SystemStreamPartition> STANDBY_TASK_INPUT_SSP =
+ ImmutableSet.of(new SystemStreamPartition("test", "side-input-stream",
new Partition(2)));
+ private static final TaskModel STANDBY_TASK_MODEL_WITH_SIDE_INPUT =
+ new TaskModel(STANDBY_TASK_NAME_2, STANDBY_TASK_INPUT_SSP, new
Partition(0), TaskMode.Standby);
+
+ private static final String STANDBY_TASK = "standby-task";
+ private static final TaskName STANDBY_TASK_NAME = new
TaskName(STANDBY_TASK);
+ private static final SystemStreamPartition STANDBY_CHANGELOG_SSP =
+ new SystemStreamPartition("test", "stream", new Partition(0));
+ private static final TaskModel STANDBY_TASK_MODEL =
+ new TaskModel(STANDBY_TASK_NAME, Collections.emptySet(), new
Partition(0), TaskMode.Standby);
+
+ private static final String SIDE_INPUT_STORE = "side-input-store";
+ private static final SystemStream SIDE_INPUT_SYSTEM_STREAM = new
SystemStream("test", "side-input-stream");
+ private static final String TEST_STORE = "test-store";
+ private static final SystemStream TEST_SYSTEM_STREAM = new
SystemStream("test", "stream");
+
+ private final ContainerModel activeContainerModel;
+ private final ContainerModel activeAndStandbyContainerModel;
+ private final ContainerModel standbyContainerModel;
+ private final ContainerModel standbyContainerModelWithSideInputs;
+
+ private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
activeSideInputSSPs;
+ private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
standbyChangelogSSPs;
+ private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
standbyWithSideInputSSPs;
+
+ private final Map<String, SystemStream> storesToSystemStreams;
+ private final Map<String, Set<SystemStream>>
sideInputStoresToSystemStreams;
+ private final Set<String> activeStores;
+ private final Set<String> standbyStores;
+
+ public StandbyTestContext() {
+ Map<TaskName, TaskModel> activeTasks = ImmutableMap.of(ACTIVE_TASK_NAME,
ACTIVE_TASK_MODEL);
+ Map<TaskName, TaskModel> standbyTasks =
ImmutableMap.of(STANDBY_TASK_NAME, STANDBY_TASK_MODEL);
+
+ activeContainerModel = new ContainerModel("active-container-model",
activeTasks);
+ activeAndStandbyContainerModel = new
ContainerModel("active-standby-container-model",
+ ImmutableMap.<TaskName,
TaskModel>builder().putAll(activeTasks).putAll(standbyTasks).build());
+ standbyContainerModel = new ContainerModel("standby-container-model",
standbyTasks);
+ standbyContainerModelWithSideInputs = new
ContainerModel("standby-container-with-side-input",
+ ImmutableMap.of(STANDBY_TASK_NAME_2,
STANDBY_TASK_MODEL_WITH_SIDE_INPUT));
+
+ activeStores = ImmutableSet.of(SIDE_INPUT_STORE);
+ standbyStores = ImmutableSet.of(SIDE_INPUT_STORE, TEST_STORE);
+
+ sideInputStoresToSystemStreams = ImmutableMap.of(SIDE_INPUT_STORE,
ImmutableSet.of(SIDE_INPUT_SYSTEM_STREAM));
+ storesToSystemStreams = ImmutableMap.of(TEST_STORE, TEST_SYSTEM_STREAM);
+
+ activeSideInputSSPs = ImmutableMap.of(ACTIVE_TASK_NAME,
Collections.emptyMap());
+ standbyChangelogSSPs =
+ ImmutableMap.of(STANDBY_TASK_NAME, ImmutableMap.of(TEST_STORE,
ImmutableSet.of(STANDBY_CHANGELOG_SSP)));
+ standbyWithSideInputSSPs = ImmutableMap.of(STANDBY_TASK_NAME_2,
+ ImmutableMap.of(TEST_STORE, ImmutableSet.of(STANDBY_CHANGELOG_SSP),
SIDE_INPUT_STORE, STANDBY_TASK_INPUT_SSP));
+ }
+ }
}