[ https://issues.apache.org/jira/browse/GOBBLIN-1248?focusedWorklogId=475397&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-475397 ]
ASF GitHub Bot logged work on GOBBLIN-1248:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Aug/20 16:24
Start Date: 27/Aug/20 16:24
Worklog Time Spent: 10m
Work Description: sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r478543139
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+ /**
+ * This method is used to update the table schema to the latest schema
+ * It will fetch creation time of the latest schema from schema registry and compare that
+ * with the creation time of writer's schema. If they are the same, then we will update the
+ * table schema to the writer's schema, else we will keep the table schema the same as schema of
+ * existing table.
+ * Note: If there is no schema specified in the table spec, we will directly update the schema to
+ * the existing table schema
+ * Note: We treat the creation time as version number of schema, since according to Kafka team,
+ * schema registry allows "out of order registration" of schemas, this means chronological latest is
+ * NOT what the registry considers latest.
+ * @param spec
+ * @param table
+ * @param existingTable
+ * @throws IOException
+ */
+ private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
if (this.schemaRegistry.isPresent()) {
try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
- String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
- spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
- table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
- } catch (SchemaRegistryException | IOException e) {
- log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+ Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+ AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+ // If no schema set for the table spec, we fall back to existing schema
+ Schema writerSchema = new Schema.Parser().parse((
+ spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+ String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+ if(existingSchemaCreationTime != null && !existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+ // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+ // schema registry to determine whether to update the schema
+ Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+ String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+ if (latestSchemaCreationTime != null && latestSchemaCreationTime.equals(existingSchemaCreationTime)) {
Review comment:
Shouldn't there be an else block that updates the table schema if
latestSchemaCreationTime does not equal existingSchemaCreationTime?
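One way to read the question above is as a three-way decision over the creation times. The following is only a minimal sketch of that reading, not the PR's implementation: the class `SchemaChoice`, the enum `Decision`, and the method `choose` are hypothetical names, the creation times are modelled as plain strings, and the body of the inner `if` (cut off in the hunk above) is assumed to keep the existing table schema.

```java
/**
 * Hypothetical, dependency-free model of the branch under review.
 * SchemaChoice, Decision and choose are illustrative names, not Gobblin APIs.
 */
final class SchemaChoice {

  /** Which schema the table would end up with. */
  enum Decision { KEEP_EXISTING, USE_WRITER }

  private SchemaChoice() { }

  /**
   * @param existingTime creation time of the existing table schema, may be null
   * @param writerTime   creation time of the writer's schema, may be null
   * @param latestTime   creation time of the registry's latest schema, may be null
   */
  static Decision choose(String existingTime, String writerTime, String latestTime) {
    // Mirrors the outer condition in the diff: with no creation time on the
    // existing schema, or with the writer already matching it, there is
    // nothing to arbitrate (assumed here to proceed with the writer's schema).
    if (existingTime == null || existingTime.equals(writerTime)) {
      return Decision.USE_WRITER;
    }
    // Mirrors the inner condition in the diff: the registry still considers
    // the existing table schema the latest one. Keeping the existing schema
    // in this case is an assumption, since the hunk ends before the body.
    if (latestTime != null && latestTime.equals(existingTime)) {
      return Decision.KEEP_EXISTING;
    }
    // The else branch the review comment asks about: the registry's latest
    // differs from the existing schema, so the table schema gets updated.
    return Decision.USE_WRITER;
  }
}
```

Under these assumptions, `SchemaChoice.choose("100", "200", "100")` yields `KEEP_EXISTING`, while `SchemaChoice.choose("100", "200", "200")` yields `USE_WRITER`. Keeping the decision in a pure function like this also makes each branch easy to cover without a live schema registry.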
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+ /**
+ * This method is used to update the table schema to the latest schema
+ * It will fetch creation time of the latest schema from schema registry and compare that
+ * with the creation time of writer's schema. If they are the same, then we will update the
+ * table schema to the writer's schema, else we will keep the table schema the same as schema of
+ * existing table.
+ * Note: If there is no schema specified in the table spec, we will directly update the schema to
+ * the existing table schema
+ * Note: We treat the creation time as version number of schema, since according to Kafka team,
Review comment:
Is this comment accurate? Do you mean: We cannot use creation time as
the schema version number?
Also: drop "according to Kafka team" from the comment.
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+ /**
+ * This method is used to update the table schema to the latest schema
+ * It will fetch creation time of the latest schema from schema registry and compare that
+ * with the creation time of writer's schema. If they are the same, then we will update the
+ * table schema to the writer's schema, else we will keep the table schema the same as schema of
+ * existing table.
+ * Note: If there is no schema specified in the table spec, we will directly update the schema to
+ * the existing table schema
+ * Note: We treat the creation time as version number of schema, since according to Kafka team,
+ * schema registry allows "out of order registration" of schemas, this means chronological latest is
+ * NOT what the registry considers latest.
+ * @param spec
+ * @param table
+ * @param existingTable
+ * @throws IOException
+ */
+ private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
Review comment:
Is it possible to add unit tests for this method?
Issue Time Tracking
-------------------
Worklog Id: (was: 475397)
Time Spent: 1h 50m (was: 1h 40m)
> Fix discrepancy between table schema and file schema
> ----------------------------------------------------
>
> Key: GOBBLIN-1248
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1248
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Previously, in the streaming pipeline, we always fetched the latest schema from the
> Kafka SchemaRegistry during Hive registration, to avoid a race condition on the
> metadata schema. Because the Gobblin converter may change the schema, this introduces
> a discrepancy between the Hive table schema and the actual file schema, so we need a
> better way to solve this problem.