marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r645354564
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }

-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {
Review comment:
Done
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");
Review comment:
Yes, you're right, `CTAS_TABLE_NAME` should be enough.
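To make it concrete, a minimal sketch of the simplification (the `InputFormatConfig.CTAS_TABLE_NAME` constant name and the value taken from `serDeProperties` are my assumptions based on the diff above, not the final patch): the mere presence of the table name in the query state can signal a CTAS query, so the separate `IS_CTAS_QUERY` flag becomes redundant.
```java
// Sketch only: store just the CTAS table name in the query state; checking
// whether this key is present replaces the separate IS_CTAS_QUERY flag.
// InputFormatConfig.CTAS_TABLE_NAME and the value expression are assumed here.
SessionStateUtil.addResource(configuration, InputFormatConfig.CTAS_TABLE_NAME,
    serDeProperties.get(Catalogs.NAME));
```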
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
           // get all target tables this vertex wrote to
           List<String> tables = new ArrayList<>();
           for (Map.Entry<String, String> entry : jobConf) {
Review comment:
Good question. I don't think there's a faster way if we want to keep this generic so that it works for all existing and future Iceberg properties. I did find a convenience method, though, which can do this for us without an explicit loop: `public Map<String, String> getPropsWithPrefix(String confPrefix)`, but it also loops through the keys internally.
If we are concerned about performance, we can find out exactly which properties need to be propagated, hardcode those, and simply pass only those few without the looping.
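For illustration, a minimal sketch of what the convenience-method variant could look like (the wrapper class is only for self-containment, and the constant mirrors the prefix in the diff above; this is not the actual patch):
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

class PrefixLookupSketch {
  // same value as the ICEBERG_SERIALIZED_TABLE_PREFIX constant in the patch
  private static final String PREFIX = "iceberg.mr.serialized.table.";

  // getPropsWithPrefix returns {key-without-prefix -> value}, so the map keys
  // are already the bare table names. It still iterates over every config
  // entry internally, so this is a readability win, not a performance win.
  static List<String> targetTables(Configuration jobConf) {
    Map<String, String> serialized = jobConf.getPropsWithPrefix(PREFIX);
    return new ArrayList<>(serialized.keySet());
  }
}
```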
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
           // get all target tables this vertex wrote to
           List<String> tables = new ArrayList<>();
           for (Map.Entry<String, String> entry : jobConf) {
-            if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-              tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+            if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+              tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
             }
           }
-          // save information for each target table (jobID, task num, query state)
+          // find iceberg props in jobConf as they can be needed, but not available, during job commit
+          Map<String, String> icebergProperties = new HashMap<>();
+          jobConf.forEach(e -> {
Review comment:
You're right, I will definitely merge them into a single pass.
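For reference, a rough sketch of what the merged pass could look like (the `iceberg.` filter in the second branch is an assumption, since the diff above cuts off before the lambda body; this is not the final patch):
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// drops into collectCommitInformation, replacing the two separate passes:
// collect both the target table names and the Iceberg properties while
// iterating over the jobConf once.
List<String> tables = new ArrayList<>();
Map<String, String> icebergProperties = new HashMap<>();
for (Map.Entry<String, String> entry : jobConf) {
  String key = entry.getKey();
  if (key.startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
    tables.add(key.substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
  }
  if (key.startsWith("iceberg.")) {
    // assumed filter: keep iceberg props so they're available at job commit
    icebergProperties.put(key, entry.getValue());
  }
}
```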