SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1525735283
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########

@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi
       throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
     }
   }
+
+  public List<FileStatus> getWrittenFiles(List<JobContext> jobContexts) throws IOException {
+    List<OutputTable> outputs = collectOutputs(jobContexts);
+    ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+    Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+              .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+          .suppressFailureWhenFinished()
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+          .run(output -> {
+            JobContext jobContext = output.getValue();
+            JobConf jobConf = jobContext.getJobConf();
+            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+            Table table = output.getKey();
+            FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
+            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+            // list jobLocation to get number of forCommit files
+            // we do this because map/reduce num in jobConf is unreliable
+            // and we have no access to vertex status info
+            int numTasks = listForCommits(jobConf, jobLocation).size();
+            FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                table.io(), false);
+            for (DataFile dataFile : results.dataFiles()) {
+              FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString()));
+              dataFiles.add(fileStatus);
+            }
+          }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+                                        ExecutorService tableExecutor,
+                                        TaskAttemptContext context) throws IOException {
+    // Merge task has merged several files into one. Hence we need to remove the stale files.
+    // At this stage the file is written and task-committed, but the old files are still present.
+    if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
+      MapWork mrwork = Utilities.getMapWork(jobConf);
+      if (mrwork != null) {
+        List<Path> mergedPaths = mrwork.getInputPaths();
+        if (mergedPaths != null) {
+          Tasks.foreach(mergedPaths)
+              .retry(3)
+              .executeWith(tableExecutor)

Review Comment:
   Same as this comment: https://github.com/apache/hive/pull/5076#discussion_r1505609448
   I decided not to add this, since we are trying to delete data files which are stale. If we are not able to delete such a file, does it become an orphan file?
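For reference, here is a minimal sketch (an illustration based on assumptions, not the PR's actual change) of what the deletion chain above could look like if an explicit failure handler were added. It assumes the same scope as cleanMergeTaskInputFiles (mergedPaths, tableExecutor, jobConf, LOG); the log message, the suppress-failures choice, and the delete call itself are illustrative only:

    // Illustrative sketch: delete the pre-merge input files with retries, and log instead of
    // failing the job when a stale file cannot be removed. A file that survives here is no
    // longer referenced by table metadata, which is the orphan-file case raised above.
    Tasks.foreach(mergedPaths)
        .retry(3)
        .executeWith(tableExecutor)
        .suppressFailureWhenFinished()
        .onFailure((path, exc) -> LOG.warn("Failed to delete merge task input file {}", path, exc))
        .run(path -> {
          FileSystem fs = path.getFileSystem(jobConf);  // jobConf assumed in scope, as in the PR
          fs.delete(path, false);
        }, IOException.class);

Whether to suppress or propagate the failure is exactly the trade-off the linked comment raises: suppressing keeps the commit moving, but any file that could not be deleted is left behind unreferenced until an orphan-file cleanup removes it.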