ovj commented on a change in pull request #651: Spark Stage retry handling
URL: https://github.com/apache/incubator-hudi/pull/651#discussion_r280920633
##########
File path: hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
##########
@@ -67,49 +75,51 @@ public HoodieIOHandle(HoodieWriteConfig config, String
commitTime, HoodieTable<T
}
/**
- * Deletes any new tmp files written during the current commit, into the
partition
+ * Generate a write token based on the currently running spark task and its
place in the spark dag.
*/
- public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig
config, String commitTime,
- String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
- FileSystem fs = hoodieTable.getMetaClient().getFs();
- try {
- FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
- .format("%s/%s/%s", config.getBasePath(), partitionPath,
- FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
- if (prevFailedFiles != null) {
- logger.info(
- "Deleting " + prevFailedFiles.length + " files generated by
previous failed attempts.");
- for (FileStatus status : prevFailedFiles) {
- fs.delete(status.getPath(), false);
- }
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to cleanup Temp files from commit "
+ commitTime, e);
- }
+ private static String makeSparkWriteToken() {
+ return FSUtils.makeWriteToken(TaskContext.getPartitionId(),
TaskContext.get().stageId(),
+ TaskContext.get().taskAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}
- public Path makeNewPath(String partitionPath, int taskPartitionId, String
fileName) {
+ public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(),
- FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
+ return new Path(path.toString(), FSUtils.makeDataFileName(commitTime,
writeToken, fileId));
+ }
+
+ /**
+ * Creates an empty marker file corresponding to storage writer path
+ * @param partitionPath Partition path
+ */
+ protected void createMarkerFile(String partitionPath) {
+ Path markerPath = makeNewMarkerPath(partitionPath);
+ try {
+ logger.info("Creating Marker Path=" + markerPath);
+ fs.create(markerPath, true).close();
+ } catch (IOException e) {
+ throw new HoodieException("Failed to create marker file " + markerPath,
e);
+ }
}
- public Path makeTempPath(String partitionPath, int taskPartitionId, String
fileName, int stageId,
- long taskAttemptId) {
- Path path = new Path(config.getBasePath(),
HoodieTableMetaClient.TEMPFOLDER_NAME);
- return new Path(path.toString(),
- FSUtils.makeTempDataFileName(partitionPath, commitTime,
taskPartitionId, fileName, stageId,
- taskAttemptId));
+ private Path makeNewMarkerPath(String partitionPath) {
+ Path markerRootPath = new
Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+ Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
Review comment:
Maybe add this to the javadoc? It would be easy to follow.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services