marton-bod commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r603226279



##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -118,170 +134,269 @@ public void abortTask(TaskAttemptContext 
originalContext) throws IOException {
     TaskAttemptContext context = 
TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    HiveIcebergRecordWriter writer = 
HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+    Map<String, HiveIcebergRecordWriter> writers = 
HiveIcebergRecordWriter.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writer != null) {
-      writer.close(true);
+    if (writers != null) {
+      for (HiveIcebergRecordWriter writer : writers.values()) {
+        writer.close(true);
+      }
     }
   }
 
   /**
-   * Reads the commit files stored in the temp directory and collects the 
generated committed data files.
-   * Appends the data files to the table. At the end removes the temporary 
directory.
+   * Reads the commit files stored in the temp directories and collects the 
generated committed data files.
+   * Appends the data files to the tables. At the end removes the temporary 
directories.
    * @param originalContext The job context
-   * @throws IOException if there is a failure deleting the files
+   * @throws IOException if there is a failure accessing the files
    */
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
-    JobConf conf = jobContext.getJobConf();
-    Table table = Catalogs.loadTable(conf);
+    JobConf jobConf = jobContext.getJobConf();
 
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}", 
table,
-        generateJobLocation(conf, jobContext.getJobID()));
+    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
-    List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+    Collection<String> outputs = 
HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Queue<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    if (dataFiles.size() > 0) {
-      // Appending data files to the table
-      AppendFiles append = table.newAppend();
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
-    } else {
-      LOG.info("Commit took {} ms for table: {} with no new files", 
System.currentTimeMillis() - startTime, table);
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Commits the changes for the output tables in parallel
+      Tasks.foreach(outputs)
+          .throwFailureWhenFinished()
+          .stopOnFailure()
+          .executeWith(tableExecutor)
+          .run(output -> {
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            jobLocations.add(generateJobLocation(table.location(), jobConf, 
jobContext.getJobID()));
+            commitTable(table.io(), fileExecutor, jobContext, output, 
table.location());
+          });
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
-    cleanup(jobContext);
+    LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - 
startTime, jobContext.getJobID());
+
+    cleanup(jobContext, jobLocations);
   }
 
   /**
-   * Removes the generated data files, if there is a commit file already 
generated for them.
-   * The cleanup at the end removes the temporary directory as well.
+   * Removes the generated data files if there is a commit file already 
generated for them.
+   * The cleanup at the end removes the temporary directories as well.
    * @param originalContext The job context
    * @param status The status of the job
    * @throws IOException if there is a failure deleting the files
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws 
IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
 
-    String location = generateJobLocation(jobContext.getJobConf(), 
jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", 
jobContext.getJobID(), location);
-
-    FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    LOG.info("Job {} is aborted. Data file cleaning started", 
jobContext.getJobID());
+    Collection<String> outputs = 
HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Queue<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    // Check if we have files already committed and remove data files if there 
are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Cleans up the changes for the output tables in parallel
+      Tasks.foreach(outputs)
           .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file 
{} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on 
abort job", output, exc))
+          .run(output -> {
+            LOG.info("Cleaning job for table {}", jobContext.getJobID(), 
output);
+
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            jobLocations.add(generateJobLocation(table.location(), jobConf, 
jobContext.getJobID()));
+            Queue<DataFile> dataFiles = dataFiles(fileExecutor, 
table.location(), jobContext, table.io(), false);
+
+            // Check if we have files already committed and remove data files 
if there are any
+            if (dataFiles.size() > 0) {
+              Tasks.foreach(dataFiles)
+                  .retry(3)
+                  .suppressFailureWhenFinished()
+                  .executeWith(fileExecutor)
+                  .onFailure((file, exc) -> LOG.warn("Failed to remove data 
file {} on abort job", file.path(), exc))
+                  .run(file -> table.io().deleteFile(file.path().toString()));
+            }
+          });
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
-    cleanup(jobContext);
+    LOG.info("Job {} is aborted. Data file cleaning finished", 
jobContext.getJobID());
+
+    cleanup(jobContext, jobLocations);
+  }
+
+  /**
+   * Collects the additions to a single table and adds/commits the new files 
to the Iceberg table.
+   * @param io The io to read the forCommit files
+   * @param executor The executor used to read the forCommit files
+   * @param jobContext The job context
+   * @param name The name of the table used for loading from the catalog
+   * @param location The location of the table used for loading from the 
catalog
+   */
+  private void commitTable(FileIO io, ExecutorService executor, JobContext 
jobContext, String name, String location) {
+    JobConf conf = jobContext.getJobConf();
+    Properties catalogProperties = new Properties();
+    catalogProperties.put(Catalogs.NAME, name);
+    catalogProperties.put(Catalogs.LOCATION, location);
+    Table table = Catalogs.loadTable(conf, catalogProperties);

Review comment:
       Makes sense. Just thinking out loud, but I'm wondering whether we can 
remove the table deserialization step, given that we have to load the table 
here anyway. For example, if we also cached the table locations under a similar 
config prefix and had a method `HiveIcebergStorageHandler.location(jobConf, 
output)`, could we get rid of the deserialization step?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to