ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476744238
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws
IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+ private void updateSchema(HiveSpec spec, Table table, HiveTable
existingTable) throws IOException{
if (this.schemaRegistry.isPresent()) {
try (Timer.Context context =
this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
- String latestSchema =
this.schemaRegistry.get().getLatestSchema(topicName).toString();
-
spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
latestSchema);
+ Schema latestSchema = (Schema)
this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
Review comment:
According to kafka team, schema registry allows "out of order
registration" of schemas - think of this as sorting schemas by compatibility
instead of by timestamp. this means chronological latest is NOT what the
registry considers latest. I also include this information in comments to
avoid confusing
In addition, I update the PR to first compare the creation time with
existing schema, if we see difference, we will then fetch latest schema to get
the latest schema creation time. In this way, we can avoid too many calls to
schema registry
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]