pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not 
be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-462391160
 
 
   Thanks @EAlexRojas for the investigation. I was also in the process of 
discovering and fixing the same problems that you mentioned, and of writing 
tests for them. So far I have rewritten and fixed the tests provided by 
@tvielgouarin and encountered the same errors.
   
   TL;DR:
   I'm thinking about how to solve this situation, but I'm afraid it won't make 
it into 1.8...
   
   Full story:
   Regarding the 0.11 vs 2.0 dependency: having both of them on the same class 
path won't work because of dependency convergence, but it doesn't have to. 
Committing/aborting 0.11 transactions with the 2.0 connector should work fine, 
so that shouldn't be an issue.
   
   The real problem is that `FlinkKafkaProducer` and `FlinkKafkaProducer011` 
have different names and define the static classes `NextTransactionalIdHint`, 
`KafkaTransactionState` and `KafkaTransactionContext` inside their parent 
classes. This causes incompatibility problems, since for example 
`FlinkKafkaProducer011.KafkaTransactionState` and 
`FlinkKafkaProducer.KafkaTransactionState` are treated as completely 
incompatible classes, despite being identical.
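   The root of the incompatibility can be illustrated with a minimal, 
self-contained sketch (the outer/nested class names below are hypothetical 
stand-ins, not the actual Flink classes): to the JVM, and to any name-based 
compatibility check, two byte-identical nested classes under differently 
named outer classes are simply different types.

```java
// Sketch: two structurally identical static nested classes whose only
// difference is the name of the enclosing class. Any check that compares
// fully-qualified class names will treat them as unrelated types.
public class NestedClassIdentity {

    // Stand-in for the 0.11 connector's outer class.
    static class Producer011 {
        static class KafkaTransactionState {
            long producerId;
        }
    }

    // Stand-in for the universal connector's outer class.
    static class Producer {
        static class KafkaTransactionState {
            long producerId;
        }
    }

    public static void main(String[] args) {
        String legacy = Producer011.KafkaTransactionState.class.getName();
        String current = Producer.KafkaTransactionState.class.getName();
        // The enclosing class is part of the fully-qualified name, so the
        // two names can never match even though the classes are identical.
        System.out.println(legacy);
        System.out.println(current);
        System.out.println(legacy.equals(current)); // false
    }
}
```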
   
   It can probably be solved by:
   1. Custom serialization logic, like keeping fake/dummy 
`FlinkKafkaProducer011.XXXSerializer.XXXSerializerSnapshot` classes in the 
universal connector as entry points for deserialization.
   2. Adding a "force skip class compatibility check" flag to the current 
serialization stack. After all, the serialized binary data are exactly the same 
in all of those cases. This is work in progress by @tzulitai and might happen 
in time for the 1.8 release.
   3. Adding a more powerful state migration function that would be able to 
change the type of a field/class. This is also on our roadmap, but won't happen 
in 1.8.
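   To make idea (1) concrete, here is a hedged sketch of the general pattern, 
using plain Java serialization and `readResolve()` rather than Flink's actual 
serializer-snapshot machinery (all class names below are hypothetical): a dummy 
class is kept under the legacy name purely so old bytes can still be read, and 
it resolves to the new class the moment deserialization completes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LegacyEntryPoint {

    // The state class used going forward (stand-in for the universal
    // connector's class).
    static class NewState implements Serializable {
        final long producerId;
        NewState(long producerId) { this.producerId = producerId; }
    }

    // Dummy class kept under the legacy name as a deserialization entry
    // point. readResolve() swaps in the new class, so the rest of the
    // code only ever sees NewState instances.
    static class LegacyState implements Serializable {
        long producerId;
        private Object readResolve() {
            return new NewState(producerId);
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate old bytes: serialize an instance of the legacy class.
        LegacyState legacy = new LegacyState();
        legacy.producerId = 42L;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(legacy);
        oos.flush();

        // Restoring yields a NewState, thanks to readResolve().
        Object restored = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(restored.getClass().getSimpleName()); // NewState
        System.out.println(((NewState) restored).producerId);    // 42
    }
}
```

Flink's `TypeSerializerSnapshot` mechanism would play the role of the entry 
point instead of `readResolve()`, but the shape of the fix is the same: the 
legacy name survives only as a shim that hands off to the new type.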
   
   Either way, unfortunately I'm away for the next two weeks and cannot solve 
this issue before the 1.8 feature freeze. This fix will have to wait for the 
1.9 release.
   
   I have implemented working regression tests for state compatibility between 
Flink versions: https://github.com/apache/flink/pull/7677
   A test for migration from 0.11 to the universal connector is also easy to 
implement:
   
https://github.com/pnowojski/flink/tree/kafka-migration-0.11-to-universal-not-working
   But I didn't have time to make it work (as described above).
