deniskuzZ commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1529953727


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String 
fileForCommitLocation, Fi
       throw new NotFoundException("Can not read or parse committed file: %s", 
fileForCommitLocation);
     }
   }
+
+  public List<FileStatus> getWrittenFiles(List<JobContext> jobContexts) throws 
IOException {
+    List<OutputTable> outputs = collectOutputs(jobContexts);
+    ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+    ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+    Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+                      .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext))))
+              .suppressFailureWhenFinished()
+              .executeWith(tableExecutor)
+              .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+              .run(output -> {
+                JobContext jobContext = output.getValue();
+                JobConf jobConf = jobContext.getJobConf();
+                LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+                Table table = output.getKey();
+                FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+                String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+                // list jobLocation to get number of forCommit files
+                // we do this because map/reduce num in jobConf is unreliable
+                // and we have no access to vertex status info
+                int numTasks = listForCommits(jobConf, jobLocation).size();
+                FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+                        table.io(), false);
+                for (DataFile dataFile : results.dataFiles()) {
+                  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
+                  dataFiles.add(fileStatus);
+                }
+              }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+                                        ExecutorService tableExecutor,
+                                        TaskAttemptContext context) throws 
IOException {
+    // Merge task has merged several files into one. Hence we need to remove 
the stale files.
+    // At this stage the file is written and task-committed, but the old files 
are still present.
+    if 
(jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
 {
+      MapWork mrwork = Utilities.getMapWork(jobConf);
+      if (mrwork != null) {
+        List<Path> mergedPaths = mrwork.getInputPaths();
+        if (mergedPaths != null) {
+          Tasks.foreach(mergedPaths)
+                  .retry(3)
+                  .executeWith(tableExecutor)

Review Comment:
   i think so



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org

Reply via email to