[ https://issues.apache.org/jira/browse/BEAM-7802?focusedWorklogId=288830&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288830 ]
ASF GitHub Bot logged work on BEAM-7802:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Aug/19 09:49
Start Date: 05/Aug/19 09:49
Worklog Time Spent: 10m
Work Description: kanterov commented on pull request #9130: [BEAM-7802] Expose a method to make an Avro-based PCollection into an Schema-based one
URL: https://github.com/apache/beam/pull/9130#discussion_r310517412
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -309,6 +310,18 @@ public static GenericRecord toGenericRecord(
return g -> toGenericRecord(g, avroSchema);
}
+ /** Transform an existing Avro-based PCollection input into an Schema-based PCollection. */
+ public static <T> PCollection<T> asSchemaPCollection(
+ PCollection<T> pc, Class<T> clazz, @Nullable org.apache.avro.Schema schema) {
+ if (!pc.hasSchema()) {
+ Schema beamSchema = getSchema(clazz, schema);
+ if (beamSchema != null) {
+ pc.setSchema(beamSchema, getToRowFunction(clazz, schema), getFromRowFunction(clazz));
Review comment:
`pc.setSchema` is equivalent to `pc.setCoder(SchemaCoder.of(...))`.
What if we instead add a static method that creates the `SchemaCoder`?
The user-facing API would then look like:
```
KafkaIO.read(MyRecord.class)
.setCoder(AvroUtils.schemaCoder(MyRecord.class))
```
Alternatively, it can be done implicitly by registering the class in advance:
```
p.getSchemaRegistry().registerSchemaProvider(MyRecord.class, new AvroRecordSchema());
```
Or, for every `SpecificRecord` (I haven't tried this, but it should work as well):
```
p.getSchemaRegistry().registerSchemaProvider(SpecificRecord.class, new AvroRecordSchema());
```
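To make the equivalence between `setSchema` and `setCoder` concrete, here is a minimal, dependency-free sketch of the idea. It deliberately does not use Beam: `ToyCoder`, `ToyCollection`, the fields of `MyRecord`, and the no-argument `schemaCoder()` factory are all hypothetical stand-ins for illustration, and the real `SchemaCoder` and `PCollection` APIs differ in their types and signatures.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SchemaCoderSketch {

  /** Hypothetical record type standing in for an Avro-generated class. */
  static final class MyRecord {
    final long id;
    final String name;

    MyRecord(long id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  /** Toy stand-in for a schema-aware coder: field names plus row conversions. */
  static final class ToyCoder<T> {
    final List<String> schema;
    final Function<T, Map<String, Object>> toRow;
    final Function<Map<String, Object>, T> fromRow;

    ToyCoder(
        List<String> schema,
        Function<T, Map<String, Object>> toRow,
        Function<Map<String, Object>, T> fromRow) {
      this.schema = schema;
      this.toRow = toRow;
      this.fromRow = fromRow;
    }
  }

  /** Toy stand-in for a collection that may carry a coder. */
  static final class ToyCollection<T> {
    private ToyCoder<T> coder;

    ToyCollection<T> setCoder(ToyCoder<T> coder) {
      this.coder = coder;
      return this;
    }

    // Models the equivalence from the comment: setting a schema amounts to
    // building a schema-aware coder from the same three parts and setting it.
    ToyCollection<T> setSchema(
        List<String> schema,
        Function<T, Map<String, Object>> toRow,
        Function<Map<String, Object>, T> fromRow) {
      return setCoder(new ToyCoder<>(schema, toRow, fromRow));
    }

    // In this toy model the only coder is schema-aware, so a coder implies a schema.
    boolean hasSchema() {
      return coder != null;
    }

    ToyCoder<T> getCoder() {
      return coder;
    }
  }

  /** Hypothetical factory, analogous in spirit to the proposed static helper. */
  static ToyCoder<MyRecord> schemaCoder() {
    Function<MyRecord, Map<String, Object>> toRow =
        r -> {
          Map<String, Object> row = new LinkedHashMap<>();
          row.put("id", r.id);
          row.put("name", r.name);
          return row;
        };
    Function<Map<String, Object>, MyRecord> fromRow =
        row -> new MyRecord((Long) row.get("id"), (String) row.get("name"));
    return new ToyCoder<>(Arrays.asList("id", "name"), toRow, fromRow);
  }

  public static void main(String[] args) {
    // The caller only sets a coder; schema and conversions come bundled with it.
    ToyCollection<MyRecord> pc = new ToyCollection<MyRecord>().setCoder(schemaCoder());
    Map<String, Object> row = pc.getCoder().toRow.apply(new MyRecord(1L, "a"));
    MyRecord back = pc.getCoder().fromRow.apply(row);
    System.out.println(pc.hasSchema() + " " + row + " " + back.name);
  }
}
```

The point of the sketch is only the shape of the API: a single factory bundles the schema and both conversion functions, so the caller needs one `setCoder` call instead of a dedicated wrapper method around the collection.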
Issue Time Tracking
-------------------
Worklog Id: (was: 288830)
Time Spent: 1h 20m (was: 1h 10m)
> Expose a method to make an Avro-based PCollection into an Schema-based one
> --------------------------------------------------------------------------
>
> Key: BEAM-7802
> URL: https://issues.apache.org/jira/browse/BEAM-7802
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> AvroIO can infer the schema of an Avro-based PCollection via the
> `withBeamSchemas` method. However, if the user creates a PCollection of Avro
> objects or IndexedRecord/GenericRecord, they have to set the schema (or
> coder) manually. The idea is to expose a method in `schemas.utils.AvroUtils`
> to make this easier.