pvary commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r573154366



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -171,117 +190,207 @@ public void commitJob(JobContext originalContext) throws IOException {
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
 
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", 
jobContext.getJobID(), location);
-
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    LOG.info("Job {} is aborted. Data file cleaning started", 
jobContext.getJobID());
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
 
-    // Check if we have files already committed and remove data files if there are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
+    ExecutorService fileReaderExecutor = fileReaderExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Cleans up the changes for the output tables in parallel
+      Tasks.foreach(outputs.entrySet())
           .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .executeWith(tableExecutor)
+          .onFailure((entry, exc) -> LOG.debug("Failed cleanup table {} on abort job", entry.getKey(), exc))
+          .run(entry -> {
+            LOG.info("Cleaning job for table {}", jobContext.getJobID(), 
entry.getKey());
+
+            FileIO io = HiveIcebergStorageHandler.io(jobConf);
+            List<DataFile> dataFiles = dataFiles(fileReaderExecutor, entry.getValue(), jobContext, io, false);
+
+            // Check if we have files already committed and remove data files if there are any
+            if (dataFiles.size() > 0) {
+              Tasks.foreach(dataFiles)
+                  .retry(3)

Review comment:
       done
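
The parallel cleanup in the hunk above relies on Iceberg's Tasks utility (org.apache.iceberg.util.Tasks). A self-contained sketch of the retry-and-suppress idiom used on the abort path, with a stand-in delete method in place of FileIO#deleteFile:

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.util.Tasks;

public class AbortCleanupSketch {
  public static void main(String[] args) {
    // Stand-in paths; in the committer these come from the collected data files
    List<String> paths = Arrays.asList("s3://bucket/data/a.parquet", "s3://bucket/data/b.parquet");

    // Retry each delete up to 3 times and keep going on failure,
    // mirroring the Tasks chain in the hunk above
    Tasks.foreach(paths)
        .retry(3)
        .suppressFailureWhenFinished()
        .onFailure((path, exc) -> System.err.println("Failed to remove " + path + ": " + exc))
        .run(AbortCleanupSketch::delete);
  }

  private static void delete(String path) {
    // Stand-in for io.deleteFile(path)
    System.out.println("Deleting " + path);
  }
}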



