[ 
https://issues.apache.org/jira/browse/GOBBLIN-1248?focusedWorklogId=475108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-475108
 ]

ASF GitHub Bot logged work on GOBBLIN-1248:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Aug/20 02:40
            Start Date: 27/Aug/20 02:40
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r477970367



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,57 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime == null || existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+          // schema registry to determine whether to update the schema
+          Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);

Review comment:
       Synced offline and applied the new solution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 475108)
    Time Spent: 1h 40m  (was: 1.5h)

> Fix discrepancy between table schema and file schema
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1248
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1248
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Previously, in the streaming pipeline, we always fetched the latest schema 
> from the Kafka SchemaRegistry during Hive registration to avoid a race 
> condition on the metadata schema. Because a Gobblin converter may change the 
> schema, this introduces a discrepancy between the Hive table schema and the 
> actual file schema, so we need a better way to solve this problem.
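
The selection rule stated in the Javadoc of updateSchema in the quoted diff can be sketched in isolation. This is only an illustration of that stated rule, not the code in the pull request: plain strings stand in for Avro schemas, and the names SchemaSelectionSketch, VersionedSchema, and chooseTableSchema are hypothetical. It assumes the creation time is available as a string that may be null.

```java
// Sketch only: plain strings stand in for Avro schemas, and every name here is hypothetical.
class SchemaSelectionSketch {

  /** A schema literal together with the creation time recorded for it (may be null). */
  static final class VersionedSchema {
    final String literal;
    final String creationTime;

    VersionedSchema(String literal, String creationTime) {
      this.literal = literal;
      this.creationTime = creationTime;
    }
  }

  /**
   * Picks the schema literal to register on the table.
   * The creation time is treated as a version number rather than as a timestamp to order by,
   * because the registry's "latest" is not necessarily the chronologically newest schema.
   */
  static String chooseTableSchema(VersionedSchema writer,
                                  VersionedSchema existing,
                                  VersionedSchema registryLatest) {
    // No schema in the table spec: fall back to the existing table schema.
    if (writer == null) {
      return existing.literal;
    }
    // The writer schema wins only when it is the version the registry considers latest.
    boolean writerIsLatest = writer.creationTime != null
        && writer.creationTime.equals(registryLatest.creationTime);
    return writerIsLatest ? writer.literal : existing.literal;
  }

  public static void main(String[] args) {
    VersionedSchema existing = new VersionedSchema("existing-schema", "100");
    VersionedSchema latest = new VersionedSchema("latest-schema", "200");

    // Writer schema matches the registry's latest version.
    System.out.println(chooseTableSchema(new VersionedSchema("writer-schema", "200"), existing, latest));
    // Writer schema is a different version, so the existing table schema is kept.
    System.out.println(chooseTableSchema(new VersionedSchema("writer-schema", "150"), existing, latest));
    // No writer schema in the spec.
    System.out.println(chooseTableSchema(null, existing, latest));
  }
}
```

Comparing for equality instead of ordering by time follows the Javadoc's note about out-of-order registration: a schema with a later creation time is not necessarily the one the registry reports as latest.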



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
