On Wed, Feb 21, 2024 at 11:58 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> Reasons we use Java serialization are not fundamental, probably only 
> historical. Thinking about it, yes, there is a lucky coincidence that we 
> currently have to change the serialization because of Flink 1.17 support. 
> Flink 1.17 actually removes the legacy Java serialization from Flink and 
> enforces custom serialization. Therefore, we need to introduce an 
> upgrade-compatible change of serialization to support Flink 1.17. This is 
> already implemented in [1]. The PR can go further, though. We can replace 
> the Java serialization of the Coder in the TypeSerializerSnapshot and use 
> the portable representation of the Coder (which will still use Java 
> serialization in some cases, but might avoid it at least for well-known 
> coders; moreover, Coders should be more upgrade-stable classes).
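>
> For illustration only, a rough sketch of that direction (this is not the 
> actual code in [1]; it assumes Flink 1.17's TypeSerializerSnapshot API, and 
> the toPortableBytes/fromPortableBytes helpers are hypothetical placeholders 
> for whatever portable Coder encoding we end up using):
>
> import java.io.IOException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
> import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
>
> // Sketch: the snapshot stores only a portable byte form of the Coder, so no
> // Java-serialized CoderTypeSerializer (and no SerializablePipelineOptions)
> // ends up in the savepoint.
> public abstract class PortableCoderSnapshotSketch<T>
>     implements TypeSerializerSnapshot<T> {
>
>   protected Coder<T> coder;
>
>   public PortableCoderSnapshotSketch() {} // Flink requires a nullary constructor
>
>   @Override
>   public int getCurrentVersion() {
>     return 1;
>   }
>
>   @Override
>   public void writeSnapshot(DataOutputView out) throws IOException {
>     byte[] payload = toPortableBytes(coder);
>     out.writeInt(payload.length);
>     out.write(payload);
>   }
>
>   @Override
>   public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
>       throws IOException {
>     byte[] payload = new byte[in.readInt()];
>     in.readFully(payload);
>     this.coder = fromPortableBytes(payload, classLoader);
>   }
>
>   @Override
>   public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
>       TypeSerializer<T> newSerializer) {
>     // Real logic would compare the restored Coder with the new serializer's Coder.
>     return TypeSerializerSchemaCompatibility.compatibleAsIs();
>   }
>
>   // restoreSerializer() is left abstract here; it would rebuild the runtime
>   // CoderTypeSerializer from the restored Coder.
>
>   // Placeholders for the portable Coder encoding; how exactly the Coder is
>   // turned into a stable byte representation is the open part of the PR.
>   protected abstract byte[] toPortableBytes(Coder<T> coder) throws IOException;
>
>   protected abstract Coder<T> fromPortableBytes(byte[] bytes, ClassLoader classLoader)
>       throws IOException;
> }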
>
> I'll try to restore the SerializablePipelineOptions (copy&paste) in 
> FlinkRunner only and rework the serialization in a more stable way (at least 
> avoid serializing the CoderTypeSerializer, which references the 
> SerializablePipelineOptions).
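>
> To make the copy&paste part concrete: the restored class would essentially be 
> a shim kept at the original (pre-2.55.0) fully qualified name, with the same 
> field layout and effective serialVersionUID as before, so that Java 
> deserialization of existing savepoints can still resolve it. Roughly (an 
> illustration only, the field names here are assumptions, not the real class):
>
> package org.apache.beam.runners.core.construction;
>
> import java.io.Serializable;
>
> // Kept at the old fully qualified name only so that old savepoints restore;
> // the field layout and serialVersionUID must match the original class,
> // otherwise deserialization still fails.
> public class SerializablePipelineOptions implements Serializable {
>   private static final long serialVersionUID = 1L; // must match the original value
>
>   // The options kept in a string (JSON) form so that the serialized
>   // representation does not depend on concrete PipelineOptions classes.
>   private final String serializedOptions;
>
>   public SerializablePipelineOptions(String serializedOptions) {
>     this.serializedOptions = serializedOptions;
>   }
>
>   public String getSerializedOptions() {
>     return serializedOptions;
>   }
> }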

Thanks!

If other runners use this (SerializablePipelineOptions seems like it was
explicitly created for this purpose), we could consider putting the
copy into core rather than just the Flink packages.

> I created [2] and marked it as blocker for 2.55.0 release, because otherwise 
> we would break the upgrade.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385
>
> On 2/21/24 20:33, Kenneth Knowles wrote:
>
> Yea I think we should restore the necessary classes but also fix the 
> FlinkRunner. Java serialization is inherently self-update-incompatible.
>
> On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <dev@beam.apache.org> 
> wrote:
>>
>> Is there a fundamental reason we serialize Java classes into Flink 
>> savepoints?
>>
>> On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev 
>> <dev@beam.apache.org> wrote:
>>>
>>> We could consider merging the gradle targets without renaming the
>>> packages as an intermediate step.
>>>
>>> Optimistically, perhaps there's a small number of classes that we need
>>> to preserve (e.g. SerializablePipelineOptions looks like it was
>>> something specifically intended to be serialized); maybe that and a
>>> handful of others (that implement Serializable) could be left in their
>>> original packages for backwards-compatibility reasons?
>>>
>>> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >
>>> > Hi,
>>> >
>>> > while implementing the FlinkRunner for Flink 1.17 I tried to verify that a
>>> > running Pipeline is able to successfully upgrade from Flink 1.16 to
>>> > Flink 1.17. There is some change regarding serialization needed for
>>> > Flink 1.17, so this was a concern. Unfortunately, we recently merged
>>> > core-construction-java into the SDK, which resulted in some classes being
>>> > repackaged, and we serialize some of these classes into Flink's
>>> > check/savepoints. The renaming of the classes therefore ends with the
>>> > following exception when trying to restore from a savepoint:
>>> >
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.beam.runners.core.construction.SerializablePipelineOptions
>>> >      at 
>>> > java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> >      at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> >      at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>>> >      at
>>> > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> >      at
>>> > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>>> >      at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>> >      at
>>> > org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>>> >      at java.base/java.lang.Class.forName0(Native Method)
>>> >      at java.base/java.lang.Class.forName(Class.java:398)
>>> >      at
>>> > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>> >      at
>>> > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>>> >
>>> >
>>> > This means that no Pipeline will be able to successfully upgrade from a
>>> > version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
>>> > restarted from scratch). I wanted to know how the community feels
>>> > about that; this consequence probably was not clear when we merged the
>>> > artifacts. The only alternative would be to revert the merge and then try
>>> > to figure out how to avoid Java serialization in Flink's savepoints. That
>>> > would definitely be costly in terms of implementation, and even more so to
>>> > provide ways to transfer old savepoints to the new format (which might be
>>> > possible using the state processor API). I'm aware that Beam provides no
>>> > general guarantees about upgrade compatibility, so it might be fine to
>>> > just ignore this; I just wanted to shout this out loud so that we can
>>> > make a deliberate decision.
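>>> >
>>> > For reference, the generic Java-level trick for a renamed class is to remap
>>> > the old class name during deserialization via resolveClass(); whether
>>> > something like that could be hooked into Flink's restore path (or into a
>>> > state processor API based migration) is exactly the costly part. A minimal
>>> > sketch, where the post-merge package name is only an assumption:
>>> >
>>> > import java.io.IOException;
>>> > import java.io.InputStream;
>>> > import java.io.ObjectInputStream;
>>> > import java.io.ObjectStreamClass;
>>> >
>>> > // Remaps the pre-2.55.0 class name to its post-merge location while reading
>>> > // Java-serialized data; purely illustrative, not wired into Flink here.
>>> > public class RenamingObjectInputStream extends ObjectInputStream {
>>> >
>>> >   private static final String OLD_NAME =
>>> >       "org.apache.beam.runners.core.construction.SerializablePipelineOptions";
>>> >   // Assumed post-merge name; use whatever package the class actually moved to.
>>> >   private static final String NEW_NAME =
>>> >       "org.apache.beam.sdk.util.construction.SerializablePipelineOptions";
>>> >
>>> >   public RenamingObjectInputStream(InputStream in) throws IOException {
>>> >     super(in);
>>> >   }
>>> >
>>> >   @Override
>>> >   protected Class<?> resolveClass(ObjectStreamClass desc)
>>> >       throws IOException, ClassNotFoundException {
>>> >     String name = desc.getName().equals(OLD_NAME) ? NEW_NAME : desc.getName();
>>> >     return Class.forName(name, false, getClass().getClassLoader());
>>> >   }
>>> > }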
>>> >
>>> > Best,
>>> >
>>> >   Jan
>>> >
