This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7fd3526 [GOBBLIN-1344] Enable configurable schema source db and table
when registering hive schema
7fd3526 is described below
commit 7fd352652e9bbcbde619087d177ed127b0ef6aee
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 5 15:12:24 2021 -0800
[GOBBLIN-1344] Enable configurable schema source db and table when
registering hive schema
[GOBBLIN-1344] Enable configurable schema source
db and table when registering hive schema
delete useless code
delete useless code
address comments
Closes #3181 from ZihanLi58/GOBBLIN-1344_new
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 23 ++++++++++++++++++++--
1 file changed, 21 insertions(+), 2 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index a2b6228..5caf494 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -93,6 +93,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
public static final String HIVE_REGISTER_METRICS_PREFIX = "hiveRegister.";
public static final String ADD_PARTITION_TIMER =
HIVE_REGISTER_METRICS_PREFIX + "addPartitionTimerTimer";
+ public static final String SCHEMA_SOURCE_DB = HIVE_REGISTER_METRICS_PREFIX +
"schema.source.dbName";
public static final String GET_HIVE_PARTITION = HIVE_REGISTER_METRICS_PREFIX
+ "getPartitionTimer";
public static final String ALTER_PARTITION = HIVE_REGISTER_METRICS_PREFIX +
"alterPartitionTimer";
public static final String TABLE_EXISTS = HIVE_REGISTER_METRICS_PREFIX +
"tableExistsTimer";
@@ -101,6 +102,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
public static final String CREATE_HIVE_DATABASE =
HIVE_REGISTER_METRICS_PREFIX + "createDatabaseTimer";
public static final String CREATE_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX
+ "createTableTimer";
public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX +
"getTableTimer";
+ public static final String GET_SCHEMA_SOURCE_HIVE_TABLE =
HIVE_REGISTER_METRICS_PREFIX + "getSchemaSourceTableTimer";
public static final String GET_AND_SET_LATEST_SCHEMA =
HIVE_REGISTER_METRICS_PREFIX + "getAndSetLatestSchemaTimer";
public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX +
"dropTableTimer";
public static final String PATH_REGISTER_TIMER =
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
@@ -144,6 +146,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
private final boolean optimizedChecks;
+ private final State state;
//If this is true, after we know the partition is existing, we will skip the
partition in stead of getting the existing
// partition and computing the diff to see if it needs to be updated. Use
this only when you can make sure the metadata
//for a partition is immutable
@@ -154,6 +157,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
private String topicName = "";
public HiveMetaStoreBasedRegister(State state, Optional<String>
metastoreURI) throws IOException {
super(state);
+ this.state = state;
this.locks = new HiveLock(state.getProperties());
this.optimizedChecks = state.getPropAsBoolean(OPTIMIZED_CHECK_ENABLED,
true);
@@ -229,8 +233,15 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
String existingSchemaCreationTime =
AvroUtils.getSchemaCreationTime(existingTableSchema);
// If no schema set for the table spec, we fall back to existing schema
+ if
(spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())
== null) {
+ spec.getTable()
+ .getSerDeProps()
+
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
existingTableSchema);
+
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+ return;
+ }
Schema writerSchema = new Schema.Parser().parse((
-
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
existingTableSchema.toString())));
+
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())));
String writerSchemaCreationTime =
AvroUtils.getSchemaCreationTime(writerSchema);
if(existingSchemaCreationTime != null &&
!existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
// If creation time of writer schema does not equal to the existing
schema, we compare with schema fetched from
@@ -280,8 +291,16 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
try (Timer.Context context =
this.metricContext.timer(GET_HIVE_TABLE).time()) {
existingTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
}
+ HiveTable schemaSourceTable = existingTable;
+ if (state.contains(SCHEMA_SOURCE_DB)) {
+ try (Timer.Context context =
this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
+ // We assume the schema source table has the same table name as
the origin table, so only the db name can be configured
+ schemaSourceTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB,
dbName),
+ tableName));
+ }
+ }
if(shouldUpdateLatestSchema) {
- updateSchema(spec, table, existingTable);
+ updateSchema(spec, table, schemaSourceTable);
}
if (needToUpdateTable(existingTable,
HiveMetaStoreUtils.getHiveTable(table))) {
try (Timer.Context context =
this.metricContext.timer(ALTER_TABLE).time()) {