Great. Let me know if I can help. I broke it after all :-)

Kenn
On Thu, Feb 22, 2024 at 2:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> Reasons we use Java serialization are not fundamental, probably only
> historical. Thinking about it, yes, there is a lucky coincidence that we
> currently have to change the serialization because of Flink 1.17 support.
> Flink 1.17 actually removes the legacy Java serialization from Flink and
> enforces custom serialization. Therefore, we need to introduce an
> upgrade-compatible change of serialization to support Flink 1.17. This is
> already implemented in [1]. The PR can go further, though. We can replace
> Java serialization of the Coder in the TypeSerializerSnapshot and use the
> portable representation of the Coder (which will still use Java
> serialization in some cases, but might avoid it at least for well-known
> coders; moreover, Coders should be more upgrade-stable classes).
>
> I'll try to restore the SerializablePipelineOptions (copy & paste) in the
> FlinkRunner only and rework the serialization in a more stable way (at
> least avoid serializing the CoderTypeSerializer, which references the
> SerializablePipelineOptions).
>
> I created [2] and marked it as a blocker for the 2.55.0 release, because
> otherwise we would break the upgrade.
>
> Thanks for the discussion, it helped a lot.
>
> Jan
>
> [1] https://github.com/apache/beam/pull/30197
> [2] https://github.com/apache/beam/issues/30385
>
> On 2/21/24 20:33, Kenneth Knowles wrote:
>
>> Yea, I think we should restore the necessary classes but also fix the
>> FlinkRunner. Java serialization is inherently self-update-incompatible.
>>
>> On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <dev@beam.apache.org>
>> wrote:
>>
>>> Is there a fundamental reason we serialize Java classes into Flink
>>> savepoints?
>>>
>>> On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev
>>> <dev@beam.apache.org> wrote:
>>>
>>>> We could consider merging the gradle targets without renaming the
>>>> classpaths as an intermediate step.
>>>>
>>>> Optimistically, perhaps there's a small number of classes that we need
>>>> to preserve (e.g. SerializablePipelineOptions looks like it was
>>>> something specifically intended to be serialized); maybe that and a
>>>> handful of others (that implement Serializable) could be left in their
>>>> original packages for backwards-compatibility reasons?
>>>>
>>>> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> while implementing the FlinkRunner for Flink 1.17 I tried to verify
>>>>> that a running Pipeline is able to successfully upgrade from Flink
>>>>> 1.16 to Flink 1.17. There is some change regarding serialization
>>>>> needed for Flink 1.17, so this was a concern. Unfortunately, we
>>>>> recently merged core-construction-java into the SDK, which resulted
>>>>> in some classes being repackaged, and we serialize some of those
>>>>> classes into Flink's check/savepoints. The renaming of the classes
>>>>> therefore ends with the following exception when trying to restore
>>>>> from a savepoint:
>>>>>
>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>> org.apache.beam.runners.core.construction.SerializablePipelineOptions
>>>>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>>>>>     at java.base/java.lang.Class.forName0(Native Method)
>>>>>     at java.base/java.lang.Class.forName(Class.java:398)
>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>>>>>
>>>>> This means that no Pipeline will be able to successfully upgrade from
>>>>> a version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to
>>>>> be restarted from scratch). I wanted to know how the community would
>>>>> feel about that; this consequence probably was not clear when we
>>>>> merged the artifacts. The only option would be to revert the merge
>>>>> and then try to figure out how to avoid Java serialization in Flink's
>>>>> savepoints. That would definitely be costly in terms of
>>>>> implementation, and even more so to provide ways to transfer old
>>>>> savepoints to the new format (which might be possible using the state
>>>>> processor API). I'm aware that Beam provides no general guarantees
>>>>> about upgrade compatibility, so it might be fine to just ignore this;
>>>>> I just wanted to shout this out loud so that we can make a deliberate
>>>>> decision.
>>>>>
>>>>> Best,
>>>>>
>>>>> Jan
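For readers outside the thread, a minimal, self-contained sketch (plain Java,
not Beam or Flink code; the OptionsHolder class is hypothetical) of why the
repackaging breaks savepoint restore and why Java serialization is, as noted
above, self-update-incompatible: the stream records the writing class's fully
qualified name, and restore asks the classloader for exactly that name, so a
class moved to a new package can no longer be resolved.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerializedClassNameDemo {

  // Stand-in for a class that ends up inside a Flink savepoint
  // (hypothetical; not the real SerializablePipelineOptions).
  static class OptionsHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final String optionsJson;

    OptionsHolder(String optionsJson) {
      this.optionsJson = optionsJson;
    }
  }

  public static void main(String[] args) throws Exception {
    // Write the object the way a savepoint would: plain Java serialization.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new OptionsHolder("{\"runner\":\"FlinkRunner\"}"));
    }

    // Read it back, logging every class name the stream asks the classloader
    // to resolve. The writing class's fully qualified name is baked into the
    // bytes; had the class been repackaged between write and read,
    // super.resolveClass would throw ClassNotFoundException, which is what
    // the Flink restore in the trace above hits.
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
          @Override
          protected Class<?> resolveClass(ObjectStreamClass desc)
              throws IOException, ClassNotFoundException {
            System.out.println("stream requires class: " + desc.getName());
            return super.resolveClass(desc);
          }
        }) {
      OptionsHolder restored = (OptionsHolder) in.readObject();
      System.out.println("restored options: " + restored.optionsJson);
    }
  }
}

This is the mechanism behind the two remedies discussed above: restoring the
old class names (the copy & paste of SerializablePipelineOptions) in the short
term, and keeping Java class names out of the snapshot format, e.g. via the
portable representation of Coders in [1], in the longer term.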