pvary commented on a change in pull request #2161:
URL: https://github.com/apache/hive/pull/2161#discussion_r616603042
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -256,4 +265,74 @@ private static PartitionSpec spec(Schema schema,
Properties properties,
return PartitionSpec.unpartitioned();
}
}
+
+ @Override
+ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table
table, boolean overwrite)
+ throws MetaException {
+ String tableName = TableIdentifier.of(table.getDbName(),
table.getTableName()).toString();
+
+ // check status to determine whether we need to commit or to abort
+ JobConf jobConf = new JobConf(conf);
+ String queryIdKey = jobConf.get("hive.query.id") + "." + tableName +
".result";
+ boolean success = jobConf.getBoolean(queryIdKey, false);
+
+ // construct the job context
+ JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID +
"." + tableName));
+ int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." +
tableName, -1);
+ jobConf.setNumReduceTasks(numTasks);
+ JobContext jobContext = new JobContextImpl(jobConf, jobID, null);
+
+ // we should only commit this current table because
+ // for multi-table inserts, this hook method will be called sequentially
for each target table
+ jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+ OutputCommitter committer = new HiveIcebergOutputCommitter();
+ try {
+ if (success) {
+ try {
+ committer.commitJob(jobContext);
+ } catch (Exception commitExc) {
+ LOG.error("Error while trying to commit job (table: {}, jobID: {}).
Will abort it now.",
+ tableName, jobID, commitExc);
+ abortJob(jobContext, committer, true);
+ throw new MetaException("Unable to commit job: " +
commitExc.getMessage());
+ }
+ } else {
+ abortJob(jobContext, committer, false);
+ }
+ } finally {
+ // avoid config pollution with prefixed/suffixed keys
+ cleanCommitConfig(queryIdKey, tableName);
+ }
+ }
+
+ private void abortJob(JobContext jobContext, OutputCommitter committer,
boolean suppressExc) throws MetaException {
+ try {
+ committer.abortJob(jobContext, JobStatus.State.FAILED);
+ } catch (IOException abortExc) {
+ LOG.error("Error while trying to abort failed job. There might be
uncleaned data files.", abortExc);
+ if (!suppressExc) {
+ throw new MetaException("Unable to abort job: " +
abortExc.getMessage());
+ }
+ }
+ }
+
+ private void cleanCommitConfig(String queryIdKey, String tableName) {
+ conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName);
+ conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName);
+ conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
+ conf.unset(queryIdKey);
+ }
+
+ @Override
+ public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
boolean overwrite)
+ throws MetaException {
+ // do nothing
+ }
+
+ @Override
+ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table
table, boolean overwrite)
+ throws MetaException {
+ // do nothing
Review comment:
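Shouldn't we call abortJob here? Otherwise a rolled-back insert could leave behind the data files it already wrote, since abortJob is what cleans those up.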
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]