prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594681640
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -323,7 +319,7 @@ public boolean storeExists(File storeDir) {
* If the file does not exist, returns null.
* // TODO HIGH dchen add tests at all call sites for handling null value.
*
- * @param storagePartitionDir base directory for the store
+ * @param storagePartitionDir base directory for the store checkpoint file
Review comment:
Suggest: "store directory to read the checkpoint file from".
"Base directory" is overloaded: it is used to refer to the "logged-store" dir
as well as the task store dir, but not the store checkpoint dir.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
public interface TaskBackupManager {
/**
- * Initializes the TaskBackupManager instance
- * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+ * Initializes the TaskBackupManager instance.
+ *
+ * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
*/
void init(@Nullable Checkpoint checkpoint);
/**
- * Commit operation that is synchronous to processing
+ * Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+ * {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+ * to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+ *
* @param checkpointId {@link CheckpointId} of the current commit
* @return a map of store name to state checkpoint markers for stores managed by this state backend
*/
Map<String, String> snapshot(CheckpointId checkpointId);
/**
- * Commit operation that is asynchronous to message processing,
+ * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the
Review comment:
s/persist to state/persist the state
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
}
/**
- * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
- * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
* Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
* locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
* otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
*
* Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
- * task needs to write.
+ * task needs to write, once per checkpoint version for backwards compatibility.
Review comment:
"needs to write, as determined by {@link
TaskConfig#getCheckpointVersionsToWrite} (or whatever the method name is). This
is required for upgrade and rollback compatibility."
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
}
/**
- * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
- * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
* Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
* locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
* otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
*
* Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
- * task needs to write.
+ * task needs to write, once per checkpoint version for backwards compatibility.
*
* @param checkpoint the latest checkpoint to be persisted to local file system
*/
- public void persistToLocalFileSystem(Checkpoint checkpoint) {
+ public void writeCheckpointToStoreDirectory(Checkpoint checkpoint) {
Review comment:
`Directories` (since this writes to both store dir and store checkpoint
dir). Can also clarify this in the method javadocs. E.g.:
"Writes the {@link Checkpoint} information returned by {@link
#commit(CheckpointId)} in each store directory and store checkpoint directory.
For CheckpointV2, writes the entire task {@link CheckpointV2}. For
CheckpointV1, only writes the changelog ssp offsets in the OFFSET* files."
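A rough sketch of the behavior this javadoc would describe, in case it helps
settle the wording. Everything here is hypothetical and simplified: the class
name, the integer `version` flag, the string payload, and the file names
`OFFSET` and `CHECKPOINT-V2` are illustration only and are not taken from the
Samza code. It only shows the "same file written into every store directory
and store checkpoint directory" shape.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class CheckpointDirsSketch {
  // Hypothetical file names, for illustration only.
  static final String V1_FILE = "OFFSET";
  static final String V2_FILE = "CHECKPOINT-V2";

  /**
   * Writes the payload into every directory passed in (store dirs and store
   * checkpoint dirs alike). The version only decides which file is written:
   * version 2 writes the full checkpoint, anything else the offsets file.
   */
  static void writeCheckpointToStoreDirectories(int version, String payload, List<Path> dirs)
      throws IOException {
    String fileName = version == 2 ? V2_FILE : V1_FILE;
    for (Path dir : dirs) {
      Files.createDirectories(dir);
      Files.write(dir.resolve(fileName), payload.getBytes(StandardCharsets.UTF_8));
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("checkpoint-sketch");
    // One store directory and one store checkpoint directory (names are made up).
    List<Path> dirs = Arrays.asList(root.resolve("store"), root.resolve("store-checkpoint"));

    writeCheckpointToStoreDirectories(2, "full task checkpoint", dirs);
    writeCheckpointToStoreDirectories(1, "changelog ssp offsets", dirs);

    for (Path dir : dirs) {
      System.out.println(dir + " has V2 file: " + Files.exists(dir.resolve(V2_FILE)));
      System.out.println(dir + " has V1 file: " + Files.exists(dir.resolve(V1_FILE)));
    }
  }
}
```

Calling it once per checkpoint version, as in `main`, mirrors the "invoked once
for each Checkpoint version" note in the javadoc.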
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
}
/**
- * // TODO MED dchen: fix documentation
- * Cleanup the commit state for each of the task backup managers
+ * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state
Review comment:
"Performs any post-commit and cleanup actions after the {@link
Checkpoint} is successfully written to the checkpoint topic. Invokes {@link
TaskStorageBackupManager#cleanup} on each of the configured backup managers.
Deletes all local store checkpoint directories older than the {@code
latestCheckpointId}."
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -246,15 +245,12 @@ public void writeOffsetFile(File storeDir, Map<SystemStreamPartition, String> of
/**
* Writes the checkpoint to the store checkpoint directory based on the checkpointId.
- * // TODO HIGH dchen why assume writing to "checkpoint directory" instead of arbitrary directory?
- * // TODO HIGH dchen fix param descriptions
- * @param checkpointDir base store directory to write the checkpoint to
+ *
+ * @param storeDir base store directory to write the checkpoint to
Review comment:
"store or store checkpoint directory to write the checkpoint file to"
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
public interface TaskBackupManager {
/**
- * Initializes the TaskBackupManager instance
- * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+ * Initializes the TaskBackupManager instance.
+ *
+ * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
*/
void init(@Nullable Checkpoint checkpoint);
/**
- * Commit operation that is synchronous to processing
+ * Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+ * {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+ * to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+ *
* @param checkpointId {@link CheckpointId} of the current commit
* @return a map of store name to state checkpoint markers for stores managed by this state backend
*/
Map<String, String> snapshot(CheckpointId checkpointId);
/**
- * Commit operation that is asynchronous to message processing,
+ * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the
+ * underlying backup system. Commit operation that is asynchronous to message processing and returns a
+ * {@link CompletableFuture} containing the successfully uploaded state checkpoints.
Review comment:
"uploaded state checkpoint markers"
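To make the "markers" terminology concrete, here is a minimal sketch of the
two-phase contract as I read it from the javadoc: a synchronous snapshot that
returns per-store markers, followed by an asynchronous upload that returns the
uploaded markers in a `CompletableFuture`. The interface below is a
hypothetical, simplified stand-in for `TaskBackupManager` (plain strings
instead of `CheckpointId`, an in-memory "remote"), and the store name and
marker formats are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotUploadSketch {
  /** Hypothetical, simplified stand-in for TaskBackupManager. */
  interface BackupManager {
    // Synchronous to processing: returns store name -> snapshot marker.
    Map<String, String> snapshot(String checkpointId);

    // Asynchronous to processing: returns store name -> uploaded marker.
    CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> snapshotMarkers);
  }

  /** In-memory implementation, for illustration only. */
  static class InMemoryBackupManager implements BackupManager {
    final Map<String, String> remote = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> snapshot(String checkpointId) {
      Map<String, String> markers = new HashMap<>();
      markers.put("store-a", "local-" + checkpointId); // made-up marker format
      return markers;
    }

    @Override
    public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> snapshotMarkers) {
      return CompletableFuture.supplyAsync(() -> {
        Map<String, String> uploaded = new HashMap<>();
        snapshotMarkers.forEach((store, marker) -> uploaded.put(store, "remote-" + marker));
        remote.putAll(uploaded);
        return uploaded;
      });
    }
  }

  public static void main(String[] args) {
    BackupManager manager = new InMemoryBackupManager();
    Map<String, String> snapshotMarkers = manager.snapshot("cp-1");
    // join() blocks only in this demo; a real caller would chain on the future.
    Map<String, String> uploadedMarkers = manager.upload("cp-1", snapshotMarkers).join();
    System.out.println("snapshot markers: " + snapshotMarkers);
    System.out.println("uploaded markers: " + uploadedMarkers);
  }
}
```

Both phases return a map keyed by store name, which is why "state checkpoint
markers" reads more consistently than "state checkpoints" in the upload
javadoc.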
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
}
/**
- * // TODO MED dchen: fix documentation
- * Cleanup the commit state for each of the task backup managers
+ * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state
Review comment:
s/Evokes/Invokes in this class.