pvary commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r597500201



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -135,29 +153,31 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
-    JobConf conf = jobContext.getJobConf();
-    Table table = Catalogs.loadTable(conf);
+    JobConf jobConf = jobContext.getJobConf();
 
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}", 
table,
-        generateJobLocation(conf, jobContext.getJobID()));
+    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
 
-    if (dataFiles.size() > 0) {
-      // Appending data files to the table
-      AppendFiles append = table.newAppend();
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Commit took {} ms for table: {} with {} file(s)", 
System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
-    } else {
-      LOG.info("Commit took {} ms for table: {} with no new files", 
System.currentTimeMillis() - startTime, table);
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Commits the changes for the output tables in parallel
+      Tasks.foreach(outputs.entrySet())
+          .throwFailureWhenFinished()
+          .stopOnFailure()
+          .executeWith(tableExecutor)
+          .run(entry -> commitTable(fileExecutor, jobContext, entry.getKey(), entry.getValue()));

Review comment:
   Sadly, with the current Hive API we cannot have atomicity.
   
   Currently here is how Hive works:
   1. Create a staging directory for the query inside the table root directories
   2. Create files inside these staging directories and write the data there - Mappers/Reducers
   3. With a MoveTask, move the contents of the staging directory to the final place (see the sketch below)
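   
   A minimal sketch of this flow, assuming Hadoop's FileSystem API; the class name, method name and staging path below are made-up placeholders, not the actual Hive internals:
   
   ```java
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   public class StagingMoveSketch {
   
     // Hypothetical stand-in for Hive's MoveTask (step 3), not the real implementation
     static void moveStagingToFinal(Configuration conf, Path tableRoot) throws Exception {
       FileSystem fs = tableRoot.getFileSystem(conf);
   
       // Step 1: the query-scoped staging directory inside the table root (name is made up)
       Path staging = new Path(tableRoot, ".hive-staging_query1");
   
       // Step 2 already happened in the Mappers/Reducers: data files were written under 'staging'
   
       // Step 3: move every staged file to its final place. On HDFS a rename is a cheap
       // metadata operation; on S3 it is a copy + delete, so this loop is neither fast
       // nor atomic there.
       for (FileStatus file : fs.listStatus(staging)) {
         fs.rename(file.getPath(), new Path(tableRoot, file.getPath().getName()));
       }
       fs.delete(staging, true);
     }
   }
   ```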
   
   A SerDe provides a way to customize step 2, but we do not have access to the other steps.
   
   On HDFS this works fairly well, since the HDFS move is fast and atomic (though there are still no guarantees for multi-table inserts), but on S3 it is not "optimal". Hive solves this for ACID tables by making the changes visible to readers only when the transaction is committed. When #1849 is available we will have to do some kind of two-way commit, so that the Hive ACID transaction is committed together with the Iceberg transaction. Not yet sure what it will look like. Maybe shared locking? Maybe adding a two-way commit to the API? We will find out when we get there.
   
   Until the problem is solved properly, this could be a somewhat limping solution that lets these queries execute, offering guarantees similar to those of the current multi-table inserts over S3 using Hive.
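   
   For illustration, here is a minimal sketch of the per-table commit pattern from the diff above; commitAll, outputs and dataFiles are illustrative stand-ins for the PR's commitTable helper and its inputs. Each commit() is atomic for a single table only, which is exactly the guarantee discussed here:
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ExecutorService;
   import org.apache.iceberg.AppendFiles;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.util.Tasks;
   
   public class MultiTableCommitSketch {
   
     // Commits every output table in parallel, like the diff above. Each append commit
     // is atomic for its own table, but no transaction spans the tables: a failure after
     // the first commit() leaves earlier tables committed and later ones untouched.
     static void commitAll(Map<String, Table> outputs, Map<String, List<DataFile>> dataFiles,
         ExecutorService tableExecutor) {
       Tasks.foreach(outputs.entrySet())
           .throwFailureWhenFinished()
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(entry -> {
             AppendFiles append = entry.getValue().newAppend();
             dataFiles.get(entry.getKey()).forEach(append::appendFile);
             append.commit(); // atomic for this single table only
           });
     }
   }
   ```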



