[
https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782338
]
ASF GitHub Bot logged work on HIVE-26319:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Jun/22 09:26
Start Date: 17/Jun/22 09:26
Worklog Time Spent: 10m
Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);
Review Comment:
I think all jobs should be rolled back in case of error when committing any
of them. To achieve this we are using `org.apache.iceberg.util.Tasks`:
```
Tasks.foreach(outputs)
.throwFailureWhenFinished()
.stopOnFailure()
.run(output -> {
...
```
which can revert all tasks on error, even if some of them have already
succeeded.
The initial implementation committed each job independently: every job
launched a separate batch of tasks.
I refactored this part to collect the outputs from all jobs and launch them
in one batch.
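As a hypothetical, self-contained sketch of the revert-on-failure pattern that `org.apache.iceberg.util.Tasks` provides (the class and method names below are illustrative, not the real Iceberg API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of batch commit with rollback: run `commit` on
// every output; if any commit throws, revert the outputs that already
// succeeded and rethrow, so no partially committed state is left behind.
public class BatchCommit {

  public static <T> void commitAll(List<T> outputs,
                                   Consumer<T> commit,
                                   Consumer<T> revert) {
    List<T> done = new ArrayList<>();
    try {
      for (T output : outputs) {
        commit.accept(output);
        done.add(output);
      }
    } catch (RuntimeException e) {
      // roll back the commits that went through before the failure
      for (T output : done) {
        revert.accept(output);
      }
      throw e;
    }
  }
}
```

Committing all outputs in one batch through a helper like this is what makes a failed commit leave nothing behind, as opposed to committing each job in its own loop iteration.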
I also found that this runs in parallel, and that we look up the data needed
for the commit in the `SessionState`, which is stored thread-locally. In my
experience this works only when there is a single output, because then only
one worker thread is used, and it is the main thread, where the
`SessionState` is initialized. However, when a batch contains more than one
output, threads other than the main thread do not have the data needed for
the commit in their `SessionState`.
So I extracted the collection of this data to before the tasks are launched.
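A minimal, self-contained illustration of the thread-locality pitfall described above (the class here is hypothetical, standing in for Hive's `SessionState`): a value stored in a `ThreadLocal` on the main thread is not visible from an executor's worker threads, which is why the per-session data must be collected before launching parallel tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstrates that a plain ThreadLocal set on the main thread is not
// inherited by worker threads created by an executor.
public class ThreadLocalPitfall {
  static final ThreadLocal<String> SESSION = new ThreadLocal<>();

  // Reads the ThreadLocal from a freshly created worker thread; the
  // worker sees null even if the main thread has set a value.
  public static String readFromWorker() throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      return pool.submit(SESSION::get).get();
    } finally {
      pool.shutdownNow();
    }
  }
}
```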
This affects multi-inserts, split updates and merge statements. I haven't
found any tests for multi-inserting into an Iceberg table (please share some
if any exist), so I assume this issue hasn't come up before.
Please share your thoughts.
Issue Time Tracking
-------------------
Worklog Id: (was: 782338)
Time Spent: 2h (was: 1h 50m)
> Iceberg integration: Perform update split early
> -----------------------------------------------
>
> Key: HIVE-26319
> URL: https://issues.apache.org/jira/browse/HIVE-26319
> Project: Hive
> Issue Type: Improvement
> Components: File Formats
> Reporter: Krisztian Kasa
> Assignee: Krisztian Kasa
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Extend update split early to Iceberg tables, like HIVE-21160 did for
> native acid tables
--
This message was sent by Atlassian Jira
(v8.20.7#820007)