This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b1c3cfe4f [GOBBLIN-1731] Enable HiveMetadataWriter to override table
schema lit… (#3587)
b1c3cfe4f is described below
commit b1c3cfe4f8d6c07ff784593e3be035c735d7f6c7
Author: vikram bohra <[email protected]>
AuthorDate: Wed Oct 19 15:30:38 2022 -0700
[GOBBLIN-1731] Enable HiveMetadataWriter to override table schema lit…
(#3587)
* [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema literal
* Changed from whiteblack list to allowdenylist
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 33 +++++++++++++++++++---
.../java/org/apache/gobblin/util/AvroUtils.java | 2 +-
2 files changed, 30 insertions(+), 5 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 5a825478b..c85177011 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -82,12 +82,18 @@ public class HiveMetadataWriter implements MetadataWriter {
private static final String HIVE_REGISTRATION_WHITELIST =
"hive.registration.whitelist";
private static final String HIVE_REGISTRATION_BLACKLIST =
"hive.registration.blacklist";
+ private static final String HIVE_USE_LATEST_SCHEMA_ALLOWLIST =
"hive.use.latest.schema.allowlist";
+ private static final String HIVE_USE_LATEST_SCHEMA_DENYLIST =
"hive.use.latest.schema.denylist";
+
private static final String HIVE_REGISTRATION_TIMEOUT_IN_SECONDS =
"hive.registration.timeout.seconds";
private static final long DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS = 60;
private final Joiner tableNameJoiner = Joiner.on('.');
private final Closer closer = Closer.create();
protected final HiveRegister hiveRegister;
private final WhitelistBlacklist whitelistBlacklist;
+ // Always use the latest table schema for tables accepted by
+ // #useLatestTableSchemaAllowDenyList, unless a newer writer schema arrives
+ private final WhitelistBlacklist useLatestTableSchemaAllowDenyList;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final HashMap<String, HashMap<List<String>, ListenableFuture<Void>>>
currentExecutionMap;
@@ -104,7 +110,7 @@ public class HiveMetadataWriter implements MetadataWriter {
/* Mapping from tableIdentifier to latest schema observed. */
private final HashMap<String, String> latestSchemaMap;
private final long timeOutSeconds;
- private State state;
+ protected State state;
protected EventSubmitter eventSubmitter;
@@ -121,6 +127,8 @@ public class HiveMetadataWriter implements MetadataWriter {
this.schemaCreationTimeMap = new HashMap<>();
this.specMaps = new HashMap<>();
this.latestSchemaMap = new HashMap<>();
+ this.useLatestTableSchemaAllowDenyList = new
WhitelistBlacklist(state.getProp(HIVE_USE_LATEST_SCHEMA_ALLOWLIST, ""),
+ state.getProp(HIVE_USE_LATEST_SCHEMA_DENYLIST, ""));
this.tableTopicPartitionMap = new HashMap<>();
this.timeOutSeconds =
state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS,
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
@@ -189,7 +197,8 @@ public class HiveMetadataWriter implements MetadataWriter {
}
//ToDo: after making sure all spec has topic.name set, we should use
topicName as key for schema
- if (!latestSchemaMap.containsKey(tableKey)) {
+ if (useLatestTableSchemaAllowDenyList.acceptTable(dbName, tableName)
+ || !latestSchemaMap.containsKey(tableKey)) {
HiveTable existingTable = this.hiveRegister.getTable(dbName,
tableName).get();
latestSchemaMap.put(tableKey,
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
@@ -318,7 +327,8 @@ public class HiveMetadataWriter implements MetadataWriter {
.build());
if (gmce.getSchemaSource() == SchemaSource.EVENT) {
// Schema source is Event, update schema anyway
- latestSchemaMap.put(tableKey, newSchemaString);
+ String schemaToUpdate = overrideSchemaLiteral(spec, newSchemaString,
newSchemaCreationTime, gmce.getPartitionColumns());
+ latestSchemaMap.put(tableKey, schemaToUpdate);
// Clear the schema versions cache so next time if we see schema
source is schemaRegistry, we will contact schemaRegistry and update
existedSchemaCreationTimes.cleanUp();
} else if (gmce.getSchemaSource() == SchemaSource.SCHEMAREGISTRY &&
newSchemaCreationTime != null
@@ -328,8 +338,9 @@ public class HiveMetadataWriter implements MetadataWriter {
Schema latestSchema = (Schema)
this.schemaRegistry.getLatestSchemaByTopic(topicName);
String latestCreationTime =
AvroUtils.getSchemaCreationTime(latestSchema);
if (latestCreationTime.equals(newSchemaCreationTime)) {
+ String schemaToUpdate = overrideSchemaLiteral(spec,
newSchemaString, newSchemaCreationTime, gmce.getPartitionColumns());
//new schema is the latest schema, we update our record
- latestSchemaMap.put(tableKey, newSchemaString);
+ latestSchemaMap.put(tableKey, schemaToUpdate);
}
existedSchemaCreationTimes.put(newSchemaCreationTime, "");
}
@@ -360,6 +371,20 @@ public class HiveMetadataWriter implements MetadataWriter {
}
}
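+ /**
+ * Subclass hook to rewrite the schema literal; the default is a no-op
+ * @param spec HiveSpec passed by the caller
+ * @param latestSchema schema literal to record; returned as is by default
+ * @param schemaCreationTime creation time an override may apply to the schema
+ * @param partitionNames partition columns taken from the GMCE by the caller
+ * @return schema literal the caller stores as the table's latest schema
+ */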
+ protected String overrideSchemaLiteral(HiveSpec spec, String latestSchema,
String schemaCreationTime,
+ List<String> partitionNames) {
+ return latestSchema;
+ }
+
+
private String fetchSchemaFromTable(String dbName, String tableName) throws
IOException {
String tableKey = tableNameJoiner.join(dbName, tableName);
if (latestSchemaMap.containsKey(tableKey)) {
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 838318828..8d4985f6f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -96,7 +96,7 @@ public class AvroUtils {
public static final String AVRO_SUFFIX = ".avro";
- private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
+ public static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
/**
* Validates that the provided reader schema can be used to decode avro data
written with the