prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r585971931
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
##########
@@ -263,10 +245,11 @@ static StoreActions getStoreActions(
String checkpointedOffset = null; // can be null if no message, or
message has null offset
long timeSinceLastCheckpointInMs = Long.MAX_VALUE;
if (StringUtils.isNotBlank(checkpointMessage)) {
+ // TODO HIGH dchen fix Checkpoint version handling with
stateCheckpointMarker
Review comment:
Ideally the CheckpointV1 (with KafkaStateChangelogOffset) ->
CheckpointV2 (with KafkaSCM) format conversion will happen in
getCheckpointedChangelogOffset, and this method will get a Map<Store Name,
Kafka SCM>.
##########
File path:
samza-core/src/main/java/org/apache/samza/serializers/CheckpointV2Serde.java
##########
@@ -54,60 +59,65 @@
* }
* }
* </code>
- *
*/
-public class StatefulCheckpointSerde implements Serde<Checkpoint> {
- private static final StateCheckpointMarkerSerde
STATE_CHECKPOINT_MARKER_SERDE = new StateCheckpointMarkerSerde();
+public class CheckpointV2Serde implements Serde<CheckpointV2> {
+ private static final StateCheckpointMarkerSerde SCM_SERDE = new
StateCheckpointMarkerSerde();
+
private final Serde<JsonCheckpoint> jsonCheckpointSerde;
- public StatefulCheckpointSerde() {
+ public CheckpointV2Serde() {
this.jsonCheckpointSerde = new JsonSerdeV2<>(JsonCheckpoint.class);
}
@Override
- public Checkpoint fromBytes(byte[] bytes) {
+ public CheckpointV2 fromBytes(byte[] bytes) {
try {
JsonCheckpoint jsonCheckpoint = jsonCheckpointSerde.fromBytes(bytes);
Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
Map<String, List<StateCheckpointMarker>> stateCheckpoints = new
HashMap<>();
jsonCheckpoint.getInputOffsets().forEach((sspName, m) -> {
Review comment:
Minor: s/m/sspInfo or something similar. Maybe add a comment explaining
what it contains and why we serialize it in this format. Also extract constants
for the key names.
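A minimal sketch of what this suggestion could look like. The key names (`system`, `stream`, `partition`, `offset`) and the class `SspInfoKeysSketch` are assumptions for illustration only, since the hunk above does not show the real keys, and a plain String stands in for SystemStreamPartition.

```java
import java.util.HashMap;
import java.util.Map;

class SspInfoKeysSketch {
  // Assumed key names, for illustration only; the hunk above does not show the real ones.
  static final String SYSTEM_KEY = "system";
  static final String STREAM_KEY = "stream";
  static final String PARTITION_KEY = "partition";
  static final String OFFSET_KEY = "offset";

  // Flattens { sspName -> sspInfo } into { "system.stream.partition" -> offset }.
  // sspInfo holds the SSP components and the offset as plain strings.
  static Map<String, String> toOffsets(Map<String, Map<String, String>> inputOffsets) {
    Map<String, String> offsets = new HashMap<>();
    inputOffsets.forEach((sspName, sspInfo) -> {
      String ssp = sspInfo.get(SYSTEM_KEY) + "." + sspInfo.get(STREAM_KEY) + "." + sspInfo.get(PARTITION_KEY);
      offsets.put(ssp, sspInfo.get(OFFSET_KEY));
    });
    return offsets;
  }
}
```

Naming the lambda parameter `sspInfo` and reading fields through constants keeps the serialized layout visible in one place.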
##########
File path:
samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
/**
* Builds the SSP to kafka offset mapping from map of store name to list of
StateCheckpointMarkers
* containing a KafkaStateCheckpointMarker
- * @param storeToStateBackendStateMarkers Map of store name to list of
StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+ * @param factoryStateBackendStateMarkersMap Map of store name to list of
StateCheckpointMarkers containing a
+ * KafkaStateCheckpointMarker
* @return Map of ssp to option of Kafka offset
*/
// TODO HIGH dchen add unit tests, fix javadocs
public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
- Map<String, List<StateCheckpointMarker>>
storeToStateBackendStateMarkers) {
- Map<String, String> storeToKafkaStateMarker = new HashMap<>();
- storeToStateBackendStateMarkers.forEach((storeName, scms) -> {
- String kafkaStateMarker = null;
- for (StateCheckpointMarker scm : scms) {
- if
(KafkaChangelogStateBackendFactory.class.getName().equals(scm.getStateBackendFactoryName()))
{
- kafkaStateMarker = scm.getStateCheckpointMarker();
- break; // there should be only one KafkaStateCheckpointMarker per
store
- }
- }
- storeToKafkaStateMarker.put(storeName, kafkaStateMarker);
- });
- return scmToSSPOffsetMap(storeToKafkaStateMarker);
+ Map<String, Map<String, String>> factoryStateBackendStateMarkersMap) {
+ return scmToSSPOffsetMap(factoryStateBackendStateMarkersMap
+ .getOrDefault(KAFKA_BACKEND_FACTORY_NAME, null));
Review comment:
Don't return null collections, return empty collections instead.
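A minimal sketch of the suggestion, assuming the inner map is store name to serialized marker. The class `EmptyDefaultSketch` and the constant's value are hypothetical; only `Map.getOrDefault` and `Collections.emptyMap` are real JDK calls.

```java
import java.util.Collections;
import java.util.Map;

class EmptyDefaultSketch {
  // Hypothetical stand-in for KAFKA_BACKEND_FACTORY_NAME; the real value is not shown in this diff.
  static final String KAFKA_BACKEND_FACTORY_NAME = "kafka-factory-placeholder";

  // Returns the { store name -> serialized marker } map for the Kafka backend,
  // or an immutable empty map when that backend has no entry.
  static Map<String, String> kafkaMarkers(Map<String, Map<String, String>> factoryToStoreMarkers) {
    return factoryToStoreMarkers.getOrDefault(KAFKA_BACKEND_FACTORY_NAME, Collections.emptyMap());
  }
}
```

With an empty default, the downstream conversion can iterate over the result without a null check.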
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
* @return Committed StoreName to StateCheckpointMarker mappings of the
committed SSPs
*/
public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName,
CheckpointId checkpointId) {
- Map<String, StateCheckpointMarker> snapshot =
storageBackupManager.snapshot(checkpointId);
- LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName:
{} checkpoint id: {}", snapshot, taskName, checkpointId);
- CompletableFuture<Map<String, StateCheckpointMarker>>
- uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
- try {
- // TODO: Make async with andThen and add thread management for
concurrency and add timeouts
- Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
- LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName:
{} checkpoint id: {}", uploadMap, taskName, checkpointId);
- if (uploadMap != null) {
- LOG.trace("Persisting stores to file system for taskName: {} with
checkpoint id: {}", taskName, checkpointId);
- storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
- }
+ List<Map<String, StateCheckpointMarker>> stateCheckpoints = new
ArrayList<>();
Review comment:
Minor: add a comment for what this is: (list == one entry per backend
factory, map == store name to SCM)
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
* @return Committed StoreName to StateCheckpointMarker mappings of the
committed SSPs
*/
public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName,
CheckpointId checkpointId) {
- Map<String, StateCheckpointMarker> snapshot =
storageBackupManager.snapshot(checkpointId);
- LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName:
{} checkpoint id: {}", snapshot, taskName, checkpointId);
- CompletableFuture<Map<String, StateCheckpointMarker>>
- uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
- try {
- // TODO: Make async with andThen and add thread management for
concurrency and add timeouts
- Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
- LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName:
{} checkpoint id: {}", uploadMap, taskName, checkpointId);
- if (uploadMap != null) {
- LOG.trace("Persisting stores to file system for taskName: {} with
checkpoint id: {}", taskName, checkpointId);
- storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
- }
+ List<Map<String, StateCheckpointMarker>> stateCheckpoints = new
ArrayList<>();
+ backendFactoryBackupManagerMap.values().forEach(storageBackupManager -> {
+ Map<String, StateCheckpointMarker> snapshotSCMs =
storageBackupManager.snapshot(checkpointId);
Review comment:
Now that we have multiple state backends, it makes sense to extract
store flush and checkpoint dir creation out of the backup manager
implementations and do them once here.
##########
File path:
samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
/**
* Builds the SSP to kafka offset mapping from map of store name to list of
StateCheckpointMarkers
* containing a KafkaStateCheckpointMarker
- * @param storeToStateBackendStateMarkers Map of store name to list of
StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+ * @param factoryStateBackendStateMarkersMap Map of store name to list of
StateCheckpointMarkers containing a
Review comment:
Fix param name and description to reflect map contents.
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -62,37 +65,60 @@ public void start() {
* @return Committed StoreName to StateCheckpointMarker mappings of the
committed SSPs
*/
public Map<String, List<StateCheckpointMarker>> commit(TaskName taskName,
CheckpointId checkpointId) {
- Map<String, StateCheckpointMarker> snapshot =
storageBackupManager.snapshot(checkpointId);
- LOG.trace("Returned StateCheckpointMarkers from snapshot: {} for taskName:
{} checkpoint id: {}", snapshot, taskName, checkpointId);
- CompletableFuture<Map<String, StateCheckpointMarker>>
- uploadFuture = storageBackupManager.upload(checkpointId, snapshot);
-
- try {
- // TODO: Make async with andThen and add thread management for
concurrency and add timeouts
- Map<String, StateCheckpointMarker> uploadMap = uploadFuture.get();
- LOG.trace("Returned StateCheckpointMarkers from upload: {} for taskName:
{} checkpoint id: {}", uploadMap, taskName, checkpointId);
- if (uploadMap != null) {
- LOG.trace("Persisting stores to file system for taskName: {} with
checkpoint id: {}", taskName, checkpointId);
- storageBackupManager.persistToFilesystem(checkpointId, uploadMap);
- }
+ List<Map<String, StateCheckpointMarker>> stateCheckpoints = new
ArrayList<>();
+ backendFactoryBackupManagerMap.values().forEach(storageBackupManager -> {
+ Map<String, StateCheckpointMarker> snapshotSCMs =
storageBackupManager.snapshot(checkpointId);
+ LOG.debug("Found snapshot SCMs for taskName: {} checkpoint id: {} to be:
{}", taskName, checkpointId, snapshotSCMs);
- // TODO: call commit on multiple backup managers when available
- return mergeCheckpoints(taskName, Collections.singletonList(uploadMap));
- } catch (Exception e) {
- throw new SamzaException("Upload commit portion could not be completed
for taskName", e);
- }
+ CompletableFuture<Map<String, StateCheckpointMarker>> uploadFuture =
storageBackupManager.upload(checkpointId, snapshotSCMs);
+
+ try {
+ // TODO: HIGH dchen Make async with andThen and add thread management
for concurrency and add timeouts,
+ // need to make upload theads independent
+ Map<String, StateCheckpointMarker> uploadSCMs = uploadFuture.get();
+ LOG.debug("Found uplaod SCMs for taskName: {} checkpoint id: {} to be:
{}", taskName, checkpointId, uploadSCMs);
+
+ if (uploadSCMs != null) {
+ LOG.debug("Persisting SCMs to store checkpoint directory for
taskName: {} with checkpoint id: {}", taskName,
+ checkpointId);
+ storageBackupManager.persistToFilesystem(checkpointId, uploadSCMs);
Review comment:
Should we move this (tagging checkpoint dir with a file containing the
serialized "Checkpoint/SCM map") out of each backup manager impl and do it once
in this class?
##########
File path:
samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
/**
* Builds the SSP to kafka offset mapping from map of store name to list of
StateCheckpointMarkers
* containing a KafkaStateCheckpointMarker
- * @param storeToStateBackendStateMarkers Map of store name to list of
StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+ * @param factoryStateBackendStateMarkersMap Map of store name to list of
StateCheckpointMarkers containing a
+ * KafkaStateCheckpointMarker
* @return Map of ssp to option of Kafka offset
*/
// TODO HIGH dchen add unit tests, fix javadocs
public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
- Map<String, List<StateCheckpointMarker>>
storeToStateBackendStateMarkers) {
- Map<String, String> storeToKafkaStateMarker = new HashMap<>();
- storeToStateBackendStateMarkers.forEach((storeName, scms) -> {
- String kafkaStateMarker = null;
- for (StateCheckpointMarker scm : scms) {
- if
(KafkaChangelogStateBackendFactory.class.getName().equals(scm.getStateBackendFactoryName()))
{
- kafkaStateMarker = scm.getStateCheckpointMarker();
- break; // there should be only one KafkaStateCheckpointMarker per
store
- }
- }
- storeToKafkaStateMarker.put(storeName, kafkaStateMarker);
- });
- return scmToSSPOffsetMap(storeToKafkaStateMarker);
+ Map<String, Map<String, String>> factoryStateBackendStateMarkersMap) {
+ return scmToSSPOffsetMap(factoryStateBackendStateMarkersMap
+ .getOrDefault(KAFKA_BACKEND_FACTORY_NAME, null));
}
/**
* Builds a SSP to Kafka offset mapping from map of store name to
KafkaStateCheckpointMarkers
+ * @param storeToKafkaStateMarker storeName to serialized
KafkaStateCheckpointMarker
+ * @return Map of SSP to Optional offset
Review comment:
Document which SSPs these are (store changelogs) and when the Optional
will be empty.
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -37,17 +36,18 @@
private final CheckpointId checkpointId;
private final Map<SystemStreamPartition, String> inputOffsets;
- private final Map<String, List<StateCheckpointMarker>>
stateCheckpointMarkers;
+ private final Map<String, Map<String, String>> stateCheckpointMarkers;
/**
* Constructs the checkpoint with separated input and state offsets
* @param checkpointId CheckpointId associated with this checkpoint
* @param inputOffsets Map of Samza system stream partition to offset of the
checkpoint
- * @param stateCheckpointMarkers Map of local state store names and
StateCheckpointMarkers for each state backend system
+ * @param stateCheckpointMarkers Map of state backend factory name to map of
local state store names
+ * to StateCheckpointMarkers
Review comment:
s/StateCheckpointMarkers/state checkpoint markers/ everywhere, since it's
not a class anymore.
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -85,8 +85,7 @@ public CheckpointId getCheckpointId() {
*
* @return The state checkpoint markers for the checkpoint
*/
- public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
- // TODO HIGH dchen it might be a lot simpler to make this a
Map<StateBackend, Map<StoreName, Marker>>
+ public Map<String, Map<String, String>> getStateCheckpointMarkers() {
Review comment:
Document map contents.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -79,7 +79,7 @@
* @param checkpointId The {@link CheckpointId} of the last successfully
committed checkpoint
* @param stateCheckpointMarkers A map of store name to state checkpoint
markers from returned by {@link #upload(CheckpointId, Map)} upload}
Review comment:
s/from returned by/returned by/
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,29 +127,4 @@ public void close() {
}
});
}
-
- // TODO HIGH dchen add javadocs for what this method is doing
- // TODO HIGH dchen add unit tests.
- private Map<String, List<StateCheckpointMarker>> getStoreSCMs(TaskName
taskName,
Review comment:
Nice cleanup, thanks!
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -101,33 +96,26 @@ public void start() {
}
});
- return getStoreSCMs(taskName, backendFactoryStoreStateMarkers);
+ return backendFactoryStoreStateMarkers;
}
- // TODO HIGH dchen add javadocs explaining what the params are.
- public void cleanUp(CheckpointId checkpointId, Map<String,
List<StateCheckpointMarker>> stateCheckpointMarkers) {
- // { state backend factory -> { store name -> state checkpoint marker)
- Map<String, Map<String, StateCheckpointMarker>> stateBackendToStoreSCMs =
new HashMap<>();
-
- // The number of backend factories is equal to the length of the
stateCheckpointMarker per store list
- stateBackendToBackupManager.keySet().forEach((stateBackendFactoryName) -> {
- stateBackendToStoreSCMs.put(stateBackendFactoryName, new HashMap<>());
- });
-
- stateCheckpointMarkers.forEach((storeName, scmList) -> {
- scmList.forEach(scm -> {
- if
(stateBackendToStoreSCMs.containsKey(scm.getStateBackendFactoryName())) {
-
stateBackendToStoreSCMs.get(scm.getStateBackendFactoryName()).put(storeName,
scm);
- } else {
- LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ",
scm, scm.getStateBackendFactoryName());
+ /**
+ * Cleanup each of the task backup managers
+ * @param checkpointId CheckpointId of the most recent successful commit
+ * @param stateCheckpointMarkers map of map(stateBackendFactoryName to
map(storeName to StateCheckpointMarkers) from
Review comment:
Fix param description. It would be more readable to use the format you
used in JsonCheckpoint.
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -101,33 +96,26 @@ public void start() {
}
});
- return getStoreSCMs(taskName, backendFactoryStoreStateMarkers);
+ return backendFactoryStoreStateMarkers;
}
- // TODO HIGH dchen add javadocs explaining what the params are.
- public void cleanUp(CheckpointId checkpointId, Map<String,
List<StateCheckpointMarker>> stateCheckpointMarkers) {
- // { state backend factory -> { store name -> state checkpoint marker)
- Map<String, Map<String, StateCheckpointMarker>> stateBackendToStoreSCMs =
new HashMap<>();
-
- // The number of backend factories is equal to the length of the
stateCheckpointMarker per store list
- stateBackendToBackupManager.keySet().forEach((stateBackendFactoryName) -> {
- stateBackendToStoreSCMs.put(stateBackendFactoryName, new HashMap<>());
- });
-
- stateCheckpointMarkers.forEach((storeName, scmList) -> {
- scmList.forEach(scm -> {
- if
(stateBackendToStoreSCMs.containsKey(scm.getStateBackendFactoryName())) {
-
stateBackendToStoreSCMs.get(scm.getStateBackendFactoryName()).put(storeName,
scm);
- } else {
- LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ",
scm, scm.getStateBackendFactoryName());
+ /**
+ * Cleanup each of the task backup managers
+ * @param checkpointId CheckpointId of the most recent successful commit
+ * @param stateCheckpointMarkers map of map(stateBackendFactoryName to
map(storeName to StateCheckpointMarkers) from
+ * the latest commit
+ */
+ public void cleanUp(CheckpointId checkpointId, Map<String, Map<String,
String>> stateCheckpointMarkers) {
+ stateCheckpointMarkers.entrySet().forEach((factoryNameToSCM) -> {
+ String factoryName = factoryNameToSCM.getKey();
+ if (stateBackendToBackupManager.containsKey(factoryName)) {
+ TaskBackupManager backupManager =
stateBackendToBackupManager.get(factoryName);
+ if (backupManager != null) {
+ backupManager.cleanUp(checkpointId, factoryNameToSCM.getValue());
}
Review comment:
Else throw exception? When would backupManager be null?
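One possible reading of this question, as a sketch only: fail fast when a factory name has no backup manager. The `BackupManager` interface, the String checkpoint id, and the use of `IllegalStateException` are all stand-ins chosen for illustration, not the PR's types. Note that this treats an unknown factory and a null manager the same way, whereas the removed code logged a warning for unknown factories.

```java
import java.util.Map;

class CleanUpSketch {
  // Hypothetical stand-in for TaskBackupManager, reduced to the one call used here.
  interface BackupManager {
    void cleanUp(String checkpointId, Map<String, String> storeToMarker);
  }

  // factoryToStoreMarkers: Map<StateBackendFactoryName, Map<StoreName, StateCheckpointMarker>>
  static void cleanUp(String checkpointId,
      Map<String, Map<String, String>> factoryToStoreMarkers,
      Map<String, BackupManager> managers) {
    factoryToStoreMarkers.forEach((factoryName, storeToMarker) -> {
      BackupManager manager = managers.get(factoryName);
      if (manager == null) {
        // Assumption: a missing manager is a configuration error rather than something to skip.
        throw new IllegalStateException("No backup manager for state backend factory: " + factoryName);
      }
      manager.cleanUp(checkpointId, storeToMarker);
    });
  }
}
```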
##########
File path:
samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -29,6 +29,7 @@
public class JsonCheckpoint {
private String checkpointId;
private Map<String, Map<String, String>> inputOffsets;
+ // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>
Review comment:
+1, maybe use this convention in javadocs at-params/at-returns and
comments describing fields/variables as well. More readable than the natural
language description IMHO.
##########
File path:
samza-core/src/main/java/org/apache/samza/serializers/CheckpointV2Serde.java
##########
@@ -54,60 +59,65 @@
* }
* }
* </code>
- *
*/
-public class StatefulCheckpointSerde implements Serde<Checkpoint> {
- private static final StateCheckpointMarkerSerde
STATE_CHECKPOINT_MARKER_SERDE = new StateCheckpointMarkerSerde();
+public class CheckpointV2Serde implements Serde<CheckpointV2> {
+ private static final StateCheckpointMarkerSerde SCM_SERDE = new
StateCheckpointMarkerSerde();
+
private final Serde<JsonCheckpoint> jsonCheckpointSerde;
- public StatefulCheckpointSerde() {
+ public CheckpointV2Serde() {
this.jsonCheckpointSerde = new JsonSerdeV2<>(JsonCheckpoint.class);
}
@Override
- public Checkpoint fromBytes(byte[] bytes) {
+ public CheckpointV2 fromBytes(byte[] bytes) {
try {
JsonCheckpoint jsonCheckpoint = jsonCheckpointSerde.fromBytes(bytes);
Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
Map<String, List<StateCheckpointMarker>> stateCheckpoints = new
HashMap<>();
jsonCheckpoint.getInputOffsets().forEach((sspName, m) -> {
Review comment:
Might not be necessary if you're planning to switch to Jackson
Deserializers later.
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -65,9 +61,9 @@ public void start() {
/**
* Commits the local state on the remote backup implementation
* TODO BLOCKER dchen add comments / docs for what all these Map keys and
value are.
- * @return Committed StoreName to StateCheckpointMarker mappings of the
committed SSPs
+ * @return Committed Map of FactoryName to (Map of StoreName to
StateCheckpointMarker) mappings of the committed SSPs
Review comment:
Clean up redundant description. (Committed map of .. mappings of the
committed SSPs)
##########
File path:
samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -78,28 +77,21 @@ public String getChangelogOffset() {
/**
* Builds the SSP to kafka offset mapping from map of store name to list of
StateCheckpointMarkers
* containing a KafkaStateCheckpointMarker
- * @param storeToStateBackendStateMarkers Map of store name to list of
StateCheckpointMarkers containing a KafkaStateCheckpointMarker
+ * @param factoryStateBackendStateMarkersMap Map of store name to list of
StateCheckpointMarkers containing a
Review comment:
Update rest of the javadocs as well.