SourabhBadhya commented on code in PR #5076:
URL: https://github.com/apache/hive/pull/5076#discussion_r1525735283
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -720,4 +725,65 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi
       throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
     }
   }
+
+  public List<FileStatus> getWrittenFiles(List<JobContext> jobContexts) throws IOException {
+    List<OutputTable> outputs = collectOutputs(jobContexts);
+    ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+    Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+          .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+          .suppressFailureWhenFinished()
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
+          .run(output -> {
+            JobContext jobContext = output.getValue();
+            JobConf jobConf = jobContext.getJobConf();
+            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
+
+            Table table = output.getKey();
+            FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
+            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
+            // list jobLocation to get number of forCommit files
+            // we do this because map/reduce num in jobConf is unreliable
+            // and we have no access to vertex status info
+            int numTasks = listForCommits(jobConf, jobLocation).size();
+            FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
+                table.io(), false);
+            for (DataFile dataFile : results.dataFiles()) {
+              FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString()));
+              dataFiles.add(fileStatus);
+            }
+          }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(dataFiles);
+  }
+
+  private void cleanMergeTaskInputFiles(JobConf jobConf,
+                                        ExecutorService tableExecutor,
+                                        TaskAttemptContext context) throws IOException {
+    // Merge task has merged several files into one. Hence we need to remove the stale files.
+    // At this stage the file is written and task-committed, but the old files are still present.
+    if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
+      MapWork mrwork = Utilities.getMapWork(jobConf);
+      if (mrwork != null) {
+        List<Path> mergedPaths = mrwork.getInputPaths();
+        if (mergedPaths != null) {
+          Tasks.foreach(mergedPaths)
+              .retry(3)
+              .executeWith(tableExecutor)
Review Comment:
Same as this comment: https://github.com/apache/hive/pull/5076#discussion_r1505609448
I decided not to add this since we are trying to delete data files which are stale. If we are not able to delete them, do they become orphan files?
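
For context, a minimal, non-authoritative sketch (not part of this PR) of how a non-fatal failure handler around the stale-file deletion could look; `StaleFileCleanupSketch` and `deleteStaleInputs` are hypothetical names, and the assumption is that a merge-input file left behind here is no longer referenced by the new snapshot, so it would be a candidate for a later orphan-file cleanup:

```java
// Hypothetical sketch, not the PR's implementation.
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StaleFileCleanupSketch {
  private static final Logger LOG = LoggerFactory.getLogger(StaleFileCleanupSketch.class);

  // Deletes the merged (stale) input paths; a failed delete is logged rather than
  // failing the job, since the leftover file is unreferenced and can be removed later.
  static void deleteStaleInputs(List<Path> mergedPaths, JobConf jobConf, ExecutorService executor)
      throws IOException {
    Tasks.foreach(mergedPaths)
        .retry(3)
        .suppressFailureWhenFinished()
        .executeWith(executor)
        .onFailure((path, exc) ->
            LOG.warn("Could not delete stale merge input {}; leaving it for orphan-file cleanup", path, exc))
        .run(path -> {
          FileSystem fs = path.getFileSystem(jobConf);
          fs.delete(path, true);
        }, IOException.class);
  }
}
```

Whether such leftovers actually become orphan files in practice depends on whether a periodic orphan-file removal is run against the table; the sketch only makes the failure visible instead of silent.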