This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c3cfe4f [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema literal (#3587)
b1c3cfe4f is described below

commit b1c3cfe4f8d6c07ff784593e3be035c735d7f6c7
Author: vikram bohra <[email protected]>
AuthorDate: Wed Oct 19 15:30:38 2022 -0700

    [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema literal (#3587)
    
    * [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema literal
    
    * Changed from whitelist/blacklist to allowlist/denylist
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 33 +++++++++++++++++++---
 .../java/org/apache/gobblin/util/AvroUtils.java    |  2 +-
 2 files changed, 30 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 5a825478b..c85177011 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -82,12 +82,18 @@ public class HiveMetadataWriter implements MetadataWriter {
 
   private static final String HIVE_REGISTRATION_WHITELIST = 
"hive.registration.whitelist";
   private static final String HIVE_REGISTRATION_BLACKLIST = 
"hive.registration.blacklist";
+  private static final String HIVE_USE_LATEST_SCHEMA_ALLOWLIST = 
"hive.use.latest.schema.allowlist";
+  private static final String HIVE_USE_LATEST_SCHEMA_DENYLIST = 
"hive.use.latest.schema.denylist";
+
   private static final String HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 
"hive.registration.timeout.seconds";
   private static final long DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 60;
   private final Joiner tableNameJoiner = Joiner.on('.');
   private final Closer closer = Closer.create();
   protected final HiveRegister hiveRegister;
   private final WhitelistBlacklist whitelistBlacklist;
+  // Always use the latest table schema for tables accepted by #useLatestTableSchemaAllowDenyList
+  // unless a newer writer schema arrives
+  private final WhitelistBlacklist useLatestTableSchemaAllowDenyList;
   @Getter
   private final KafkaSchemaRegistry schemaRegistry;
   private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>> 
currentExecutionMap;
@@ -104,7 +110,7 @@ public class HiveMetadataWriter implements MetadataWriter {
   /* Mapping from tableIdentifier to latest schema observed. */
   private final HashMap<String, String> latestSchemaMap;
   private final long timeOutSeconds;
-  private State state;
+  protected State state;
 
   protected EventSubmitter eventSubmitter;
 
@@ -121,6 +127,8 @@ public class HiveMetadataWriter implements MetadataWriter {
     this.schemaCreationTimeMap = new HashMap<>();
     this.specMaps = new HashMap<>();
     this.latestSchemaMap = new HashMap<>();
+    this.useLatestTableSchemaAllowDenyList = new 
WhitelistBlacklist(state.getProp(HIVE_USE_LATEST_SCHEMA_ALLOWLIST, ""),
+        state.getProp(HIVE_USE_LATEST_SCHEMA_DENYLIST, ""));
     this.tableTopicPartitionMap = new HashMap<>();
     this.timeOutSeconds =
         state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, 
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
@@ -189,7 +197,8 @@ public class HiveMetadataWriter implements MetadataWriter {
     }
 
     //ToDo: after making sure all spec has topic.name set, we should use 
topicName as key for schema
-    if (!latestSchemaMap.containsKey(tableKey)) {
+    if (useLatestTableSchemaAllowDenyList.acceptTable(dbName, tableName)
+        || !latestSchemaMap.containsKey(tableKey)) {
       HiveTable existingTable = this.hiveRegister.getTable(dbName, 
tableName).get();
       latestSchemaMap.put(tableKey,
           
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
@@ -318,7 +327,8 @@ public class HiveMetadataWriter implements MetadataWriter {
                 .build());
         if (gmce.getSchemaSource() == SchemaSource.EVENT) {
           // Schema source is Event, update schema anyway
-          latestSchemaMap.put(tableKey, newSchemaString);
+          String schemaToUpdate = overrideSchemaLiteral(spec, newSchemaString, 
newSchemaCreationTime, gmce.getPartitionColumns());
+          latestSchemaMap.put(tableKey, schemaToUpdate);
           // Clear the schema versions cache so next time if we see schema 
source is schemaRegistry, we will contact schemaRegistry and update
           existedSchemaCreationTimes.cleanUp();
         } else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY && 
newSchemaCreationTime != null
@@ -328,8 +338,9 @@ public class HiveMetadataWriter implements MetadataWriter {
             Schema latestSchema = (Schema) 
this.schemaRegistry.getLatestSchemaByTopic(topicName);
             String latestCreationTime = 
AvroUtils.getSchemaCreationTime(latestSchema);
             if (latestCreationTime.equals(newSchemaCreationTime)) {
+              String schemaToUpdate = overrideSchemaLiteral(spec, 
newSchemaString, newSchemaCreationTime, gmce.getPartitionColumns());
               //new schema is the latest schema, we update our record
-              latestSchemaMap.put(tableKey, newSchemaString);
+              latestSchemaMap.put(tableKey, schemaToUpdate);
             }
             existedSchemaCreationTimes.put(newSchemaCreationTime, "");
           }
@@ -360,6 +371,20 @@ public class HiveMetadataWriter implements MetadataWriter {
     }
   }
 
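+  /**
+   * Hook for subclasses to override the schema literal before it is recorded as the table's latest schema.
+   * @param spec HiveSpec of the table being registered
+   * @param latestSchema the new schema literal; the default implementation returns it unchanged
+   * @param schemaCreationTime creation time of the new schema (unused by the default implementation)
+   * @param partitionNames partition column names taken from the GMCE (unused by the default implementation)
+   * @return the schema literal to store as the table's latest schema
+   */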
+  protected String overrideSchemaLiteral(HiveSpec spec, String latestSchema, 
String schemaCreationTime,
+      List<String> partitionNames) {
+    return latestSchema;
+  }
+
+
   private String fetchSchemaFromTable(String dbName, String tableName) throws 
IOException {
     String tableKey = tableNameJoiner.join(dbName, tableName);
     if (latestSchemaMap.containsKey(tableKey)) {
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 838318828..8d4985f6f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -96,7 +96,7 @@ public class AvroUtils {
 
   public static final String AVRO_SUFFIX = ".avro";
 
-  private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
+  public static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
 
   /**
    * Validates that the provided reader schema can be used to decode avro data 
written with the

Reply via email to