[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lorenzo Nicora updated FLINK-17486:
-----------------------------------
    Description: 
When consuming AVRO SpecificRecords that contain a {{decimal}} (logical type) field from a Kafka source, copying a record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}}

As far as I understand, the problem arises when Flink makes a deep copy of the record for checkpointing.
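To illustrate the type mismatch behind this exception, here is a hypothetical, JDK-only model; it is not Flink's or Avro's actual code, and {{NaiveCopy}}, {{SchemaType}} and {{copy}} are made-up names. It assumes a copy routine that dispatches only on the field's underlying Avro type: a {{decimal}} field has {{bytes}} as its underlying schema type, which Avro's Java API represents as {{java.nio.ByteBuffer}}, while the generated SpecificRecord holds a {{java.math.BigDecimal}}, so the cast fails.

{code:java}
import java.math.BigDecimal;
import java.nio.ByteBuffer;

// Hypothetical model of a schema-driven deep copy that ignores logical types.
final class NaiveCopy {

    // Simplified, hypothetical subset of underlying Avro types.
    enum SchemaType { STRING, BYTES }

    // Copies a value by looking only at the underlying schema type.
    // A decimal(9,2) field has underlying type BYTES, so the value is cast to ByteBuffer.
    static Object copy(SchemaType type, Object value) {
        switch (type) {
            case STRING:
                return value.toString();
            case BYTES:
                ByteBuffer src = (ByteBuffer) value; // ClassCastException for a BigDecimal
                ByteBuffer dst = ByteBuffer.allocate(src.remaining());
                dst.put(src.duplicate()); // copy without moving the source position
                dst.flip();
                return dst;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        // The generated record stores the decimal field as a BigDecimal...
        Object price = BigDecimal.valueOf(42.32);
        try {
            // ...but the copy routine only sees the underlying BYTES type.
            copy(SchemaType.BYTES, price);
        } catch (ClassCastException e) {
            System.out.println("Copy failed: " + e.getMessage());
        }
    }
}
{code}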

This code reproduces the problem 
([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):

 
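{code:java}
import java.math.BigDecimal;
import org.apache.flink.formats.avro.typeutils.AvroSerializer;
import example.avro.Sample; // generated from the IDL below

AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
// Fails with java.lang.ClassCastException (BigDecimal cannot be cast to ByteBuffer)
Sample s2 = serializer.copy(s1);
{code}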
 

 

The AVRO SpecificRecord is generated from this IDL (using the avro-maven-plugin):
{code:java}
@namespace("example.avro")
protocol SampleProtocol {
  record Sample {
    string id;
    decimal(9,2) price;
    timestamp_ms eventTime;
  }
}
{code}
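For context: per the Avro specification, a {{decimal}} logical type annotates a {{bytes}} (or {{fixed}}) schema, and the bytes hold the two's-complement, big-endian representation of the unscaled value; the scale (2 for {{decimal(9,2)}}) lives in the schema, not in the data. The JDK-only sketch below shows that mapping for the {{price}} field. {{DecimalBytes}}, {{toBytes}} and {{fromBytes}} are hypothetical names, and the code only approximates what Avro's own {{Conversions.DecimalConversion}} does.

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper mirroring how a decimal(precision, scale) value maps to Avro bytes.
final class DecimalBytes {

    // Encodes the unscaled value as two's-complement, big-endian bytes.
    // The scale is not written; it comes from the schema (decimal(9,2) -> scale 2).
    static ByteBuffer toBytes(BigDecimal value, int precision, int scale) {
        if (value.scale() != scale) {
            throw new IllegalArgumentException(
                "Expected scale " + scale + " but got " + value.scale());
        }
        if (value.precision() > precision) {
            throw new IllegalArgumentException(
                "Value " + value + " exceeds precision " + precision);
        }
        return ByteBuffer.wrap(value.unscaledValue().toByteArray());
    }

    // Decodes the bytes back to a BigDecimal using the scale from the schema.
    static BigDecimal fromBytes(ByteBuffer bytes, int scale) {
        byte[] raw = new byte[bytes.remaining()];
        bytes.duplicate().get(raw); // read without moving the caller's position
        return new BigDecimal(new BigInteger(raw), scale);
    }

    public static void main(String[] args) {
        BigDecimal price = BigDecimal.valueOf(42.32); // unscaled 4232, scale 2
        ByteBuffer encoded = toBytes(price, 9, 2);     // 4232 = 0x1088 -> 2 bytes
        BigDecimal decoded = fromBytes(encoded, 2);
        System.out.println(encoded.remaining() + " bytes, decoded " + decoded);
        // prints: 2 bytes, decoded 42.32
    }
}
{code}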
In particular, I had the problem after attaching an AssignerWithPeriodicWatermarks to a Kafka source that consumes AVRO SpecificRecords using the Confluent Schema Registry. The assigner extracts the event time from the record, and bookmarking is enabled (not sure whether this is related).
A simplified version of the application is here: [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]

 

The problem looks similar to AVRO-1895, but that issue was fixed in AVRO 1.8.2.

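In fact, the following code, which performs the deep copy relying only on AVRO, does work:
{code:java}
import java.math.BigDecimal;
import example.avro.Sample;

Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
// Copies s1 through the generated builder instead of Flink's AvroSerializer
Sample s2 = Sample.newBuilder(s1).build();
{code}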
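A plausible reading of why this works, sketched under assumptions: a copy that first consults a conversion registered for the field's logical type never casts the {{BigDecimal}} to {{ByteBuffer}}. The JDK-only model below is hypothetical ({{ConversionAwareCopy}}, {{FieldSchema}} and {{register}} are made-up names, not Avro or Flink API); when no conversion is registered it falls back to the underlying-type copy and hits the same {{ClassCastException}}.

{code:java}
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of a deep copy that honours logical-type conversions.
final class ConversionAwareCopy {

    // Simplified field schema: underlying type plus an optional logical type name.
    static final class FieldSchema {
        final String type;        // e.g. "bytes"
        final String logicalType; // e.g. "decimal", or null
        FieldSchema(String type, String logicalType) {
            this.type = type;
            this.logicalType = logicalType;
        }
    }

    // Copy functions registered per logical type (hypothetical registry).
    private final Map<String, UnaryOperator<Object>> conversions = new HashMap<>();

    void register(String logicalType, UnaryOperator<Object> copier) {
        conversions.put(logicalType, copier);
    }

    Object copy(FieldSchema schema, Object value) {
        // Prefer a conversion registered for the logical type, if any.
        if (schema.logicalType != null && conversions.containsKey(schema.logicalType)) {
            return conversions.get(schema.logicalType).apply(value);
        }
        // Otherwise fall back to the underlying type, casting like a naive copy.
        if ("bytes".equals(schema.type)) {
            ByteBuffer src = (ByteBuffer) value;
            ByteBuffer dst = ByteBuffer.allocate(src.remaining());
            dst.put(src.duplicate());
            dst.flip();
            return dst;
        }
        throw new IllegalArgumentException("Unsupported type: " + schema.type);
    }

    public static void main(String[] args) {
        FieldSchema price = new FieldSchema("bytes", "decimal");
        ConversionAwareCopy copier = new ConversionAwareCopy();
        // BigDecimal is immutable, so reusing the same instance is a valid deep copy.
        copier.register("decimal", v -> (BigDecimal) v);
        System.out.println(copier.copy(price, BigDecimal.valueOf(42.32)));
    }
}
{code}

Reusing the {{BigDecimal}} instance is a deliberate simplification: since {{BigDecimal}} is immutable, sharing it between the original and the copy is safe.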
 

Code of the two tests and the simplified application: [https://github.com/nicusX/flink-avro-bug]

 

  was:
When consuming AVRO SpecificRecords that contain a {{decimal}} (logical type) field from a Kafka source, copying a record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}}

As far as I understand, the problem arises when Flink makes a deep copy of the record for checkpointing.

This code reproduces the problem:

{code:java}
AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
Sample s2 = serializer.copy(s1);
{code}

 

The AVRO SpecificRecord is generated using the avro-maven-plugin from this IDL:

{code:java}
@namespace("example.avro")
protocol SampleProtocol {
  record Sample {
    string id;
    decimal(9,2) price;
    timestamp_ms eventTime;
  }
}
{code}

In particular, I had the problem after attaching an AssignerWithPeriodicWatermarks to a Kafka source that consumes AVRO SpecificRecords using the Confluent Schema Registry. The assigner extracts the event time from the record, and bookmarking is enabled (not sure whether this is related).
A simplified version of the application is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java].

 

The problem looks similar to AVRO-1895, but that issue was fixed in AVRO 1.8.2.

In fact, the following code, which performs the deep copy relying only on AVRO, does work:
{code:java}
Sample s1 = Sample.newBuilder()
   .setPrice(BigDecimal.valueOf(42.32))
   .setId("A12345")
   .build();
Sample s2 = Sample.newBuilder(s1).build();
{code}

 

The code of the two tests and the simplified application is [here|https://github.com/nicusX/flink-avro-bug].

 


> ClassCastException when copying AVRO SpecificRecord containing a decimal field
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-17486
>                 URL: https://issues.apache.org/jira/browse/FLINK-17486
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.10.0
>         Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
>            Reporter: Lorenzo Nicora
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
