[ https://issues.apache.org/jira/browse/BEAM-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908696#comment-16908696 ]

Vishwas commented on BEAM-7073:
-------------------------------

[~ryanskraba] This issue has already been fixed as part of the PR below:

[https://github.com/apache/beam/pull/8376]

This Jira can be closed.
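For reference, the root cause is that the Avro decoder has already applied the timestamp-millis logical-type conversion, so the field value arrives as a Joda-Time DateTime rather than a raw Long, and the unconditional `(Long) value` cast fails. A minimal sketch of the kind of defensive check needed (an illustration only, not the actual code from the linked PR; `java.util.Date` stands in for `org.joda.time.DateTime` to keep the example self-contained):

```java
import java.util.Date;

public class TimestampConversionSketch {

    // Convert a timestamp-millis field value to epoch millis, accepting
    // either a raw Long (no logical-type conversion applied) or an
    // already-converted date-time object (Joda DateTime in the real Beam
    // code; java.util.Date here as a stand-in).
    static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        } else if (value instanceof Date) {
            return ((Date) value).getTime();
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp-millis value type: " + value.getClass());
    }

    public static void main(String[] args) {
        // Both forms of the value yield the same epoch millis.
        System.out.println(toEpochMillis(1555000000000L));
        System.out.println(toEpochMillis(new Date(1555000000000L)));
    }
}
```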

> AvroUtils converting generic record to Beam Row causes class cast exception
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-7073
>                 URL: https://issues.apache.org/jira/browse/BEAM-7073
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.11.0
>         Environment: Direct Runner
>            Reporter: Vishwas
>            Assignee: Ryan Skraba
>            Priority: Major
>
> Below is my pipeline:
> KafkaSource (KafkaIO.read) ---> ParDo ---> BeamSql ---> KafkaSink (KafkaIO.write)
> The Kafka source reads Avro records from a Kafka topic and deserializes them
> to GenericRecord using the code below:
> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>     KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServerUrl)
>         .withTopic(topicName)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>             AvroCoder.of(GenericRecord.class, avroSchema))
>         .updateConsumerProperties(
>             ImmutableMap.of("schema.registry.url", schemaRegistryUrl));
> The Avro schema of the topic has a field with a logical type
> (timestamp-millis), which is deserialized to a Joda-Time DateTime:
>     {
>         "name": "timeOfRelease",
>         "type": [
>             "null",
>             {
>                 "type": "long",
>                 "logicalType": "timestamp-millis",
>                 "connect.version": 1,
>                 "connect.name": "org.apache.kafka.connect.data.Timestamp"
>             }
>         ],
>         "default": null
>     }
> Now, in my ParDo transform, I try to use the AvroUtils class methods to
> convert the generic record to a Beam Row and get the class cast exception
> below:
>     AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>  
> This looks like a bug: the Joda-Time value created during deserialization is
> cast to Long in the code below.
>     } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>         return convertDateTimeStrict((Long) value, fieldType);
>     }
> PS: I also used the avro-tools 1.8.2 jar to generate classes for the above
> Avro schema, and there too the attribute with the timestamp-millis logical
> type is converted to a Joda-Time type.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
