This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b50cf1f [GOBBLIN-1580] Check table exists instead of call create
table directly to make sure table exists (#3432)
b50cf1f is described below
commit b50cf1fcaf459717263a432286a569e0393a06ef
Author: Zihan Li <[email protected]>
AuthorDate: Mon Nov 22 16:43:17 2021 -0800
[GOBBLIN-1580] Check table exists instead of call create table directly to
make sure table exists (#3432)
* [hotfix] workaround to catch exception when iceberg does not support get
metrics for non-union type
* address comments
* [GOBBLIN-1580]Check table exists instead of call create table directly to
make sure table exists
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 78 ++++++++++++----------
1 file changed, 41 insertions(+), 37 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 020eba7..56e8276 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -273,53 +273,56 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
* Or will create the table thru. RPC and return retVal from remote
MetaStore.
*/
private boolean ensureHiveTableExistenceBeforeAlternation(String tableName,
String dbName, IMetaStoreClient client,
- Table table, HiveSpec spec) throws TException, IOException{
+ Table table) throws TException, IOException{
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName)) {
try {
- try (Timer.Context context =
this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
- client.createTable(getTableWithCreateTimeNow(table));
- log.info(String.format("Created Hive table %s in db %s", tableName,
dbName));
- return true;
- } catch (AlreadyExistsException e) {
- log.debug("Table {}.{} already existed", table.getDbName(),
table.getTableName());
+ if(!existsTable(dbName, tableName)) {
+ try (Timer.Context context =
this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
+ client.createTable(getTableWithCreateTimeNow(table));
+ log.info(String.format("Created Hive table %s in db %s",
tableName, dbName));
+ return true;
+ }
}
}catch (TException e) {
log.error(
String.format("Unable to create Hive table %s in db %s: " +
e.getMessage(), tableName, dbName), e);
throw e;
}
-
log.info("Table {} already exists in db {}.", tableName, dbName);
- try {
- HiveTable existingTable;
- try (Timer.Context context =
this.metricContext.timer(GET_HIVE_TABLE).time()) {
- existingTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
- }
- HiveTable schemaSourceTable = existingTable;
- if (state.contains(SCHEMA_SOURCE_DB)) {
- try (Timer.Context context =
this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
- // We assume the schema source table has the same table name as
the origin table, so only the db name can be configured
- schemaSourceTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB,
dbName),
- tableName));
- }
- }
- if(shouldUpdateLatestSchema) {
- updateSchema(spec, table, schemaSourceTable);
+ // When the logic up to here it means table already existed in db.
Return false.
+ return false;
+ }
+ }
+
+ private void alterTableIfNeeded (String tableName, String dbName,
IMetaStoreClient client,
+ Table table, HiveSpec spec) throws TException, IOException {
+ try {
+ HiveTable existingTable;
+ try (Timer.Context context =
this.metricContext.timer(GET_HIVE_TABLE).time()) {
+ existingTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
+ }
+ HiveTable schemaSourceTable = existingTable;
+ if (state.contains(SCHEMA_SOURCE_DB)) {
+ try (Timer.Context context =
this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
+ // We assume the schema source table has the same table name as the
origin table, so only the db name can be configured
+ schemaSourceTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB,
dbName),
+ tableName));
}
- if (needToUpdateTable(existingTable,
HiveMetaStoreUtils.getHiveTable(table))) {
- try (Timer.Context context =
this.metricContext.timer(ALTER_TABLE).time()) {
- client.alter_table(dbName, tableName,
getNewTblByMergingExistingTblProps(table, existingTable));
- }
- log.info(String.format("updated Hive table %s in db %s", tableName,
dbName));
+ }
+ if(shouldUpdateLatestSchema) {
+ updateSchema(spec, table, schemaSourceTable);
+ }
+ if (needToUpdateTable(existingTable,
HiveMetaStoreUtils.getHiveTable(table))) {
+ try (Timer.Context context =
this.metricContext.timer(ALTER_TABLE).time()) {
+ client.alter_table(dbName, tableName,
getNewTblByMergingExistingTblProps(table, existingTable));
}
- } catch (TException e2) {
- log.error(
- String.format("Unable to create or alter Hive table %s in db %s: "
+ e2.getMessage(), tableName, dbName),
- e2);
- throw e2;
+ log.info(String.format("updated Hive table %s in db %s", tableName,
dbName));
}
- // When the logic up to here it means table already existed in db and
alteration happen. Return false.
- return false;
+ } catch (TException e2) {
+ log.error(
+ String.format("Unable to alter Hive table %s in db %s: " +
e2.getMessage(), tableName, dbName),
+ e2);
+ throw e2;
}
}
@@ -475,15 +478,16 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
this.tableAndDbExistenceCache.get(dbName + ":" + tableName, new
Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- return ensureHiveTableExistenceBeforeAlternation(tableName,
dbName, client, table, spec);
+ return ensureHiveTableExistenceBeforeAlternation(tableName,
dbName, client, table);
}
});
} catch (ExecutionException ee) {
throw new IOException("Table existence checking throwing execution
exception.", ee);
}
} else {
- this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName,
client, table, spec);
+ this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName,
client, table);
}
+ alterTableIfNeeded(tableName, dbName, client, table, spec);
}
@Override