pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-462391160

Thanks @EAlexRojas for the investigation. I am/was also in the process of discovering and fixing the same problems that you mentioned, and of writing tests for them. So far I have rewritten and fixed the tests provided by @tvielgouarin and encountered the same errors.

TL;DR: I'm thinking about how to solve this, but I'm afraid it won't make it into 1.8...

Full story:

Regarding the 0.11 vs 2.0 dependency, having both of them on the same class path won't work because of dependency convergence, but it doesn't have to: committing/aborting 0.11 transactions with the 2.0 connector should work fine, so that shouldn't be an issue.

The real problem is that `FlinkKafkaProducer` and `FlinkKafkaProducer011` have different names and define the static classes `NextTransactionalIdHint`, `KafkaTransactionState` and `KafkaTransactionContext` inside those parent classes. This causes incompatibility problems, because for example `FlinkKafkaProducer011.KafkaTransactionState` and `FlinkKafkaProducer.KafkaTransactionState` are treated as completely incompatible classes, despite being identical (see the first sketch at the end of this comment). It could probably be solved by:

1. Custom serialization logic, like keeping fake/dummy `FlinkKafkaProducer011.XXXSerializer.XXXSerializerSnapshot` classes in the universal connector as entry points for the deserialization.
2. Adding a "force skip class compatibility check" flag to the current serialization stack. After all, the serialized binary data are exactly the same in all of those cases (see the second sketch below). This is work in progress by @tzulitai and might happen in time for the 1.8 release.
3. Adding a more powerful state migration mechanism that would be able to change the type of a field/class. This is also on our roadmap, but won't happen in 1.8.

Either way, unfortunately I'm away for the next two weeks and cannot solve this issue before the 1.8 feature freeze, so the fix will have to wait for the 1.9 release.

I have implemented working regression tests for state compatibility between Flink versions: https://github.com/apache/flink/pull/7677

A test for migration from the 0.11 to the universal connector is also easy to implement: https://github.com/pnowojski/flink/tree/kafka-migration-0.11-to-universal-not-working, but I didn't have time to make it work (as described above).
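To illustrate the core problem, here is a minimal, self-contained Java sketch. The nested classes are simplified stand-ins for the real connector classes and the field layout is only an illustrative assumption; the point is that two static nested classes with identical bodies are still distinct types to the JVM, so any restore-time compatibility check that compares classes will reject the state:

```java
// Simplified stand-ins for the two connectors: identical static nested
// classes, but with different enclosing classes and therefore different
// fully-qualified names. (Field layout is an illustrative assumption.)
public class NestedClassCompatibilityDemo {

    static class Producer011 {
        static class KafkaTransactionState {
            String transactionalId;
            long producerId;
            short epoch;
        }
    }

    static class UniversalProducer {
        static class KafkaTransactionState {
            String transactionalId;
            long producerId;
            short epoch;
        }
    }

    public static void main(String[] args) {
        // Same simple name and same shape, but different classes as far as
        // the JVM (and any class-based compatibility check) is concerned.
        System.out.println(Producer011.KafkaTransactionState.class.getName());
        System.out.println(UniversalProducer.KafkaTransactionState.class.getName());
        System.out.println(Producer011.KafkaTransactionState.class
                .equals(UniversalProducer.KafkaTransactionState.class)); // false
    }
}
```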
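And here is a minimal sketch of why option 2 would be safe in this case: since both `KafkaTransactionState` variants have the same binary layout, bytes written for one class can be read back as the other without any migration. (The field names/types and the write format below are illustrative assumptions, not the connector's actual serializer.)

```java
import java.io.*;

// Bytes written for the "old" class can be read back as the "new" class,
// because the layout is identical; only the class-compatibility check
// stands in the way.
public class BinaryCompatibilityDemo {

    // Stand-in for FlinkKafkaProducer011.KafkaTransactionState
    static class OldState { String transactionalId; long producerId; short epoch; }

    // Stand-in for FlinkKafkaProducer.KafkaTransactionState
    static class NewState { String transactionalId; long producerId; short epoch; }

    public static void main(String[] args) throws IOException {
        // Serialize an instance of the old class field by field...
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        OldState old = new OldState();
        old.transactionalId = "txn-42"; old.producerId = 7L; old.epoch = 1;
        out.writeUTF(old.transactionalId);
        out.writeLong(old.producerId);
        out.writeShort(old.epoch);

        // ...and read the very same bytes back into the new class.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        NewState restored = new NewState();
        restored.transactionalId = in.readUTF();
        restored.producerId = in.readLong();
        restored.epoch = in.readShort();
        System.out.println(restored.transactionalId + " / "
                + restored.producerId + " / " + restored.epoch);
    }
}
```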
