xinyuiscool commented on a change in pull request #1164: [WIP] Transactional State [5/5]: Added implementations for transactional state checkpoints and restore
URL: https://github.com/apache/samza/pull/1164#discussion_r331580528
##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -293,11 +327,57 @@ public static boolean storeExists(File storeDir) {
* @param taskMode the mode of the given task
* @return the partition directory for the store
*/
- public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
+ public File getTaskStoreDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
TaskName taskNameForDirName = taskName;
if (taskMode.equals(TaskMode.Standby)) {
taskNameForDirName = StandbyTaskUtil.getActiveTaskName(taskName);
}
return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_'));
}
+
+ public List<File> getTaskStoreCheckpointDirs(File storeBaseDir, String storeName,
+ TaskName taskName, TaskMode taskMode) {
+ try {
+ File storeDir = new File(storeBaseDir, storeName);
+ String taskStoreName = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode).getName();
+
+ if (storeDir.exists()) {
+ return Files.list(storeDir.toPath())
+ .map(Path::toFile)
+ .filter(file -> file.getName().contains(taskStoreName + "-"))
+ .collect(Collectors.toList());
+ } else { // new store or no local state
+ return Collections.emptyList();
+ }
+ } catch (IOException e) {
+ throw new SamzaException(
+ String.format("Error finding checkpoint dirs for task: %s mode: %s store: %s in dir: %s",
+ taskName, taskMode, storeName, storeBaseDir), e);
+ }
+ }
+
+ public void moveCheckpointFiles(File checkpointDir, File storeDir) {
Review comment:
I think it would be better to name this restoreCheckpointFiles instead of moveCheckpointFiles, since the files are not actually moved here.
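To illustrate the distinction the comment draws, here is a minimal sketch of a copy-based restore, assuming the intent is that the checkpoint directory stays intact after its contents are placed into the store directory. The body of moveCheckpointFiles is not part of this diff, so this is not the PR's implementation; the class name CheckpointRestoreSketch and the use of Files.walkFileTree with Files.copy are illustrative assumptions.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper, not from the PR: illustrates "restore" (copy) vs "move".
public class CheckpointRestoreSketch {

  /**
   * Copies every file under checkpointDir into storeDir, preserving relative
   * paths. The checkpoint directory is left untouched, which is why "restore"
   * describes the operation better than "move".
   */
  public static void restoreCheckpointFiles(Path checkpointDir, Path storeDir) throws IOException {
    Files.walkFileTree(checkpointDir, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
        // Recreate the checkpoint's directory layout under the store directory.
        Files.createDirectories(storeDir.resolve(checkpointDir.relativize(dir)));
        return FileVisitResult.CONTINUE;
      }

      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        // Copy (not rename) so the checkpoint remains usable afterwards.
        Files.copy(file, storeDir.resolve(checkpointDir.relativize(file)),
            StandardCopyOption.REPLACE_EXISTING);
        return FileVisitResult.CONTINUE;
      }
    });
  }

  public static void main(String[] args) throws IOException {
    Path checkpoint = Files.createTempDirectory("store-checkpoint");
    Path store = Files.createTempDirectory("store");
    Files.write(checkpoint.resolve("store-file"), "data".getBytes());

    restoreCheckpointFiles(checkpoint, store);

    // Both copies exist: the checkpoint is not consumed by the restore.
    System.out.println(Files.exists(checkpoint.resolve("store-file")));
    System.out.println(Files.exists(store.resolve("store-file")));
  }
}
```

Copying keeps the checkpoint reusable if a restore fails partway through; a variant could use Files.createLink to hard-link files instead of duplicating them, when both directories are on the same file system.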