Hi,

while implementing the FlinkRunner for Flink 1.17 I tried to verify that a running Pipeline is able to upgrade successfully from Flink 1.16 to Flink 1.17. Flink 1.17 requires some change regarding serialization, so this was a concern. Recently we merged core-construction-java into the core SDK, which resulted in some classes being repackaged. Unfortunately, we Java-serialize some of these classes into Flink's checkpoints and savepoints, so the repackaging causes the following exception when trying to restore from a savepoint:

Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.construction.SerializablePipelineOptions
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
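
Just to illustrate the mechanism (plain Java, not Beam code): standard Java serialization records the fully qualified class name in the stream, so once a class is repackaged, previously written bytes can no longer be resolved to it. The class names below are made up for the illustration.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class FqcnInStream {

  // Stand-in for any Java-serialized class stored in a savepoint,
  // e.g. SerializablePipelineOptions before the repackaging.
  static class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    String value = "options";
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(new Payload());
    }
    // The serialized bytes literally contain the class name, which is what
    // ObjectInputStream#resolveClass later looks up via Class.forName.
    String dump = new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
    System.out.println(dump.contains("FqcnInStream$Payload")); // prints true
  }
}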


This means that no Pipeline running on the Flink runner will be able to upgrade from a version prior to 2.55.0 to 2.55.0 (i.e. all such Pipelines will have to be restarted from scratch). I wanted to know how the community feels about that; this consequence was probably not clear when we merged the artifacts. The only alternative would be to revert the merge and then figure out how to avoid Java serialization in Flink's savepoints altogether. That would definitely be costly in terms of implementation, and even more so to provide a way to migrate old savepoints to the new format (possibly using the State Processor API, see the sketch below). I'm aware that Beam provides no general guarantees about upgrade compatibility, so it might be fine to just accept this; I just wanted to say it out loud so that we can make a deliberate decision.
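
For reference, a very rough sketch of what reading an existing savepoint with the State Processor API (flink-state-processor-api) could look like. This is not a working migration: the savepoint path, operator uid and state name are hypothetical placeholders, and re-encoding Beam's state would presumably still require the pre-2.55.0 classes on the classpath to deserialize the existing bytes before rewriting them.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointMigrationSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load the old savepoint for offline inspection / transformation.
    // Path and state backend are placeholders.
    SavepointReader savepoint =
        SavepointReader.read(env, "file:///tmp/old-savepoint", new HashMapStateBackend());

    // Read one operator's list state as raw bytes; re-encoding it without
    // Java-serialized classes and writing a new savepoint (SavepointWriter)
    // is the part that would need real design work.
    DataStream<byte[]> rawState =
        savepoint.readListState(
            "some-operator-uid", "some-state-name", TypeInformation.of(byte[].class));
    rawState.print();

    env.execute("savepoint-migration-sketch");
  }
}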

Best,

 Jan
