marton-bod commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r573142013
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -171,117 +190,207 @@ public void commitJob(JobContext originalContext) throws IOException {
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
-
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
-    // Check if we have files already committed and remove data files if there are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
+    ExecutorService fileReaderExecutor = fileReaderExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Cleans up the changes for the output tables in parallel
+      Tasks.foreach(outputs.entrySet())
          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .executeWith(tableExecutor)
+          .onFailure((entry, exc) -> LOG.debug("Failed cleanup table {} on abort job", entry.getKey(), exc))
+          .run(entry -> {
+            LOG.info("Cleaning job for table {}", jobContext.getJobID(), entry.getKey());
+
+            FileIO io = HiveIcebergStorageHandler.io(jobConf);
+            List<DataFile> dataFiles = dataFiles(fileReaderExecutor, entry.getValue(), jobContext, io, false);
+
+            // Check if we have files already committed and remove data files if there are any
+            if (dataFiles.size() > 0) {
+              Tasks.foreach(dataFiles)
+                  .retry(3)
Review comment:
   Should we parallelize this as well? We could reuse the same `fileReaderExecutor` here, but rename it to `fileIoExecutor` (or something similar), since it would then be responsible for both the file reads and the deletes.
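
   For illustration, a minimal sketch of what the parallelized delete could look like (`fileIoExecutor` is just the proposed new name for `fileReaderExecutor`; the builder calls mirror the `+` lines above):

   ```java
   // Sketch only: assumes fileReaderExecutor has been renamed to fileIoExecutor
   // and is shared between the manifest/file reads and these deletes.
   if (dataFiles.size() > 0) {
     Tasks.foreach(dataFiles)
         .retry(3)
         .executeWith(fileIoExecutor)  // run the deletes on the shared I/O pool
         .suppressFailureWhenFinished()
         .onFailure((file, exc) -> LOG.debug("Failed to remove data file {} on abort job", file.path(), exc))
         .run(file -> io.deleteFile(file.path().toString()));
   }
   ```

   Same `Tasks` builder pattern as the table-level cleanup above, so individual delete failures would still be suppressed once the retries are exhausted.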