[
https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lorenzo Nicora updated FLINK-17486:
-----------------------------------
Description:
When consuming from a Kafka source an AVRO SpecificRecord containing a {{decimal}}
(logical type) field, copying the record fails with:
{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to
class java.nio.ByteBuffer}}
I understand the problem arises when Flink tries to make a deep-copy of the
record for checkpointing.
This code reproduces the problem
([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):
{code:java}
AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
Sample s1 = Sample.newBuilder()
    .setPrice(BigDecimal.valueOf(42.32))
    .setId("A12345")
    .build();
Sample s2 = serializer.copy(s1);
{code}
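For context, Avro stores a {{decimal}} logical type as bytes: the two's-complement representation of the unscaled value, at the scale declared in the schema. When no DecimalConversion is registered on the serializer's data model, Avro expects a ByteBuffer in the field, so casting the BigDecimal held by the generated record fails. A JDK-only sketch of that byte mapping (the class and helper names here are illustrative, not Avro API):

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class DecimalBytes {
    // Avro's decimal(precision, scale) stores the two's-complement bytes of
    // the unscaled value; the scale comes from the schema, not from the data.
    static ByteBuffer toBytes(BigDecimal value, int scale) {
        return ByteBuffer.wrap(value.setScale(scale).unscaledValue().toByteArray());
    }

    static BigDecimal fromBytes(ByteBuffer buf, int scale) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        BigDecimal price = BigDecimal.valueOf(42.32);
        // Round-trip through the byte form a registered DecimalConversion would use.
        System.out.println(fromBytes(toBytes(price, 2), 2)); // prints 42.32

        // Without the conversion, the code path effectively attempts this cast:
        Object field = price;
        try {
            ByteBuffer buf = (ByteBuffer) field;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}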
The AVRO SpecificRecord is generated from this IDL (using the
avro-maven-plugin):
{code:java}
@namespace("example.avro")
protocol SampleProtocol {
  record Sample {
    string id;
    decimal(9,2) price;
    timestamp_ms eventTime;
  }
}{code}
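Note that the plugin generates the {{price}} field as {{java.math.BigDecimal}} (rather than {{ByteBuffer}}) only when its enableDecimalLogicalType option is on; a plausible pom.xml sketch (version and goal assumed to match the setup in the linked repo):

{code:xml}
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <goals>
        <goal>idl-protocol</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <enableDecimalLogicalType>true</enableDecimalLogicalType>
  </configuration>
</plugin>
{code}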
In particular, I hit the problem after attaching an
AssignerWithPeriodicWatermarks to a Kafka source consuming AVRO SpecificRecord
and using the Confluent Schema Registry. The assigner extracts the event time
from the record, and checkpointing is enabled (not sure whether this is
related).
A simplified version of the application is here:
[https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
The problem looks similar to AVRO-1895, but that issue has been fixed since
AVRO 1.8.2.
In fact, the following code performs a deep copy relying only on AVRO, and it
does work (presumably because the generated class registers the required
logical-type conversions on its own data model):
{code:java}
Sample s1 = Sample.newBuilder()
    .setPrice(BigDecimal.valueOf(42.32))
    .setId("A12345")
    .build();
Sample s2 = Sample.newBuilder(s1).build();{code}
Code of the two tests and simplified application:
[https://github.com/nicusX/flink-avro-bug]
> ClassCastException when copying AVRO SpecificRecord containing a decimal field
> ------------------------------------------------------------------------------
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
> Reporter: Lorenzo Nicora
> Priority: Critical
>
> When consuming from a Kafka source an AVRO SpecificRecord containing a
> {{decimal}} (logical type) field, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to
> class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the
> record for checkpointing.
> This code reproduces the problem
> ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):
>
> {code:java}
> AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
> Sample s1 = Sample.newBuilder()
>     .setPrice(BigDecimal.valueOf(42.32))
>     .setId("A12345")
>     .build();
> Sample s2 = serializer.copy(s1);
> {code}
>
>
> The AVRO SpecificRecord is generated from this IDL (using the
> avro-maven-plugin):
> {code:java}
> @namespace("example.avro")
> protocol SampleProtocol {
>   record Sample {
>     string id;
>     decimal(9,2) price;
>     timestamp_ms eventTime;
>   }
> }{code}
> In particular, I hit the problem after attaching an
> AssignerWithPeriodicWatermarks to a Kafka source consuming AVRO SpecificRecord
> and using the Confluent Schema Registry. The assigner extracts the event time
> from the record, and checkpointing is enabled (not sure whether this is
> related).
> A simplified version of the application is here:
> [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
>
> The problem looks similar to AVRO-1895, but that issue has been fixed since
> AVRO 1.8.2.
> In fact, the following code performs a deep copy relying only on AVRO, and it
> does work:
> {code:java}
> Sample s1 = Sample.newBuilder()
>     .setPrice(BigDecimal.valueOf(42.32))
>     .setId("A12345")
>     .build();
> Sample s2 = Sample.newBuilder(s1).build();{code}
>
> Code of the two tests and simplified application:
> [https://github.com/nicusX/flink-avro-bug]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)