Title: Message Title
| |
|
|
| Change By: |
Pedro Mázala |
| Environment: |
*Build.gradle*
{code:groovy} ext { flinkVersion = "1.15.0" }
dependencies { implementation "org.apache.flink:flink-core:${flinkVersion}" implementation "org.apache.flink:flink-clients:${flinkVersion}" implementation "org.apache.flink:flink-avro:${flinkVersion}"
implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"
implementation "org.apache.flink:flink-connector-elasticsearch7:${flinkVersion}" } {code}
*Deserializer*
{code:java} class ApicurioKafkaDeserializationSchema implements KafkaDeserializationSchema<MyEvent> { private Deserializer deserializer;
@Override public void open(DeserializationSchema.InitializationContext context) throws Exception { KafkaDeserializationSchema.super.open(context);
deserializer = new MyDeserializer(); }
@Override public boolean isEndOfStream(MyEvent nextElement) { return false; }
@Override public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) { String key = new String(record.key()); String value = deserializer.deserialize(record.value());
return MyEvent.of(key, value); }
@Override public TypeInformation<MyEvent> getProducedType() { return Types.GENERIC(MyEvent.class); } } {code}
*Stacktrace*
{code:java} java.io.IOException: Failed to deserialize consumer record due to at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = customer.cdc.snapshot.aggregate, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1656327039514, serialized key size = 39, serialized value size = 268, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@758cd630, value = [B@1cfd2da5). at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ... 14 common frames omitted Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:115) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:81) at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ... 15 common frames omitted Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @1e92bd61 at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69) at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:512) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:347) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ... 18 common frames omitted Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @1e92bd61 at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:67) ... 35 common frames omitted {code} |
|
It is not possible to deserialize Kafka messages using `KafkaDeserializationSchema`. An exception is thrown (`java.lang.reflect.InaccessibleObjectException`) when forwarding (`out.collect(deserialized)`) the deserialized object.
It happens on `org.apache.flink.runtime.io.network.api.writer.RecordWriter#serializeRecord` at `record.write(serializer)`. It seems it is not being possible to perform the write of `DataOutputSerializer` because the exception happens on this object and not on my domain object for deserialization.
--------
*Build.gradle*
{code:groovy} ext { flinkVersion = "1.15.0" }
dependencies { implementation "org.apache.flink:flink-core:${flinkVersion}" implementation "org.apache.flink:flink-clients:${flinkVersion}" implementation "org.apache.flink:flink-avro:${flinkVersion}"
implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"
implementation "org.apache.flink:flink-connector-elasticsearch7:${flinkVersion}" } {code}
*Deserializer*
{code:java} class ApicurioKafkaDeserializationSchema implements KafkaDeserializationSchema<MyEvent> { private Deserializer deserializer;
@Override public void open(DeserializationSchema.InitializationContext context) throws Exception { KafkaDeserializationSchema.super.open(context);
deserializer = new MyDeserializer(); }
@Override public boolean isEndOfStream(MyEvent nextElement) { return false; }
@Override public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) { String key = new String(record.key()); String value = deserializer.deserialize(record.value());
return MyEvent.of(key, value); }
@Override public TypeInformation<MyEvent> getProducedType() { return Types.GENERIC(MyEvent.class); } } {code}
*Stacktrace*
{code:java} java.io.IOException: Failed to deserialize consumer record due to at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = customer.cdc.snapshot.aggregate, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1656327039514, serialized key size = 39, serialized value size = 268, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@758cd630, value = [B@1cfd2da5). at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ... 14 common frames omitted Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:115) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:81) at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ... 15 common frames omitted Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @1e92bd61 at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69) at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:512) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:347) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:132) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ... 18 common frames omitted Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @1e92bd61 at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354) at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297) at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178) at java.base/java.lang.reflect.Field.setAccessible(Field.java:172) at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:67) ... 35 common frames omitted {code} |
|
|
|
| |
|
|
- [jira] [Updated] (FLINK-28295) Error deserializing kafka records Jira
-