deniskuzZ commented on code in PR #5418:
URL: https://github.com/apache/hive/pull/5418#discussion_r1759154868
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -346,43 +343,42 @@ public void abortJobs(List<JobContext> originalContextList) throws IOException {
     List<JobContext> jobContextList = originalContextList.stream()
         .map(TezUtil::enrichContextWithVertexId)
         .collect(Collectors.toList());
-    List<OutputTable> outputs = collectOutputs(jobContextList);
+    Multimap<OutputTable, JobContext> outputs = collectOutputs(jobContextList);
+    JobConf jobConf = jobContextList.get(0).getJobConf();
     String ids = jobContextList.stream()
         .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
     LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
-    Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());
-    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
-    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
+    Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
     try {
       // Cleans up the changes for the output tables in parallel
-      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
-              .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
+      Tasks.foreach(outputs.keySet())
          .suppressFailureWhenFinished()
          .executeWith(tableExecutor)
          .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
          .run(output -> {
-            JobContext jobContext = output.getValue();
-            JobConf jobConf = jobContext.getJobConf();
-            LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
-
-            Table table = output.getKey();
-            String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
-            jobLocations.add(jobLocation);
-            // list jobLocation to get number of forCommit files
-            // we do this because map/reduce num in jobConf is unreliable and we have no access to vertex status info
-            int numTasks = listForCommits(jobConf, jobLocation).size();
-            FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
-                table.io(), false);
-            // Check if we have files already written and remove data and delta files if there are any
-            Tasks.foreach(results.allFiles())
-                .retry(3)
-                .suppressFailureWhenFinished()
-                .executeWith(fileExecutor)
-                .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
-                .run(file -> table.io().deleteFile(file.path().toString()));
+            for (JobContext jobContext : outputs.get(output)) {

Review Comment:
   It was always like this; we just weren't using the executors properly.
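For readers skimming the diff, the pattern it moves to is: group all job contexts by their output table in a Multimap, then fan out one cleanup task per table on the table executor, handling every job of that table inside the task. Below is a minimal, self-contained sketch of that grouping/fan-out idea using plain JDK executors and Guava's ArrayListMultimap; the String stand-ins for OutputTable/JobContext and the cleanTable helper are hypothetical placeholders, not the Hive/Iceberg implementation.

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AbortCleanupSketch {

  public static void main(String[] args) {
    // Stand-ins for OutputTable and JobContext: several jobs may have written to the same table.
    Multimap<String, String> outputs = ArrayListMultimap.create();
    outputs.put("db.tbl_a", "job_1");
    outputs.put("db.tbl_a", "job_2");   // two jobs touched the same table
    outputs.put("db.tbl_b", "job_3");

    // One pool for per-table work, sized by the number of distinct tables,
    // mirroring tableExecutor(jobConf, outputs.keySet().size()) in the diff.
    ExecutorService tableExecutor = Executors.newFixedThreadPool(outputs.keySet().size());
    try {
      // Fan out one cleanup task per table; all jobs of that table are handled inside the task,
      // like the "for (JobContext jobContext : outputs.get(output))" loop in the new code.
      List<CompletableFuture<Void>> futures = outputs.keySet().stream()
          .map(table -> CompletableFuture.runAsync(() -> cleanTable(table, outputs.get(table)), tableExecutor))
          .collect(Collectors.toList());
      futures.forEach(CompletableFuture::join);
    } finally {
      tableExecutor.shutdown();
    }
  }

  // Hypothetical placeholder for the per-table abort cleanup (listing and deleting written files).
  private static void cleanTable(String table, Iterable<String> jobIds) {
    for (String jobId : jobIds) {
      System.out.printf("cleaning job %s for table %s%n", jobId, table);
    }
  }
}

The point of keying the parallelism on the table rather than on (table, jobContext) pairs is that per-table work stays on the table executor while per-file deletions can still be delegated to the separate file executor, which is what the review comment means by "using the executors properly".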