kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908287057


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -162,53 +168,102 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  /**
+   * Wrapper class for storing output {@link Table} and its context for committing changes:
+   * JobContext, CommitInfo.
+   */
+  private static class OutputTable {
+    private final String catalogName;
+    private final String tableName;
+    private final Table table;
+    private final JobContext jobContext;
+    private final SessionStateUtil.CommitInfo commitInfo;
+
+    private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
+                        SessionStateUtil.CommitInfo commitInfo) {
+      this.catalogName = catalogName;
+      this.tableName = tableName;
+      this.table = table;
+      this.jobContext = jobContext;
+      this.commitInfo = commitInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OutputTable output1 = (OutputTable) o;
+      return Objects.equals(tableName, output1.tableName) &&
+          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, jobContext.getJobID());
+    }
+
+    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+      return Optional.ofNullable(commitInfo);
+    }
+  }
+
   /**
    * Reads the commit files stored in the temp directories and collects the generated committed data files.
    * Appends the data files to the tables. At the end removes the temporary directories.
-   * @param originalContext The job context
+   * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  @Override
-  public void commitJob(JobContext originalContext) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+  public void commitJobs(List<JobContext> originalContextList) throws IOException {
+    if (originalContextList.isEmpty()) {
+      return;

Review Comment:
   Not sure how. I just tried to keep the original behaviour from when jobs were committed separately:
   Original code:
   ```
   for (JobContext jobContext : jobContextList.get()) {
     OutputCommitter committer = new HiveIcebergOutputCommitter();
     try {
       committer.commitJob(jobContext);
     } catch (Throwable e) {
       // ... (error handling not shown in this excerpt)
     }
   }
   ```
   In this case, if `jobContextList` was an empty list, no `commitJob` call was performed and nothing was logged.
   
   New code: the job IDs are collected and logged in the `commitJobs` method:
   ```
       String ids = jobContextList.stream()
           .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
   
       LOG.info("Committing job(s) {} has started", ids);
   ```
   
   If we remove the empty-list check, a message without any job IDs will be logged, which is confusing.
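   A minimal, self-contained sketch of why the guard matters, assuming plain `String` job IDs in place of Hadoop `JobContext`/`JobID` objects and string concatenation in place of the SLF4J `{}` placeholder. The class and method names (`CommitJobsSketch`, `startMessage`) are hypothetical. Joining an empty stream with `Collectors.joining(",")` yields an empty string, so without the early return the start message would carry no IDs at all.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Collectors;

   // Hypothetical illustration; not the actual HiveIcebergOutputCommitter code.
   final class CommitJobsSketch {

     // Builds the same text as the "Committing job(s) {} has started" log line.
     static String startMessage(List<String> jobIds) {
       String ids = jobIds.stream().collect(Collectors.joining(","));
       return "Committing job(s) " + ids + " has started";
     }

     // Returns the message that would be logged, or null when the guard returns early.
     static String commitJobs(List<String> jobIds) {
       if (jobIds.isEmpty()) {
         // Same as the old per-job loop: no jobs means no commit and no log output.
         return null;
       }
       String message = startMessage(jobIds);
       // ... the actual commit work would happen here ...
       return message;
     }

     public static void main(String[] args) {
       System.out.println(startMessage(Collections.emptyList()));
       System.out.println(commitJobs(Collections.emptyList()));
       System.out.println(commitJobs(Arrays.asList("job_1", "job_2")));
     }
   }
   ```

   With an empty list, `startMessage` produces `Committing job(s)  has started` (a double space where the IDs should be), while `commitJobs` returns before logging anything, matching the old loop, which made no `commitJob` call for an empty list.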



-- 
This is an automated message from the Apache Git Service.