pvary commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r898887564
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
public void storageHandlerCommit(Properties commitProperties, boolean
overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
Configuration configuration = SessionState.getSessionConf();
- Optional<JobContext> jobContext = generateJobContext(configuration,
tableName, overwrite);
- if (jobContext.isPresent()) {
+ Optional<List<JobContext>> jobContextList =
generateJobContext(configuration, tableName, overwrite);
+ if (!jobContextList.isPresent()) {
+ return;
+ }
+
+ for (JobContext jobContext : jobContextList.get()) {
OutputCommitter committer = new HiveIcebergOutputCommitter();
try {
- committer.commitJob(jobContext.get());
+ committer.commitJob(jobContext);
} catch (Throwable e) {
// Aborting the job if the commit has failed
LOG.error("Error while trying to commit job: {}, starting rollback
changes for table: {}",
- jobContext.get().getJobID(), tableName, e);
+ jobContext.getJobID(), tableName, e);
Review Comment:
please remove the extra spaces
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
public void storageHandlerCommit(Properties commitProperties, boolean
overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
Configuration configuration = SessionState.getSessionConf();
- Optional<JobContext> jobContext = generateJobContext(configuration,
tableName, overwrite);
- if (jobContext.isPresent()) {
+ Optional<List<JobContext>> jobContextList =
generateJobContext(configuration, tableName, overwrite);
+ if (!jobContextList.isPresent()) {
+ return;
+ }
+
+ for (JobContext jobContext : jobContextList.get()) {
OutputCommitter committer = new HiveIcebergOutputCommitter();
try {
- committer.commitJob(jobContext.get());
+ committer.commitJob(jobContext);
} catch (Throwable e) {
// Aborting the job if the commit has failed
LOG.error("Error while trying to commit job: {}, starting rollback
changes for table: {}",
- jobContext.get().getJobID(), tableName, e);
+ jobContext.getJobID(), tableName, e);
try {
- committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+ committer.abortJob(jobContext, JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.error("Error while trying to abort failed job. There might be
uncleaned data files.", ioe);
// no throwing here because the original exception should be
propagated
}
throw new HiveException(
- "Error committing job: " + jobContext.get().getJobID() + " for
table: " + tableName, e);
+ "Error committing job: " + jobContext.getJobID() + " for
table: " + tableName, e);
Review Comment:
please remove the extra spaces
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]