[ 
https://issues.apache.org/jira/browse/HIVE-25006?focusedWorklogId=585731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-585731
 ]

ASF GitHub Bot logged work on HIVE-25006:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/21 11:41
            Start Date: 20/Apr/21 11:41
    Worklog Time Spent: 10m 
      Work Description: pvary commented on a change in pull request #2161:
URL: https://github.com/apache/hive/pull/2161#discussion_r616603042



##########
File path: 
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -256,4 +265,74 @@ private static PartitionSpec spec(Schema schema, 
Properties properties,
       return PartitionSpec.unpartitioned();
     }
   }
+
+  @Override
+  public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table 
table, boolean overwrite)
+      throws MetaException {
+    String tableName = TableIdentifier.of(table.getDbName(), 
table.getTableName()).toString();
+
+    // check status to determine whether we need to commit or to abort
+    JobConf jobConf = new JobConf(conf);
+    String queryIdKey = jobConf.get("hive.query.id") + "." + tableName + 
".result";
+    boolean success = jobConf.getBoolean(queryIdKey, false);
+
+    // construct the job context
+    JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + 
"." + tableName));
+    int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + 
tableName, -1);
+    jobConf.setNumReduceTasks(numTasks);
+    JobContext jobContext = new JobContextImpl(jobConf, jobID, null);
+
+    // we should only commit this current table because
+    // for multi-table inserts, this hook method will be called sequentially 
for each target table
+    jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+    OutputCommitter committer = new HiveIcebergOutputCommitter();
+    try {
+      if (success) {
+        try {
+          committer.commitJob(jobContext);
+        } catch (Exception commitExc) {
+          LOG.error("Error while trying to commit job (table: {}, jobID: {}). 
Will abort it now.",
+              tableName, jobID, commitExc);
+          abortJob(jobContext, committer, true);
+          throw new MetaException("Unable to commit job: " + 
commitExc.getMessage());
+        }
+      } else {
+        abortJob(jobContext, committer, false);
+      }
+    } finally {
+      // avoid config pollution with prefixed/suffixed keys
+      cleanCommitConfig(queryIdKey, tableName);
+    }
+  }
+
+  private void abortJob(JobContext jobContext, OutputCommitter committer, 
boolean suppressExc) throws MetaException {
+    try {
+      committer.abortJob(jobContext, JobStatus.State.FAILED);
+    } catch (IOException abortExc) {
+      LOG.error("Error while trying to abort failed job. There might be 
uncleaned data files.", abortExc);
+      if (!suppressExc) {
+        throw new MetaException("Unable to abort job: " + 
abortExc.getMessage());
+      }
+    }
+  }
+
+  private void cleanCommitConfig(String queryIdKey, String tableName) {
+    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName);
+    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName);
+    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
+    conf.unset(queryIdKey);
+  }
+
+  @Override
+  public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, 
boolean overwrite)
+      throws MetaException {
+    // do nothing
+  }
+
+  @Override
+  public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table 
table, boolean overwrite)
+      throws MetaException {
+    // do nothing

Review comment:
       Shouldn't we call abortJob here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 585731)
    Time Spent: 1h 20m  (was: 1h 10m)

> Commit Iceberg writes in HiveMetaHook instead of TezAM
> ------------------------------------------------------
>
>                 Key: HIVE-25006
>                 URL: https://issues.apache.org/jira/browse/HIVE-25006
>             Project: Hive
>          Issue Type: Task
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Trigger the write commits in the HiveIcebergStorageHandler#commitInsertTable. 
> This will enable us to implement insert overwrites for iceberg tables.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to