This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fd3526  [GOBBLIN-1344] Enable configurable schema source db and table when registering hive schema
7fd3526 is described below

commit 7fd352652e9bbcbde619087d177ed127b0ef6aee
Author: Zihan Li <[email protected]>
AuthorDate: Tue Jan 5 15:12:24 2021 -0800

    [GOBBLIN-1344] Enable configurable schema source db and table when registering hive schema
    
    [GOBBLIN-1344] Enable configurable schema source
    db and table when registering hive schema
    
    delete useless code
    
    delete useless code
    
    address comments
    
    Closes #3181 from ZihanLi58/GOBBLIN-1344_new
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 23 ++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index a2b6228..5caf494 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -93,6 +93,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
 
   public static final String HIVE_REGISTER_METRICS_PREFIX = "hiveRegister.";
   public static final String ADD_PARTITION_TIMER = 
HIVE_REGISTER_METRICS_PREFIX + "addPartitionTimerTimer";
+  public static final String SCHEMA_SOURCE_DB = HIVE_REGISTER_METRICS_PREFIX + 
"schema.source.dbName";
   public static final String GET_HIVE_PARTITION = HIVE_REGISTER_METRICS_PREFIX 
+ "getPartitionTimer";
   public static final String ALTER_PARTITION = HIVE_REGISTER_METRICS_PREFIX + 
"alterPartitionTimer";
   public static final String TABLE_EXISTS = HIVE_REGISTER_METRICS_PREFIX + 
"tableExistsTimer";
@@ -101,6 +102,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   public static final String CREATE_HIVE_DATABASE = 
HIVE_REGISTER_METRICS_PREFIX + "createDatabaseTimer";
   public static final String CREATE_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX 
+ "createTableTimer";
   public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX + 
"getTableTimer";
+  public static final String GET_SCHEMA_SOURCE_HIVE_TABLE = 
HIVE_REGISTER_METRICS_PREFIX + "getSchemaSourceTableTimer";
   public static final String GET_AND_SET_LATEST_SCHEMA = 
HIVE_REGISTER_METRICS_PREFIX + "getAndSetLatestSchemaTimer";
   public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX + 
"dropTableTimer";
   public static final String PATH_REGISTER_TIMER = 
HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
@@ -144,6 +146,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
 
 
   private final boolean optimizedChecks;
+  private final State state;
   //If this is true, after we know the partition is existing, we will skip the 
partition in stead of getting the existing
   // partition and computing the diff to see if it needs to be updated. Use 
this only when you can make sure the metadata
   //for a partition is immutable
@@ -154,6 +157,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   private String topicName = "";
   public HiveMetaStoreBasedRegister(State state, Optional<String> 
metastoreURI) throws IOException {
     super(state);
+    this.state = state;
     this.locks = new HiveLock(state.getProperties());
 
     this.optimizedChecks = state.getPropAsBoolean(OPTIMIZED_CHECK_ENABLED, 
true);
@@ -229,8 +233,15 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
             AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
         String existingSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(existingTableSchema);
         // If no schema set for the table spec, we fall back to existing schema
+        if 
(spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())
 == null) {
+          spec.getTable()
+              .getSerDeProps()
+              
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), 
existingTableSchema);
+          
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+          return;
+        }
         Schema writerSchema = new Schema.Parser().parse((
-            
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
 existingTableSchema.toString())));
+            
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())));
         String writerSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(writerSchema);
         if(existingSchemaCreationTime != null && 
!existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
           // If creation time of writer schema does not equal to the existing 
schema, we compare with schema fetched from
@@ -280,8 +291,16 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
         try (Timer.Context context = 
this.metricContext.timer(GET_HIVE_TABLE).time()) {
           existingTable = 
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
         }
+        HiveTable schemaSourceTable = existingTable;
+        if (state.contains(SCHEMA_SOURCE_DB)) {
+          try (Timer.Context context = 
this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
+            // We assume the schema source table has the same table name as 
the origin table, so only the db name can be configured
+            schemaSourceTable = 
HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB, 
dbName),
+                tableName));
+          }
+        }
         if(shouldUpdateLatestSchema) {
-          updateSchema(spec, table, existingTable);
+          updateSchema(spec, table, schemaSourceTable);
         }
         if (needToUpdateTable(existingTable, 
HiveMetaStoreUtils.getHiveTable(table))) {
           try (Timer.Context context = 
this.metricContext.timer(ALTER_TABLE).time()) {

Reply via email to