pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908269817


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext 
originalContext) throws IOException {
     TaskAttemptContext context = 
TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = 
WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = 
WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  /**
+   * Wrapper class for storing output {@link Table} and its context for 
committing changes:
+   * JobContext, CommitInfo.
+   */
+  private static class OutputTable {
+    private final String catalogName;
+    private final String tableName;
+    private final Table table;
+    private final JobContext jobContext;
+    private final SessionStateUtil.CommitInfo commitInfo;
+
+    private OutputTable(String catalogName, String tableName, Table table, 
JobContext jobContext,
+                        SessionStateUtil.CommitInfo commitInfo) {
+      this.catalogName = catalogName;
+      this.tableName = tableName;
+      this.table = table;
+      this.jobContext = jobContext;
+      this.commitInfo = commitInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OutputTable output1 = (OutputTable) o;
+      return Objects.equals(tableName, output1.tableName) &&
+          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, jobContext.getJobID());
+    }
+
+    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+      return Optional.ofNullable(commitInfo);
+    }
+  }
+
   /**
    * Reads the commit files stored in the temp directories and collects the 
generated committed data files.
    * Appends the data files to the tables. At the end removes the temporary 
directories.
-   * @param originalContext The job context
+   * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  @Override
-  public void commitJob(JobContext originalContext) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+  public void commitJobs(List<JobContext> originalContextList) throws 
IOException {
+    if (originalContextList.isEmpty()) {
+      return;

Review Comment:
   If we change anything, a debug-level log would be nice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to