Hi,
while implementing the FlinkRunner for Flink 1.17 I tried to verify that a
running Pipeline can successfully upgrade from Flink 1.16 to Flink 1.17.
Flink 1.17 requires some serialization-related changes, so this was a
concern. Recently we merged core-construction-java into the SDK, which
resulted in some classes being repackaged. Unfortunately, we serialize
some of these classes into Flink's checkpoints and savepoints using Java
serialization, so the renaming of a class results in the following
exception when trying to restore from a savepoint:
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.construction.SerializablePipelineOptions
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
This means that no Pipeline will be able to successfully upgrade from a
version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
restarted from scratch). I wanted to know how the community feels about
that; this consequence was probably not clear when we merged the
artifacts. The only alternative would be to revert the merge and then
figure out how to avoid Java serialization in Flink's savepoints
altogether. That would definitely be costly in terms of implementation,
and even more costly to provide a way to migrate old savepoints to the
new format (which might be possible using the State Processor API). I'm
aware that Beam provides no general guarantees about upgrade
compatibility, so it might be fine to just accept this; I just wanted to
call it out loudly so that we can make a deliberate decision.
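For context, one standard technique for surviving a pure class rename or
repackaging in Java-serialized state is to rewrite the class descriptor
during deserialization. Below is a minimal, hypothetical sketch — the
OldOptions/NewOptions names are invented stand-ins, not the actual Beam
classes, and hooking something like this into Flink's InstantiationUtil
restore path would be a separate problem:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Stand-in for the class as it was serialized into the savepoint (old name).
class OldOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    String value;
    OldOptions(String value) { this.value = value; }
}

// Stand-in for the same class after repackaging; field layout is unchanged.
class NewOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    String value;
}

public class RenameDemo {
    // ObjectInputStream that swaps the stream's class descriptor for the
    // relocated class, so bytes written for the old class deserialize
    // into the new one.
    static class RemappingObjectInputStream extends ObjectInputStream {
        RemappingObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor()
                throws IOException, ClassNotFoundException {
            ObjectStreamClass desc = super.readClassDescriptor();
            if (desc.getName().equals("OldOptions")) {
                // Substitute the descriptor of the renamed class. This only
                // works when the serialized field layout is identical.
                return ObjectStreamClass.lookup(NewOptions.class);
            }
            return desc;
        }
    }

    // Serializes an OldOptions instance, then reads it back as NewOptions.
    static String roundTrip() throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new OldOptions("pipeline-options"));
        }
        try (ObjectInputStream in = new RemappingObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            NewOptions restored = (NewOptions) in.readObject();
            return restored.value;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

The catch is that this remapping has to run inside Flink's own
deserialization of the savepoint, which is not user-configurable, so in
practice it would likely mean offline rewriting of savepoints rather than
a transparent fix.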
Best,
Jan