Hi All,
I am reading from Kafka with a JSON deserialization schema and I get the
error message shown below.


I am using Flink 1.19 and tried the same on 1.18.

ObjectNode is imported as:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

KafkaSource<ObjectNode> kafkaSource = KafkaSource.<ObjectNode>builder()
        .setTopics("test-ingest")
        .setBootstrapServers("localhost:9092")
        .setGroupId("g11")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(ObjectNode.class))
        //.setValueOnlyDeserializer(new SimpleStringSchema())
        .build();


I noticed that when I run the example with SimpleStringSchema the error
disappears.

Also, if I read it with the string schema and later use ObjectMapper to
create a JSON node in a map function, I get the same error as below.
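
For reference, a minimal sketch of what this second variant could look like. It is an illustration rather than the exact job: the class names StringThenParseJob and ParseJson and the operator and job names are hypothetical, and it assumes every record is a top-level JSON object.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job: read raw strings from Kafka, then parse them in a map function.
public class StringThenParseJob {

    // Hypothetical helper: parses each record into an ObjectNode.
    public static class ParseJson implements MapFunction<String, ObjectNode> {
        // Created lazily on the task side, so it is not shipped with the function.
        private transient ObjectMapper mapper;

        @Override
        public ObjectNode map(String value) throws Exception {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            // Assumes the payload is a JSON object; any other JSON type fails the cast.
            return (ObjectNode) mapper.readTree(value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same source settings as above, but with the string deserializer.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setTopics("test-ingest")
                .setBootstrapServers("localhost:9092")
                .setGroupId("g11")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> raw =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        // The ObjectNode records leave this operator and are handed downstream.
        DataStream<ObjectNode> parsed = raw.map(new ParseJson());

        parsed.print();
        env.execute("string-then-parse");
    }
}
```

In this sketch the ObjectNode is only created inside the map function, yet it is still the record type passed on to the next operator.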

Please advise: what am I doing wrong?


Error: 2024-08-08 15:29:11
java.lang.IllegalAccessError: class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactoryConstructorAccess tried to access protected method 'void org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory.<init>()' (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactoryConstructorAccess is in unnamed module of loader com.esotericsoftware.reflectasm.AccessClassLoader @2a656533; org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory is in unnamed module of loader 'app')
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactoryConstructorAccess.newInstance(Unknown Source)
    at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy$1.newInstance(Kryo.java:1193)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField.copy(UnsafeCacheFields.java:297)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:298)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:1570)


Regards,
Taher Koitawala
