SourabhBadhya commented on code in PR #5076: URL: https://github.com/apache/hive/pull/5076#discussion_r1505595228
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java: ########## @@ -720,4 +744,42 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation); } } + + public List<DataFile> getWrittenFiles(List<JobContext> jobContexts) throws IOException { + List<JobContext> jobContextList = jobContexts.stream() + .map(TezUtil::enrichContextWithVertexId) + .collect(Collectors.toList()); + List<OutputTable> outputs = collectOutputs(jobContextList); + ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf()); + ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size()); + Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>(); + try { + Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream() + .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext)))) + .suppressFailureWhenFinished() + .executeWith(tableExecutor) + .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc)) + .run(output -> { + JobContext jobContext = output.getValue(); + JobConf jobConf = jobContext.getJobConf(); + LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output); + + Table table = output.getKey(); + String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); + // list jobLocation to get number of forCommit files + // we do this because map/reduce num in jobConf is unreliable + // and we have no access to vertex status info + int numTasks = listForCommits(jobConf, jobLocation).size(); + FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, + table.io(), false); + dataFiles.addAll(results.dataFiles()); + }, IOException.class); + } finally { + fileExecutor.shutdown(); + if (tableExecutor != null) { Review 
Comment: `fileExecutor` is always created, whereas `tableExecutor` can be null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org For additional commands, e-mail: gitbox-help@hive.apache.org