[ 
https://issues.apache.org/jira/browse/FLINK-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526709#comment-16526709
 ] 

Piotr Nowojski commented on FLINK-9690:
---------------------------------------

Yep, it looks so. I once tried to upgrade our connector to Kafka 1.0.0, and 
it required (very minor) changes in FlinkKafkaProducer. I never committed 
them because there were also some failures in our consumer tests that I didn't 
have time to fix. From the stack trace it looks like there were further 
changes in 1.1.0; I don't know how big they are or how easy they would be to fix.

However, what is the actual problem here? We have never claimed that our 
connector supports Kafka producers > 0.11.2.0.
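
For context, the failure mode visible in the stack trace can be reproduced in isolation. The following is only a minimal sketch and not the connector's actual code: {{OldClient}} and {{NewClient}} are hypothetical stand-ins for two versions of a client-internal class, and {{getValue}} merely mirrors the pattern the trace suggests (a reflective field lookup whose {{NoSuchFieldException}} is rethrown as "Incompatible KafkaProducer version").

```java
import java.lang.reflect.Field;

public class ReflectiveAccessSketch {
    // Hypothetical stand-in for an older client class that still has the field.
    static class OldClient {
        private final int sequenceNumbers = 42;
    }

    // Hypothetical stand-in for a newer client class where the field was renamed.
    static class NewClient {
        private final int renamedField = 42;
    }

    // Reads a private field by name; any layout change in the target class
    // surfaces only at runtime, as a wrapped NoSuchFieldException.
    static Object getValue(Object object, String fieldName) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    public static void main(String[] args) {
        // Works against the layout the reflective code was written for.
        System.out.println(getValue(new OldClient(), "sequenceNumbers"));

        // Fails against the changed layout, even though it compiles fine.
        try {
            getValue(new NewClient(), "sequenceNumbers");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " <- " + e.getCause());
        }
    }
}
```

The point of the sketch is that reflective access to private fields is tied to one specific class layout, so it can break on any client upgrade without a compile-time error.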

> Restoring state with FlinkKafkaProducer and Kafka 1.1.0 client fails
> --------------------------------------------------------------------
>
>                 Key: FLINK-9690
>                 URL: https://issues.apache.org/jira/browse/FLINK-9690
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.2
>            Reporter: Ufuk Celebi
>            Priority: Major
>
> Restoring a job from a savepoint that includes {{FlinkKafkaProducer}} 
> packaged with {{kafka.version}} set to {{1.1.0}} in Flink 1.4.2 fails with 
> the following exception:
> {code}
> java.lang.RuntimeException: Incompatible KafkaProducer version
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:301)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:292)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.resumeTransaction(FlinkKafkaProducer.java:195)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:723)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:370)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:330)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchFieldException: sequenceNumbers
>     at java.lang.Class.getDeclaredField(Class.java:2070)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.getValue(FlinkKafkaProducer.java:297)
>     ... 16 more
> {code}
> [~pnowojski] Any ideas about this issue? Judging from the stack trace it was 
> anticipated that reflective access might break with Kafka versions > 0.11.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
