marton-bod commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r572683992
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -171,55 +166,104 @@ public void commitJob(JobContext originalContext) throws IOException {
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
+    LOG.info("Job {} is aborted. Cleaning started", jobContext.getJobID());
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    for (String location : outputs.keySet()) {
+      LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
-    // Check if we have files already committed and remove data files if there are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+      FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
+      List<DataFile> dataFiles = dataFiles(location, jobContext, io, false);
+
+      // Check if we have files already committed and remove data files if there are any
+      if (dataFiles.size() > 0) {
+        Tasks.foreach(dataFiles)
+            .retry(3)
+            .suppressFailureWhenFinished()
+            .onFailure((file, exc) -> LOG.debug("Failed to remove data file {} on abort job", file.path(), exc))
+            .run(file -> io.deleteFile(file.path().toString()));
+      }
     }
+    LOG.info("Job {} is aborted. Cleaning finished", jobContext.getJobID());
     cleanup(jobContext);
   }

+  /**
+   * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
+   * @param jobContext The job context
+   * @param name The name of the table used for loading from the catalog
+   * @param location The location of the table used for loading from the catalog
+   */
+  private void commitTable(JobContext jobContext, String name, String location) {
+    JobConf conf = jobContext.getJobConf();
+    Properties catalogProperties = new Properties();
+    catalogProperties.put(Catalogs.NAME, name);
+    catalogProperties.put(Catalogs.LOCATION, location);
+    Table table = Catalogs.loadTable(conf, catalogProperties);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job has started for table: {}, using location: {}",
+        table, generateJobLocation(location, conf, jobContext.getJobID()));
+
+    FileIO io = HiveIcebergStorageHandler.io(conf);
+    List<DataFile> dataFiles = dataFiles(location, jobContext, io, true);
+
+    if (dataFiles.size() > 0) {
+      // Appending data files to the table
+      AppendFiles append = table.newAppend();
+      dataFiles.forEach(append::appendFile);
+      append.commit();
+      LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
+          dataFiles.size());
+      LOG.debug("Added files {}", dataFiles);
+    } else {
+      LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+    }
+  }
+
   /**
    * Cleans up the jobs temporary location.
    * @param jobContext The job context
    * @throws IOException if there is a failure deleting the files
    */
   private void cleanup(JobContext jobContext) throws IOException {
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location);
-
-    // Remove the job's temp directory recursively.
-    // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability.
-    Tasks.foreach(location)
-        .retry(3)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc))
-        .run(file -> {
-          Path toDelete = new Path(file);
-          FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
-          fs.delete(toDelete, true);
-        }, IOException.class);
+    JobConf jobConf = jobContext.getJobConf();
+
+    LOG.info("Cleaning for job {} started", jobContext.getJobID());
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
+
+    for (String location : outputs.keySet()) {
+      String tableLocation = generateJobLocation(location, jobConf, jobContext.getJobID());
+      LOG.info("Cleaning location: {}", tableLocation);
+
+      // Remove the job's temp directory recursively.
+      // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability.
+      Tasks.foreach(tableLocation)
Review comment:
Now that we're cleaning up multiple locations, can we take advantage of
the parallelism here?
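
   For reference, a minimal sketch of what that could look like using the Tasks builder's executeWith hook (not code from this PR; the CleanupSketch/cleanLocations names, the method signature, and the pool size are illustrative assumptions):

   import java.io.IOException;
   import java.util.Set;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.mapred.JobConf;
   import org.apache.iceberg.hadoop.Util;
   import org.apache.iceberg.util.Tasks;

   class CleanupSketch {
     // Deletes every job location on a shared worker pool instead of one location at a time.
     // The pool size of 4 is an arbitrary placeholder.
     static void cleanLocations(Set<String> locations, JobConf jobConf) throws IOException {
       ExecutorService cleanupExecutor = Executors.newFixedThreadPool(4);
       try {
         Tasks.foreach(locations)
             .executeWith(cleanupExecutor)   // fan the per-location deletes out to the pool
             .retry(3)
             .suppressFailureWhenFinished()
             .run(location -> {
               Path toDelete = new Path(location);
               FileSystem fs = Util.getFs(toDelete, jobConf);
               fs.delete(toDelete, true);
             }, IOException.class);
       } finally {
         cleanupExecutor.shutdown();
       }
     }
   }

   The existing onFailure logging would still fit on the same builder; the only real change is supplying an executor so the per-table deletes no longer run sequentially.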
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]