xinyuiscool commented on a change in pull request #1164: [WIP] Transactional State [5/5]: Added implementations for transactional state checkpoints and restore
URL: https://github.com/apache/samza/pull/1164#discussion_r331580528
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##########
 @@ -293,11 +327,57 @@ public static boolean storeExists(File storeDir) {
    * @param taskMode the mode of the given task
    * @return the partition directory for the store
    */
-  public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
+  public File getTaskStoreDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
     TaskName taskNameForDirName = taskName;
     if (taskMode.equals(TaskMode.Standby)) {
       taskNameForDirName =  StandbyTaskUtil.getActiveTaskName(taskName);
     }
     return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_'));
   }
+
+  public List<File> getTaskStoreCheckpointDirs(File storeBaseDir, String storeName,
+      TaskName taskName, TaskMode taskMode) {
+    try {
+      File storeDir = new File(storeBaseDir, storeName);
+      String taskStoreName = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode).getName();
+
+      if (storeDir.exists()) { // new store or no local state
+        return Files.list(storeDir.toPath())
+            .map(Path::toFile)
+            .filter(file -> file.getName().contains(taskStoreName + "-"))
+            .collect(Collectors.toList());
+      } else {
+        return Collections.emptyList();
+      }
+    } catch (IOException e) {
+      throw new SamzaException(
+          String.format("Error finding checkpoint dirs for task: %s mode: %s store: %s in dir: %s",
+              taskName, taskMode, storeName, storeBaseDir), e);
+    }
+  }
+
+  public void moveCheckpointFiles(File checkpointDir, File storeDir) {
 
 Review comment:
   I think it would be better to name this restoreCheckpointFiles rather than
moveCheckpointFiles, since we don't actually move any files here.
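
   To illustrate the naming point, here is a minimal sketch of a method that
restores rather than moves. The method body is not shown in this hunk, so the
copy-based behavior below is an assumption, and CheckpointRestoreSketch and its
Path-based signature are hypothetical names used only for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class CheckpointRestoreSketch {
  /**
   * Hypothetical helper (not the PR's implementation): copies each regular file
   * from checkpointDir into storeDir and leaves checkpointDir untouched, which
   * is why "restore" describes it better than "move".
   */
  public static void restoreCheckpointFiles(Path checkpointDir, Path storeDir) throws IOException {
    Files.createDirectories(storeDir);
    // Files.list keeps a directory handle open, so close the stream when done.
    try (Stream<Path> files = Files.list(checkpointDir)) {
      files.filter(Files::isRegularFile).forEach(src -> {
        try {
          Files.copy(src, storeDir.resolve(src.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
          // Lambdas cannot throw checked exceptions, so wrap and rethrow.
          throw new UncheckedIOException(e);
        }
      });
    }
  }
}
```

   After such a call the checkpoint directory still holds its files, so the
same checkpoint could be restored again later; a true move would leave it empty.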

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
