kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
Review Comment:
I think all jobs should be rolled back if committing any one of them fails. To achieve this we use `org.apache.iceberg.util.Tasks`:
```java
Tasks.foreach(outputs)
    .throwFailureWhenFinished()
    .stopOnFailure()
    .run(output -> {
      ...
```
which can revert all tasks in case of an error, even if some of them have already succeeded.
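For illustration, committing all job contexts in one batch could look roughly like this (just a sketch, not the actual change; the class name and the abort-on-revert wiring are mine):
```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.iceberg.util.Tasks;

// Sketch only: commit all job contexts in a single Tasks batch and roll back on failure.
// Assumes it lives next to the storage handler so HiveIcebergOutputCommitter is visible.
public class CommitAllOrNothingSketch {

  public static void commitAll(List<JobContext> jobContexts) {
    OutputCommitter committer = new HiveIcebergOutputCommitter();

    Tasks.foreach(jobContexts)
        .throwFailureWhenFinished()     // surface the first commit failure to the caller
        .stopOnFailure()                // don't start further commits once one has failed
        .revertWith(jobContext -> {     // abort the jobs whose commit already ran
          try {
            committer.abortJob(jobContext, JobStatus.State.FAILED);
          } catch (IOException e) {
            // best-effort rollback; the original commit failure is what gets reported
          }
        })
        .run(jobContext -> {
          try {
            committer.commitJob(jobContext);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
  }
}
```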
The initial implementation committed each job independently: every job launched its own batch of tasks. I refactored this part to collect the outputs of all jobs and launch them in a single batch.
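Roughly along these lines (again only a sketch; how the outputs are read from each job and how a single output is committed are stand-ins here, passed in as parameters):
```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.mapred.JobContext;
import org.apache.iceberg.util.Tasks;

// Sketch only: flatten the outputs of every job into one list and commit them as a single batch.
public class SingleBatchCommitSketch {

  public static void commitInOneBatch(
      List<JobContext> jobContexts,
      Function<JobContext, List<String>> outputsOf,           // assumed: how outputs are read per job
      Tasks.Task<String, RuntimeException> commitOutput) {    // assumed: the per-output commit step

    List<String> allOutputs = jobContexts.stream()
        .flatMap(jobContext -> outputsOf.apply(jobContext).stream())
        .collect(Collectors.toList());

    // One Tasks batch over the combined outputs instead of one batch per job,
    // so a failure in any output can abort the whole statement.
    Tasks.foreach(allOutputs)
        .throwFailureWhenFinished()
        .stopOnFailure()
        .run(commitOutput);
  }
}
```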
I also found that this runs in parallel, and that the data needed for the commit is looked up from the `SessionState`, which is stored thread-locally. In my experience this only works when there is a single output, because then only one worker thread is used and it is the main thread, where the `SessionState` is initialized. However, when a batch contains more than one output, threads other than the main thread do not have the necessary commit data in their `SessionState`.
So I extracted the collection of this data to before the tasks are launched.
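The general shape of that fix (a sketch; loading the table via `Catalogs` is just one example of a lookup that needs the session config, not exactly the data the PR collects):
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.util.Tasks;

// Sketch only: resolve everything that depends on the thread-local SessionState on the
// calling (main) thread, then hand plain objects to the parallel commit tasks.
public class PreloadBeforeParallelCommitSketch {

  public static void commitOutputs(List<String> outputTableNames, ExecutorService workerPool) {
    // SessionState is thread-local, so only the thread that initialized it can read it.
    Configuration conf = SessionState.getSessionConf();

    // Collect the commit data up front, before any worker thread is involved.
    Map<String, Table> preloadedTables = new HashMap<>();
    for (String name : outputTableNames) {
      Properties props = new Properties();
      props.setProperty(Catalogs.NAME, name);
      preloadedTables.put(name, Catalogs.loadTable(conf, props));
    }

    Tasks.foreach(outputTableNames)
        .executeWith(workerPool)          // commits may now run on worker threads
        .throwFailureWhenFinished()
        .run(name -> {
          Table table = preloadedTables.get(name);
          // ... commit 'table' here; no SessionState lookups on this thread
        });
  }
}
```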
This affects multi-inserts, split updates, and merge statements. I haven't found any tests for multi-inserting into an Iceberg table (please share some if any exist), so I assume this issue hasn't come up before.
Please share your thoughts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]