SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1505595228


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi
      throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
    }
  }
+
+  public List<DataFile> getWrittenFiles(List<JobContext> jobContexts) throws IOException {
+    List<JobContext> jobContextList = jobContexts.stream()
+            .map(TezUtil::enrichContextWithVertexId)
+            .collect(Collectors.toList());
+    List<OutputTable> outputs = collectOutputs(jobContextList);
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+    Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+                      .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+              .suppressFailureWhenFinished()
+              .executeWith(tableExecutor)
+              .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+              .run(output -> {
+                JobContext jobContext = output.getValue();
+                JobConf jobConf = jobContext.getJobConf();
+                LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+                Table table = output.getKey();
+                String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+                // list jobLocation to get number of forCommit files
+                // we do this because map/reduce num in jobConf is unreliable
+                // and we have no access to vertex status info
+                int numTasks = listForCommits(jobConf, jobLocation).size();
+                FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                        table.io(), false);
+                dataFiles.addAll(results.dataFiles());
+              }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {

Review Comment:
   `fileExecutor` is always created, whereas `tableExecutor` can be null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

