marton-bod commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r573138697



##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -171,117 +190,207 @@ public void commitJob(JobContext originalContext) 
throws IOException {
   @Override
   public void abortJob(JobContext originalContext, int status) throws 
IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
 
-    String location = generateJobLocation(jobContext.getJobConf(), 
jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", 
jobContext.getJobID(), location);
-
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    LOG.info("Job {} is aborted. Data file cleaning started", 
jobContext.getJobID());
+    Map<String, String> outputs = 
SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
 
-    // Check if we have files already committed and remove data files if there 
are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
+    ExecutorService fileReaderExecutor = fileReaderExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Cleans up the changes for the output tables in parallel
+      Tasks.foreach(outputs.entrySet())
           .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file 
{} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .executeWith(tableExecutor)
+          .onFailure((entry, exc) -> LOG.debug("Failed cleanup table {} on 
abort job", entry.getKey(), exc))
+          .run(entry -> {
+            LOG.info("Cleaning job for table {}", jobContext.getJobID(), 
entry.getKey());
+
+            FileIO io = HiveIcebergStorageHandler.io(jobConf);
+            List<DataFile> dataFiles = dataFiles(fileReaderExecutor, 
entry.getValue(), jobContext, io, false);
+
+            // Check if we have files already committed and remove data files 
if there are any
+            if (dataFiles.size() > 0) {
+              Tasks.foreach(dataFiles)
+                  .retry(3)
+                  .suppressFailureWhenFinished()
+                  .onFailure((file, exc) -> LOG.debug("Failed to remove data 
file {} on abort job", file.path(), exc))
+                  .run(file -> io.deleteFile(file.path().toString()));
+            }
+          });
+    } finally {
+      fileReaderExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
+    LOG.info("Job {} is aborted. Data file cleaning finished", 
jobContext.getJobID());
+
     cleanup(jobContext);
   }
 
+  /**
+   * Collects the additions to a single table and adds/commits the new files 
to the Iceberg table.
+   * @param executor The executor used to read the forCommit files
+   * @param jobContext The job context
+   * @param name The name of the table used for loading from the catalog
+   * @param location The location of the table used for loading from the 
catalog
+   */
+  private void commitTable(ExecutorService executor, JobContext jobContext, 
String name, String location) {
+    JobConf conf = jobContext.getJobConf();
+    Properties catalogProperties = new Properties();
+    catalogProperties.put(Catalogs.NAME, name);
+    catalogProperties.put(Catalogs.LOCATION, location);
+    Table table = Catalogs.loadTable(conf, catalogProperties);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job has started for table: {}, using location: {}",
+        table, generateJobLocation(location, conf, jobContext.getJobID()));
+
+    FileIO io = HiveIcebergStorageHandler.io(conf);
+    List<DataFile> dataFiles = dataFiles(executor, location, jobContext, io, 
true);
+
+    if (dataFiles.size() > 0) {
+      // Appending data files to the table
+      AppendFiles append = table.newAppend();
+      dataFiles.forEach(append::appendFile);
+      append.commit();
+      LOG.info("Commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
+          dataFiles.size());
+      LOG.debug("Added files {}", dataFiles);
+    } else {
+      LOG.info("Commit took {} ms for table: {} with no new files", 
System.currentTimeMillis() - startTime, table);
+    }
+  }
+
   /**
    * Cleans up the jobs temporary location.
    * @param jobContext The job context
    * @throws IOException if there is a failure deleting the files
    */
   private void cleanup(JobContext jobContext) throws IOException {
-    String location = generateJobLocation(jobContext.getJobConf(), 
jobContext.getJobID());
-    LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), 
location);
+    JobConf jobConf = jobContext.getJobConf();
 
-    // Remove the job's temp directory recursively.
-    // Intentionally used foreach on a single item. Using the Tasks API here 
only for the retry capability.
-    Tasks.foreach(location)
-        .retry(3)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} 
on cleanup job", file, exc))
-        .run(file -> {
-          Path toDelete = new Path(file);
-          FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
-          fs.delete(toDelete, true);
-        }, IOException.class);
+    LOG.info("Cleaning for job {} started", jobContext.getJobID());
+    Map<String, String> outputs = 
SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
+
+    for (String table : outputs.keySet()) {
+      String tableLocation = generateJobLocation(outputs.get(table), jobConf, 
jobContext.getJobID());
+      LOG.info("Cleaning table {} on location: {}", table, tableLocation);
+
+      // Remove the job's temp directory recursively.
+      // Intentionally used foreach on a single item. Using the Tasks API here 
only for the retry capability.
+      Tasks.foreach(tableLocation)
+          .retry(3)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.debug("Failed on to remove directory 
{} on cleanup job", file, exc))
+          .run(file -> {
+            Path toDelete = new Path(file);
+            FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
+            fs.delete(toDelete, true);
+          }, IOException.class);
+    }
+
+    LOG.info("Cleaning for job {} finished", jobContext.getJobID());
+  }
+
+  /**
+   * Executor service for parallel handling of file reads. Should be shared 
when committing multiple tables.
+   * @param conf The configuration containing the pool size
+   * @return The generated executor service
+   */
+  private static ExecutorService fileReaderExecutor(Configuration conf) {
+    int size = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE,
+        InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT);
+    return Executors.newFixedThreadPool(
+        size,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setPriority(Thread.NORM_PRIORITY)
+            .setNameFormat("iceberg-commit-file-pool-%d")
+            .build());
+  }
+
+  /**
+   * Executor service for parallel handling of table manipulation. Could 
return null, if no parallelism is possible.
+   * @param conf The configuration containing the pool size
+   * @return The generated executor service, or null if executor is not needed.
+   */
+  private static ExecutorService tableExecutor(Configuration conf, int 
outputSize) {
+    int size = conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE,
+        InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT);
+    size = Math.max(outputSize, size);

Review comment:
       Would it make sense to use `Math.min` instead? With `Math.max`, if 
`outputSize` is smaller than the configured pool size, we end up creating more 
threads than there are output tables to process. If `outputSize` is larger, 
`Math.min` would still make sense, because it caps the pool at the size the 
user configured (perhaps only when they set it explicitly, rather than relying 
on the default)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to