vinothchandar commented on a change in pull request #651: Spark Stage retry 
handling
URL: https://github.com/apache/incubator-hudi/pull/651#discussion_r285282727
 
 

 ##########
 File path: hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
 ##########
 @@ -63,50 +71,64 @@ public HoodieIOHandle(HoodieWriteConfig config, String 
commitTime, HoodieTable<T
         config.getWriteStatusFailureFraction());
   }
 
+  private static FileSystem getFileSystem(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), 
config.isConsistencyCheckEnabled()
+        ? 
FSUtils.getFailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+        config.getMaxConsistencyChecks(), 
config.getInitialConsistencyCheckIntervalMs(),
+        config.getMaxConsistencyCheckIntervalMs()) : new 
NoOpConsistencyGuard());
+  }
+
   /**
-   * Deletes any new tmp files written during the current commit, into the 
partition
+   * Generate a write token based on the currently running spark task and its 
place in the spark dag.
    */
-  public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig 
config, String commitTime,
-      String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
-    FileSystem fs = hoodieTable.getMetaClient().getFs();
-    try {
-      FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
-          .format("%s/%s/%s", config.getBasePath(), partitionPath,
-              FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
-      if (prevFailedFiles != null) {
-        logger.info(
-            "Deleting " + prevFailedFiles.length + " files generated by 
previous failed attempts.");
-        for (FileStatus status : prevFailedFiles) {
-          fs.delete(status.getPath(), false);
-        }
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to cleanup Temp files from commit " 
+ commitTime, e);
-    }
+  private static String makeSparkWriteToken() {
+    return FSUtils.makeWriteToken(TaskContext.getPartitionId(), 
TaskContext.get().stageId(),
+        TaskContext.get().taskAttemptId());
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
     return HoodieAvroUtils.addMetadataFields(originalSchema);
   }
 
-  public Path makeNewPath(String partitionPath, int taskPartitionId, String 
fileName) {
+  public Path makeNewPath(String partitionPath) {
     Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
     try {
       fs.mkdirs(path); // create a new partition as needed.
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(),
-        FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
+    return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, 
writeToken, fileId));
   }
 
-  public Path makeTempPath(String partitionPath, int taskPartitionId, String 
fileName, int stageId,
-      long taskAttemptId) {
-    Path path = new Path(config.getBasePath(), 
HoodieTableMetaClient.TEMPFOLDER_NAME);
-    return new Path(path.toString(),
-        FSUtils.makeTempDataFileName(partitionPath, commitTime, 
taskPartitionId, fileName, stageId,
-            taskAttemptId));
+  /**
+   * Creates an empty marker file corresponding to storage writer path
+   * @param partitionPath Partition path
+   */
+  protected void createMarkerFile(String partitionPath) {
+    Path markerPath = makeNewMarkerPath(partitionPath);
+    try {
+      logger.info("Creating Marker Path=" + markerPath);
+      fs.create(markerPath, true).close();
 
 Review comment:
   The marker file name should be unique, right? So this should work even with 
overwrite=false?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to