prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r594681640



##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -323,7 +319,7 @@ public boolean storeExists(File storeDir) {
    * If the file does not exist, returns null.
    * // TODO HIGH dchen add tests at all call sites for handling null value.
    *
-   * @param storagePartitionDir base directory for the store
+   * @param storagePartitionDir base directory for the store checkpoint file

Review comment:
       "store directory to read the checkpoint file from"
   
   "base directory" is overloaded: it refers to both the "logged-store" dir and
   the task store dir, but not the store checkpoint dir.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
 public interface TaskBackupManager {
 
   /**
-   * Initializes the TaskBackupManager instance
-   * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+   * Initializes the TaskBackupManager instance.
+   *
+   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
    */
   void init(@Nullable Checkpoint checkpoint);
 
   /**
-   * Commit operation that is synchronous to processing
+   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+   *  to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+   *
    * @param checkpointId {@link CheckpointId} of the current commit
    * @return a map of store name to state checkpoint markers for stores managed by this state backend
    */
   Map<String, String> snapshot(CheckpointId checkpointId);
 
   /**
-   * Commit operation that is asynchronous to message processing,
+   * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the

Review comment:
       s/persist to state/persist the state

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
   }
 
   /**
-   * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
-   * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
    * Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
    * locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
    * otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
    *
    * Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
-   * task needs to write.
+   * task needs to write, once per checkpoint version for backwards compatibility.

Review comment:
       "needs to write, as determined by
   {@link TaskConfig#getCheckpointVersionsToWrite} (or whatever the method name
   is). This is required for upgrade and rollback compatibility."
   

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -139,18 +132,16 @@ public void init() {
   }
 
   /**
-   * // TODO HIGH dchen fix method name and documentation. local file system isn't very relevant here (too general).
-   * // Maybe rename to "writeCheckpointToStoreDirs" or something and update docs.
    * Writes the {@link Checkpoint} returned by {@link #commit(CheckpointId)}
    * locally to the file system on disk if the checkpoint passed in is an instance of {@link CheckpointV2},
    * otherwise if it is an instance of {@link CheckpointV1} persists the Kafka changelog ssp-offsets only.
    *
    * Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the
-   * task needs to write.
+   * task needs to write, once per checkpoint version for backwards compatibility.
    *
    * @param checkpoint the latest checkpoint to be persisted to local file system
    */
-  public void persistToLocalFileSystem(Checkpoint checkpoint) {
+  public void writeCheckpointToStoreDirectory(Checkpoint checkpoint) {

Review comment:
       `Directories` (since this writes to both the store dir and the store
   checkpoint dir). Can also clarify this in the method javadocs. E.g.:
   
   "Writes the {@link Checkpoint} information returned by
   {@link #commit(CheckpointId)} in each store directory and store checkpoint
   directory. For CheckpointV2, writes the entire task {@link CheckpointV2}.
   For CheckpointV1, only writes the changelog ssp offsets in the OFFSET* files."
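
A rough sketch of how the renamed method could read with that Javadoc, in case it helps. This is not the code in the PR: `SketchCheckpoint`, `SketchCheckpointV1`, `SketchCheckpointV2`, the pre-serialized string payloads, and both file names are hypothetical stand-ins, and the real method presumably resolves the store directories itself rather than taking them as a parameter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical stand-ins for Samza's Checkpoint types; payloads are pre-serialized strings.
interface SketchCheckpoint { }

final class SketchCheckpointV1 implements SketchCheckpoint {
  final String changelogOffsets;
  SketchCheckpointV1(String changelogOffsets) { this.changelogOffsets = changelogOffsets; }
}

final class SketchCheckpointV2 implements SketchCheckpoint {
  final String serializedCheckpoint;
  SketchCheckpointV2(String serializedCheckpoint) { this.serializedCheckpoint = serializedCheckpoint; }
}

final class CheckpointWriterSketch {
  // Placeholder file names for illustration only, not the names Samza uses on disk.
  static final String OFFSET_FILE = "OFFSET-sketch";
  static final String CHECKPOINT_FILE = "CHECKPOINT-sketch";

  /**
   * Writes the checkpoint information in each of the given directories (store
   * directory and store checkpoint directory). For V2, writes the entire task
   * checkpoint. For V1, only writes the changelog ssp offsets.
   *
   * @param checkpoint the checkpoint to write
   * @param directories store and store checkpoint directories to write to
   */
  static void writeCheckpointToStoreDirectories(SketchCheckpoint checkpoint, List<Path> directories) {
    final String fileName;
    final String payload;
    if (checkpoint instanceof SketchCheckpointV2) {
      fileName = CHECKPOINT_FILE;
      payload = ((SketchCheckpointV2) checkpoint).serializedCheckpoint;
    } else if (checkpoint instanceof SketchCheckpointV1) {
      fileName = OFFSET_FILE;
      payload = ((SketchCheckpointV1) checkpoint).changelogOffsets;
    } else {
      throw new IllegalArgumentException("Unsupported checkpoint type: " + checkpoint);
    }
    for (Path dir : directories) {
      try {
        // Create the directory if needed, then write (or overwrite) the file in it.
        Files.createDirectories(dir);
        Files.write(dir.resolve(fileName), payload.getBytes(StandardCharsets.UTF_8));
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to write " + fileName + " in " + dir, e);
      }
    }
  }
}
```

Calling it once per checkpoint version keeps the V1 and V2 files side by side in the same directories, which matches the "invoked once for each checkpoint version" note in the existing Javadoc.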

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state

Review comment:
       "Performs any post-commit and cleanup actions after the {@link Checkpoint}
   is successfully written to the checkpoint topic. Invokes
   {@link TaskBackupManager#cleanup} on each of the configured backup managers.
   Deletes all local store checkpoint directories older than the
   {@code latestCheckpointId}."

##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -246,15 +245,12 @@ public void writeOffsetFile(File storeDir, Map<SystemStreamPartition, String> of
 
   /**
    * Writes the checkpoint to the store checkpoint directory based on the checkpointId.
-   * // TODO HIGH dchen why assume writing to "checkpoint directory" instead of arbitrary directory?
-   * // TODO HIGH dchen fix param descriptions
-   * @param checkpointDir base store directory to write the checkpoint to
+   *
+   * @param storeDir base store directory to write the checkpoint to

Review comment:
       "store or store checkpoint directory to write the checkpoint file to"

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -43,32 +45,40 @@
 public interface TaskBackupManager {
 
   /**
-   * Initializes the TaskBackupManager instance
-   * @param checkpoint Last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
+   * Initializes the TaskBackupManager instance.
+   *
+   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
    */
   void init(@Nullable Checkpoint checkpoint);
 
   /**
-   * Commit operation that is synchronous to processing
+   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
+   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is synchronous
+   *  to processing. Returns the per store name state snapshotted checkpoints to be used in upload.
+   *
    * @param checkpointId {@link CheckpointId} of the current commit
    * @return a map of store name to state checkpoint markers for stores managed by this state backend
    */
   Map<String, String> snapshot(CheckpointId checkpointId);
 
   /**
-   * Commit operation that is asynchronous to message processing,
+   * Upload is used to persist to state provided by the {@link #snapshot(CheckpointId)} to the
+   * underlying backup system. Commit operation that is asynchronous to message processing and returns a
+   * {@link CompletableFuture} containing the successfully uploaded state checkpoints.

Review comment:
       "uploaded state checkpoint markers"
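
To make the marker terminology concrete, here is a minimal sketch of the snapshot-then-upload call order the Javadoc describes. It is an illustration under assumptions, not the Samza implementation: `SketchBackupManager`, `InMemoryBackupManager`, and `CommitFlowSketch` are hypothetical names, checkpoint IDs are plain strings instead of `CheckpointId`, and the marker format `storeName@checkpointId` is made up.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the TaskBackupManager interface under review.
interface SketchBackupManager {
  // Synchronous phase: returns store name -> state checkpoint marker.
  Map<String, String> snapshot(String checkpointId);

  // Asynchronous phase: completes with the successfully uploaded state checkpoint markers.
  CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> stateCheckpointMarkers);
}

// Toy in-memory implementation, used only to show the call order.
final class InMemoryBackupManager implements SketchBackupManager {
  private final List<String> storeNames;

  InMemoryBackupManager(List<String> storeNames) {
    this.storeNames = storeNames;
  }

  @Override
  public Map<String, String> snapshot(String checkpointId) {
    Map<String, String> markers = new TreeMap<>();
    for (String storeName : storeNames) {
      // Made-up marker format; a real backend would return its own marker.
      markers.put(storeName, storeName + "@" + checkpointId);
    }
    return markers;
  }

  @Override
  public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> stateCheckpointMarkers) {
    // Pretend the upload succeeds and echo back the markers that were "uploaded".
    return CompletableFuture.supplyAsync(() -> new TreeMap<>(stateCheckpointMarkers));
  }
}

final class CommitFlowSketch {
  // Runs snapshot on the calling thread, then waits for the asynchronous upload.
  static Map<String, String> commit(SketchBackupManager manager, String checkpointId) {
    Map<String, String> markers = manager.snapshot(checkpointId);
    // join() blocks here only to keep the sketch short; a real caller would chain on the future.
    return manager.upload(checkpointId, markers).join();
  }
}
```

The point of the sketch is that both phases exchange the same per-store marker map, which is why "uploaded state checkpoint markers" reads more precisely than "uploaded state checkpoints".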

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -181,27 +176,26 @@ public void persistToLocalFileSystem(Checkpoint checkpoint) {
   }
 
   /**
-   * // TODO MED dchen: fix documentation
-   * Cleanup the commit state for each of the task backup managers
+   * Cleanup on the commit state from the {@link #commit(CheckpointId)} call. Evokes the cleanup the commit state

Review comment:
       s/Evokes/Invokes in this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

