Repository: incubator-gobblin Updated Branches: refs/heads/master fcd57541a -> 27a54f05e
[GOBBLIN-566] Fix bug in HiveMetastoreBasedRegister where it would try to alter a partition when it didn't exist. Closes #2429 from ibuenros/hive-register-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/27a54f05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/27a54f05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/27a54f05 Branch: refs/heads/master Commit: 27a54f05ef2c084972db23d200512e6873e954d1 Parents: fcd5754 Author: ibuenros <[email protected]> Authored: Thu Aug 23 15:32:48 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Aug 23 15:32:48 2018 -0700 ---------------------------------------------------------------------- .../metastore/HiveMetaStoreBasedRegister.java | 48 ++++++++++++-------- 1 file changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27a54f05/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java index 2d074d9..0ee5445 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java @@ -156,7 +156,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister { Optional<HivePartition> partition = spec.getPartition(); if (partition.isPresent()) { - addOrAlterPartition(client.get(), table, HiveMetaStoreUtils.getPartition(partition.get()), spec); + addOrAlterPartition(client.get(), table, partition.get()); } HiveMetaStoreEventHelper.submitSuccessfulPathRegistration(eventSubmitter, spec); } catch (TException e) { @@ -299,9 +299,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister { } return false; } catch (NoSuchObjectException e) { - try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) { - client.get().alter_partition(table.getDbName(), table.getTableName(), - getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition))); + try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) { + client.get().add_partition(getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition))); } HiveMetaStoreEventHelper.submitSuccessfulPartitionAdd(this.eventSubmitter, table, partition); return true; @@ -432,47 +431,60 @@ public class HiveMetaStoreBasedRegister extends HiveRegister { } } - private void addOrAlterPartition(IMetaStoreClient client, Table table, Partition partition, HiveSpec spec) + @Override + public void addOrAlterPartition(HiveTable table, HivePartition partition) throws IOException { + try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) { + addOrAlterPartition(client.get(), HiveMetaStoreUtils.getTable(table), partition); + } catch (TException te) { + throw new IOException( + String.format("Failed to add/alter partition %s.%s@%s", table.getDbName(), table.getTableName(), partition.getValues()), + te); + } + } + + private void addOrAlterPartition(IMetaStoreClient client, Table table, HivePartition partition) throws TException { - Preconditions.checkArgument(table.getPartitionKeysSize() == partition.getValues().size(), + Partition nativePartition = HiveMetaStoreUtils.getPartition(partition); + + Preconditions.checkArgument(table.getPartitionKeysSize() == nativePartition.getValues().size(), String.format("Partition key size is %s but partition value size is %s", table.getPartitionKeys().size(), - partition.getValues().size())); + nativePartition.getValues().size())); try (AutoCloseableLock lock = - this.locks.getPartitionLock(table.getDbName(), table.getTableName(), partition.getValues())) { + this.locks.getPartitionLock(table.getDbName(), table.getTableName(), nativePartition.getValues())) { try { try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) { - client.add_partition(getPartitionWithCreateTimeNow(partition)); + client.add_partition(getPartitionWithCreateTimeNow(nativePartition)); } - log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(partition), - table.getTableName(), partition.getSd().getLocation())); + log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition), + table.getTableName(), nativePartition.getSd().getLocation())); } catch (TException e) { try { HivePartition existingPartition; try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) { existingPartition = HiveMetaStoreUtils.getHivePartition( - client.getPartition(table.getDbName(), table.getTableName(), partition.getValues())); + client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues())); } - if (needToUpdatePartition(existingPartition, spec.getPartition().get())) { + if (needToUpdatePartition(existingPartition, partition)) { log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s", - stringifyPartition(existingPartition), stringifyPartition(spec.getPartition().get()))); - Partition newPartition = getPartitionWithCreateTime(partition, existingPartition); + stringifyPartition(existingPartition), stringifyPartition(partition))); + Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition); log.info(String.format("Altering partition %s", newPartition)); try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) { client.alter_partition(table.getDbName(), table.getTableName(), newPartition); } log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition), - table.getTableName(), partition.getSd().getLocation())); + table.getTableName(), nativePartition.getSd().getLocation())); } else { log.info(String.format("Partition %s in table %s with location %s already exists and no need to update", - stringifyPartition(partition), table.getTableName(), partition.getSd().getLocation())); + stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation())); } } catch (Throwable e2) { log.error(String.format( "Unable to add or alter partition %s in table %s with location %s: " + e2.getMessage(), - stringifyPartitionVerbose(partition), table.getTableName(), partition.getSd().getLocation()), e2); + stringifyPartitionVerbose(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()), e2); throw e2; } }
