pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r908277556
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java:
##########
@@ -231,25 +314,47 @@ public void commitJob(JobContext originalContext) throws IOException {
*/
@Override
public void abortJob(JobContext originalContext, int status) throws IOException {
- JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
- JobConf jobConf = jobContext.getJobConf();
+ abortJobs(Collections.singletonList(originalContext));
+ }
+
+ public void abortJobs(List<JobContext> jobContexts, JobStatus.State runState) throws IOException {
+ int state = runState.getValue();
+ if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+ throw new IOException("Invalid job run state : " + runState.name());
+ } else {
+ this.abortJobs(jobContexts);
+ }
+ }
- LOG.info("Job {} is aborted. Data file cleaning started",
jobContext.getJobID());
- Collection<String> outputs =
HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+ public void abortJobs(List<JobContext> originalContextList) throws IOException {
+ if (originalContextList.isEmpty()) {
+ return;
+ }
+
+ List<JobContext> jobContextList = originalContextList.stream()
+ .map(TezUtil::enrichContextWithVertexId)
+ .collect(Collectors.toList());
+ String ids = jobContextList.stream()
+ .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+ Set<OutputTable> outputs = collectOutputs(jobContextList);
+
+ LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
- ExecutorService fileExecutor = fileExecutor(jobConf);
- ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+ ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
try {
// Cleans up the changes for the output tables in parallel
Tasks.foreach(outputs)
.suppressFailureWhenFinished()
.executeWith(tableExecutor)
.onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
.run(output -> {
+ JobContext jobContext = output.jobContext;
+ JobConf jobConf = output.jobContext.getJobConf();
Review Comment:
nit?:
```
JobConf jobConf = jobContext.getJobConf();
```
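For wider context on the diff above: the new `abortJobs(List<JobContext>, JobStatus.State)` overload rejects any run state other than FAILED or KILLED with an `IOException` before delegating to `abortJobs(List<JobContext>)`. Below is a minimal, self-contained sketch of that guard; the class and method names are illustrative, not from the PR, and it assumes Hadoop's `org.apache.hadoop.mapred.JobStatus`, where the `State` enum values line up with the class's int run-state constants:
```java
import java.io.IOException;

import org.apache.hadoop.mapred.JobStatus;

public class AbortRunStateSketch {

  // Mirrors the guard in abortJobs(List<JobContext>, JobStatus.State):
  // only FAILED or KILLED run states may trigger the data file cleanup path.
  static void validateRunState(JobStatus.State runState) throws IOException {
    int state = runState.getValue();
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
  }

  public static void main(String[] args) {
    try {
      validateRunState(JobStatus.State.KILLED); // accepted, returns normally
      validateRunState(JobStatus.State.SUCCEEDED); // rejected, throws
    } catch (IOException e) {
      System.out.println("Rejected as expected: " + e.getMessage());
    }
  }
}
```
Comparing `runState.getValue()` against the int constants works because `State.FAILED.getValue()` and `State.KILLED.getValue()` map to the same values as the legacy `JobStatus.FAILED` and `JobStatus.KILLED` constants.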
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]