pvary commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r598578564



##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -135,29 +153,31 @@ public void abortTask(TaskAttemptContext originalContext) 
throws IOException {
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
-    JobConf conf = jobContext.getJobConf();
-    Table table = Catalogs.loadTable(conf);
+    JobConf jobConf = jobContext.getJobConf();
 
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}", 
table,
-        generateJobLocation(conf, jobContext.getJobID()));
+    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+    Map<String, String> outputs = 
SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
 
-    if (dataFiles.size() > 0) {
-      // Appending data files to the table
-      AppendFiles append = table.newAppend();
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
-    } else {
-      LOG.info("Commit took {} ms for table: {} with no new files", 
System.currentTimeMillis() - startTime, table);
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());

Review comment:
       If we keep the option for running things in parallel, then I would like 
to keep a different configuration option for the 2 pools (file executors 
definitely benefit from parallel reading, but one might want to turn off the 
table parallelization separately). We can not reuse the same pool as this might 
lead to deadlock when the table executor is exhausted and no more thread remain 
for actually reading the `forCommit` files.
   
   Also the `tableExecutor` is typically `null`, as we do not create extra 
threads for single table writes. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to