[
https://issues.apache.org/jira/browse/GOBBLIN-1248?focusedWorklogId=474496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-474496
]
ASF GitHub Bot logged work on GOBBLIN-1248:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Aug/20 20:09
Start Date: 25/Aug/20 20:09
Worklog Time Spent: 10m
Work Description: sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476698036
##########
File path:
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
##########
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.ConfigUtils;
Review comment:
Seems like this change is unintentional.
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+ private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
if (this.schemaRegistry.isPresent()) {
try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
- String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
- spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+ Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+ String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+ // If no schema set for the table spec, we fall back to schema fetched from schema registry
+ Schema writerSchema = new Schema.Parser().parse((
+ spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema.toString())));
+ String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+ if(writerSchemaCreationTime == null || latestSchemaCreationTime == null || latestSchemaCreationTime.equals(writerSchemaCreationTime)) {
+ spec.getTable()
+ .getSerDeProps()
+ .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+ } else {
+ // If creation time of writer schema does not equal to the latest schema, we don't update the schema
+ spec.getTable()
+ .getSerDeProps()
+ .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+ existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ }
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
- } catch (SchemaRegistryException | IOException e) {
- log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+ } catch ( IOException e) {
+ log.error(String.format("Error when update latest schema for topic %s", topicName), e);
Review comment:
update -> updating
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+ private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
Review comment:
It would be good to add javadoc explaining the update behavior.
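A minimal sketch of what such javadoc could describe, based only on the behavior visible in the diff above. The class and method names are hypothetical, and the Avro `Schema` objects, `AvroUtils.getSchemaCreationTime`, and the serde properties of `HiveTable` are modeled as plain strings so the selection rule can be read and run in isolation.

```java
/**
 * Hypothetical, self-contained model of the schema-selection rule in the diff under review.
 * Schemas and creation times are plain strings here; the real code works on Avro Schema objects.
 */
final class SchemaUpdateRule {

  private SchemaUpdateRule() {}

  /**
   * Chooses the Avro schema literal to write into the table's serde properties.
   *
   * <p>The writer schema is the schema literal on the table spec; if the spec has none, it
   * falls back to the latest schema fetched from the schema registry. The writer schema is
   * used when either creation time is unknown (null) or when both creation times are equal.
   * Otherwise the schema literal already on the existing Hive table is kept unchanged.
   *
   * @param specSchemaLiteral schema literal from the table spec, or null if none is set
   * @param latestSchemaLiteral latest schema literal from the schema registry
   * @param writerCreationTime creation time of the writer schema, or null if unknown
   * @param latestCreationTime creation time of the latest registry schema, or null if unknown
   * @param existingSchemaLiteral schema literal currently set on the Hive table
   * @return the schema literal to set on the table
   */
  public static String chooseSchemaLiteral(
      String specSchemaLiteral,
      String latestSchemaLiteral,
      String writerCreationTime,
      String latestCreationTime,
      String existingSchemaLiteral) {
    // Fall back to the registry schema when the spec carries no schema literal.
    String writerSchemaLiteral =
        specSchemaLiteral != null ? specSchemaLiteral : latestSchemaLiteral;
    // Unknown or matching creation times: trust the writer schema.
    if (writerCreationTime == null
        || latestCreationTime == null
        || latestCreationTime.equals(writerCreationTime)) {
      return writerSchemaLiteral;
    }
    // Creation times differ: leave the table schema as it is.
    return existingSchemaLiteral;
  }
}
```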
##########
File path:
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+ private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
if (this.schemaRegistry.isPresent()) {
try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
- String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
- spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+ Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
Review comment:
It's confusing why we are fetching the latest schema from the schema registry.
Can we not get the schema creation time from the Hive table and update the schema
if the writer schema creation time > table schema creation time?
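The reviewer's alternative can be sketched as a comparison that needs only the writer schema and the existing table, with no schema-registry lookup. This is a hypothetical helper, not code from the PR: it assumes creation times are stored as epoch-millisecond strings and, mirroring the PR's fallback, uses the writer schema when either creation time is missing.

```java
/**
 * Hypothetical sketch of the reviewer's suggestion: compare the writer schema's creation time
 * with the existing table schema's creation time. Assumes epoch-millisecond strings.
 */
final class CreationTimeComparison {

  private CreationTimeComparison() {}

  /** Returns true if the table schema should be replaced by the writer schema. */
  public static boolean shouldUpdateTableSchema(String writerCreationTime, String tableCreationTime) {
    // Assumption: with no creation time to compare, accept the writer schema (as the PR does).
    if (writerCreationTime == null || tableCreationTime == null) {
      return true;
    }
    // Only a strictly newer writer schema replaces the table schema.
    return Long.parseLong(writerCreationTime) > Long.parseLong(tableCreationTime);
  }

  /** Returns the schema literal to set on the table. */
  public static String chooseSchemaLiteral(
      String writerSchemaLiteral,
      String writerCreationTime,
      String tableSchemaLiteral,
      String tableCreationTime) {
    return shouldUpdateTableSchema(writerCreationTime, tableCreationTime)
        ? writerSchemaLiteral
        : tableSchemaLiteral;
  }
}
```

With a strict `>` comparison, equal creation times keep the table schema, which should be harmless when both literals describe the same schema.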
Issue Time Tracking
-------------------
Worklog Id: (was: 474496)
Time Spent: 0.5h (was: 20m)
> Fix discrepancy between table schema and file schema
> ----------------------------------------------------
>
> Key: GOBBLIN-1248
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1248
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Previously, in the streaming pipeline, to avoid a race condition on the metadata
> schema, we always fetched the latest schema from the Kafka SchemaRegistry during
> Hive registration. Since the Gobblin converter may change the schema, this
> introduces a discrepancy between the Hive table schema and the actual file
> schema, so we need a better way to solve this problem.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)