vinothchandar commented on a change in pull request #651: Spark Stage retry
handling
URL: https://github.com/apache/incubator-hudi/pull/651#discussion_r285282727
##########
File path: hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
##########
@@ -63,50 +71,64 @@ public HoodieIOHandle(HoodieWriteConfig config, String
commitTime, HoodieTable<T
config.getWriteStatusFailureFraction());
}
+ private static FileSystem getFileSystem(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(),
config.isConsistencyCheckEnabled()
+ ?
FSUtils.getFailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+ config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
+ config.getMaxConsistencyCheckIntervalMs()) : new
NoOpConsistencyGuard());
+ }
+
/**
- * Deletes any new tmp files written during the current commit, into the
partition
+ * Generate a write token based on the currently running spark task and its
place in the spark dag.
*/
- public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig
config, String commitTime,
- String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
- FileSystem fs = hoodieTable.getMetaClient().getFs();
- try {
- FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
- .format("%s/%s/%s", config.getBasePath(), partitionPath,
- FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
- if (prevFailedFiles != null) {
- logger.info(
- "Deleting " + prevFailedFiles.length + " files generated by
previous failed attempts.");
- for (FileStatus status : prevFailedFiles) {
- fs.delete(status.getPath(), false);
- }
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to cleanup Temp files from commit "
+ commitTime, e);
- }
+ private static String makeSparkWriteToken() {
+ return FSUtils.makeWriteToken(TaskContext.getPartitionId(),
TaskContext.get().stageId(),
+ TaskContext.get().taskAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}
- public Path makeNewPath(String partitionPath, int taskPartitionId, String
fileName) {
+ public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(),
- FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
+ return new Path(path.toString(), FSUtils.makeDataFileName(commitTime,
writeToken, fileId));
}
- public Path makeTempPath(String partitionPath, int taskPartitionId, String
fileName, int stageId,
- long taskAttemptId) {
- Path path = new Path(config.getBasePath(),
HoodieTableMetaClient.TEMPFOLDER_NAME);
- return new Path(path.toString(),
- FSUtils.makeTempDataFileName(partitionPath, commitTime,
taskPartitionId, fileName, stageId,
- taskAttemptId));
+ /**
+ * Creates an empty marker file corresponding to storage writer path
+ * @param partitionPath Partition path
+ */
+ protected void createMarkerFile(String partitionPath) {
+ Path markerPath = makeNewMarkerPath(partitionPath);
+ try {
+ logger.info("Creating Marker Path=" + markerPath);
+ fs.create(markerPath, true).close();
Review comment:
The marker file should be unique, right? So this should work even with
overwrite=false?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services