[
https://issues.apache.org/jira/browse/BEAM-7336?focusedWorklogId=393590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393590
]
ASF GitHub Bot logged work on BEAM-7336:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Feb/20 16:03
Start Date: 26/Feb/20 16:03
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #10978:
[WIP][BEAM-7336] Add Beam schema inferring for KafkaIO
URL: https://github.com/apache/beam/pull/10978#discussion_r384592505
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -913,16 +985,55 @@ public void setValueDeserializer(String valueDeserializer) {
return input.getPipeline().apply(transform);
}
- private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
- return (getKeyCoder() != null)
- ? getKeyCoder()
- : getKeyDeserializerProvider().getCoder(coderRegistry);
+ private Coder<K> getKeyCoder(CoderRegistry coderRegistry, SchemaRegistry schemaRegistry) {
+ final Coder<K> keyCoder =
+ (getKeyCoder() != null)
+ ? getKeyCoder()
+ : getKeyDeserializerProvider().getCoder(coderRegistry);
+ if (isKeyInferBeamSchema()) {
+ return inferSchemaCoder(keyCoder, schemaRegistry);
+ }
+ return keyCoder;
+ }
+
+ private Coder<V> getValueCoder(CoderRegistry coderRegistry, SchemaRegistry schemaRegistry) {
+ final Coder<V> valueCoder =
+ (getValueCoder() != null)
+ ? getValueCoder()
+ : getValueDeserializerProvider().getCoder(coderRegistry);
+ if (isValueInferBeamSchema()) {
+ return inferSchemaCoder(valueCoder, schemaRegistry);
+ }
+ return valueCoder;
+ }
+
+ @VisibleForTesting
+ static <T> SchemaCoder<T> inferSchemaCoder(Coder<T> coder, SchemaRegistry schemaRegistry) {
+ // At the moment there is not a general way to infer SchemaCoders from non-schema Coders (that
Review comment:
@reuvenlax I tried to get the Avro resolution working by registering
`AvroRecordSchema` as a new `SchemaRegistryProvider`, but this produced a lot
of issues.
I have the impression we need something like what I mentioned: a way to
register which basic Coders can also be supported as SchemaCoders, e.g. Avro,
Protobuf and others that contain a `Schema` and the `from/toRowFunction`s.
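
The registration idea in the comment above can be pictured as a lookup from a coder class to a function that derives its schema-aware form. The following is only a minimal sketch under stated assumptions: `PlainCoder`, `SchemaAwareCoder`, `Registry`, `User` and `UserCoder` are hypothetical stand-ins written for illustration, not Beam's `Coder`, `SchemaCoder` or `SchemaRegistry`, and the schema is reduced to a plain string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch only: every type here is a stand-in, not a Beam class.
public class SchemaCoderLookupSketch {

  /** Stand-in for a non-schema coder (for example an Avro or Protobuf coder). */
  interface PlainCoder<T> {}

  /** Stand-in for a schema-aware coder: a schema plus the to/from row functions. */
  static final class SchemaAwareCoder<T> {
    final String schema;
    final Function<T, Map<String, Object>> toRow;
    final Function<Map<String, Object>, T> fromRow;

    SchemaAwareCoder(
        String schema,
        Function<T, Map<String, Object>> toRow,
        Function<Map<String, Object>, T> fromRow) {
      this.schema = schema;
      this.toRow = toRow;
      this.fromRow = fromRow;
    }
  }

  /** Hypothetical table mapping a coder class to a converter that yields its schema-aware form. */
  static final class Registry {
    private final Map<Class<?>, Function<PlainCoder<?>, SchemaAwareCoder<?>>> converters =
        new HashMap<>();

    <T, C extends PlainCoder<T>> void register(
        Class<C> coderClass, Function<C, SchemaAwareCoder<T>> converter) {
      converters.put(coderClass, c -> converter.apply(coderClass.cast(c)));
    }

    /** Returns the schema-aware coder if the coder's exact class was registered, else empty. */
    @SuppressWarnings("unchecked")
    <T> Optional<SchemaAwareCoder<T>> infer(PlainCoder<T> coder) {
      Function<PlainCoder<?>, SchemaAwareCoder<?>> converter = converters.get(coder.getClass());
      if (converter == null) {
        return Optional.empty();
      }
      return Optional.of((SchemaAwareCoder<T>) converter.apply(coder));
    }
  }

  /** Example element type and its (hypothetical) plain coder. */
  static final class User {
    final String name;

    User(String name) {
      this.name = name;
    }
  }

  static final class UserCoder implements PlainCoder<User> {}

  /** Builds the schema-aware form for UserCoder; the coder argument is unused in this sketch. */
  static SchemaAwareCoder<User> userSchemaCoder(UserCoder coder) {
    return new SchemaAwareCoder<User>(
        "name: STRING",
        user -> {
          Map<String, Object> row = new HashMap<>();
          row.put("name", user.name);
          return row;
        },
        row -> new User((String) row.get("name")));
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    registry.register(UserCoder.class, SchemaCoderLookupSketch::userSchemaCoder);
    Optional<SchemaAwareCoder<User>> inferred = registry.infer(new UserCoder());
    System.out.println(inferred.map(c -> c.schema).orElse("no schema available"));
  }
}
```

In this sketch an unregistered coder simply yields an empty `Optional`, which would let a caller fall back to the plain coder; whether the real change should fall back or fail is not stated in the comment.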
Issue Time Tracking
-------------------
Worklog Id: (was: 393590)
Time Spent: 1h 50m (was: 1h 40m)
> KafkaIO should support inferring schemas when reading Avro
> ----------------------------------------------------------
>
> Key: BEAM-7336
> URL: https://issues.apache.org/jira/browse/BEAM-7336
> Project: Beam
> Issue Type: Sub-task
> Components: io-java-kafka
> Reporter: Reuven Lax
> Assignee: Alexey Romanenko
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> PubSubIO already supports this.
> It would also be nice to be able to look up Avro schemas in the Kafka schema
> registry.