kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+                jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   I think all jobs should be rolled back if committing any of them fails. To achieve this, we use `org.apache.iceberg.util.Tasks`:
   ```
         Tasks.foreach(outputs)
             .throwFailureWhenFinished()
             .stopOnFailure()
             .run(output -> {
   ...
   ```
   which can revert all tasks on error, even those that have already succeeded.
   
   The initial implementation committed each job independently: every job launched its own batch of tasks. I refactored this part to collect the outputs from all jobs and launch them in a single batch.
   I also found that the commit runs in parallel and looks up the data it needs in the `SessionState`, which is stored thread-locally. In my experience this works only when there is a single output, because then only one worker thread is used, and it is the main thread where the `SessionState` was initialized. When a batch contains more than one output, threads other than the main thread do not have the necessary commit data in their `SessionState`.
   So I extracted the collection of this data to before the tasks are launched.
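   The thread-local pitfall can be illustrated with a small self-contained example. Here `SESSION` is a hypothetical stand-in for Hive's thread-local `SessionState`; the point is that the value must be read on the calling (main) thread and handed to the workers as a plain value, rather than having each worker read the thread-local itself (which would return null on pool threads).
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.stream.Collectors;

   // Hypothetical illustration: capture thread-local data on the calling
   // thread before submitting work to a pool, mirroring the fix of
   // collecting SessionState data prior to launching the commit tasks.
   public class ThreadLocalCapture {
     static final ThreadLocal<String> SESSION = new ThreadLocal<>();

     static List<String> commitAll(List<String> outputs, ExecutorService pool) throws Exception {
       // Read the thread-local here, on the main thread, where it was set.
       String sessionData = SESSION.get();
       List<Future<String>> futures = outputs.stream()
           // Workers receive the captured value; calling SESSION.get()
           // inside the lambda would yield null on a pool thread.
           .map(out -> pool.submit(() -> out + " committed with " + sessionData))
           .collect(Collectors.toList());
       List<String> results = new ArrayList<>();
       for (Future<String> f : futures) {
         results.add(f.get());
       }
       return results;
     }
   }
   ```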
   
   This affects multi-inserts, split updates, and merge statements. I haven't found any tests for multi-inserting into an Iceberg table (please share some if any exist), so I assume this issue hasn't come up before.
   
   Please share your thoughts.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

