Great. Let me know if I can help. I broke it after all :-)

Kenn

On Thu, Feb 22, 2024 at 2:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> The reasons we use Java serialization are not fundamental, probably only
> historical. Thinking about it, yes, there is a lucky coincidence: we
> currently have to change the serialization anyway because of Flink 1.17
> support. Flink 1.17 removes the legacy Java serialization from Flink and
> enforces custom serialization, so we need to introduce an
> upgrade-compatible change of the serialization to support it. This is
> already implemented in [1]. The PR can go further, though: we can replace
> the Java serialization of the Coder in the TypeSerializerSnapshot with the
> portable representation of the Coder (which will still use Java
> serialization in some cases, but might avoid it at least for well-known
> coders; moreover, Coders should be more upgrade-stable classes).
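>
> For illustration only (this is not the code in [1], and the class and
> method names here are made up): a rough sketch of what a snapshot that
> stores the Coder portably could look like. The abstract methods stand in
> for the Coder <-> portable proto translation and for rebuilding the
> runner's serializer, which I'm not spelling out here.
>
> import java.io.IOException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
> import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
>
> /**
>  * Sketch of a TypeSerializerSnapshot that persists the Coder in its portable
>  * form instead of Java-serializing the CoderTypeSerializer (which drags in
>  * SerializablePipelineOptions and pins concrete class/package names).
>  */
> public abstract class PortableCoderSnapshot<T> implements TypeSerializerSnapshot<T> {
>
>   private Coder<T> coder;
>
>   protected PortableCoderSnapshot() {}
>
>   protected PortableCoderSnapshot(Coder<T> coder) {
>     this.coder = coder;
>   }
>
>   /** Placeholder: translate the Coder to its portable (proto) representation. */
>   protected abstract byte[] toPortableBytes(Coder<T> coder);
>
>   /** Placeholder: rebuild the Coder from its portable representation. */
>   protected abstract Coder<T> fromPortableBytes(byte[] bytes);
>
>   /** Placeholder: rebuild the runner's serializer, re-injecting pipeline options. */
>   protected abstract TypeSerializer<T> restoreSerializer(Coder<T> coder);
>
>   @Override
>   public int getCurrentVersion() {
>     return 1;
>   }
>
>   @Override
>   public void writeSnapshot(DataOutputView out) throws IOException {
>     // Only the portable bytes go into the savepoint; no Java class names.
>     byte[] portable = toPortableBytes(coder);
>     out.writeInt(portable.length);
>     out.write(portable);
>   }
>
>   @Override
>   public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
>       throws IOException {
>     byte[] portable = new byte[in.readInt()];
>     in.readFully(portable);
>     coder = fromPortableBytes(portable);
>   }
>
>   @Override
>   public TypeSerializer<T> restoreSerializer() {
>     return restoreSerializer(coder);
>   }
>
>   @Override
>   public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
>       TypeSerializer<T> newSerializer) {
>     // A real implementation would compare the restored Coder with the new
>     // serializer's Coder; compatibleAsIs() just keeps the sketch short.
>     return TypeSerializerSchemaCompatibility.compatibleAsIs();
>   }
> }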
>
> I'll try to restore SerializablePipelineOptions (copy & paste) in the
> FlinkRunner only and rework the serialization in a more stable way (at
> least avoiding the serialization of the CoderTypeSerializer, which
> references SerializablePipelineOptions).
>
> I created [2] and marked it as a blocker for the 2.55.0 release, because
> otherwise we would break the upgrade.
>
> Thanks for the discussion, it helped a lot.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/30197
>
> [2] https://github.com/apache/beam/issues/30385
> On 2/21/24 20:33, Kenneth Knowles wrote:
>
> Yea, I think we should restore the necessary classes but also fix the
> FlinkRunner. Java serialization is inherently self-update-incompatible.
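>
> To make that concrete, a toy example (made-up class, nothing Beam-specific)
> showing that the fully-qualified class name is baked into the serialized
> bytes, so any rename or repackage breaks previously written streams:
>
> import java.io.ByteArrayOutputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> public class RenameBreaksJavaSerialization {
>
>   static class Foo implements Serializable {
>     private static final long serialVersionUID = 1L;
>     final String value;
>     Foo(String value) { this.value = value; }
>   }
>
>   public static void main(String[] args) throws Exception {
>     ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>     try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>       out.writeObject(new Foo("state"));
>     }
>     // The class descriptor, including the class name, is part of the stream:
>     String stream = new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
>     System.out.println(stream.contains("RenameBreaksJavaSerialization$Foo")); // true
>     // Reading these bytes back after the class has moved to another package
>     // fails in ObjectInputStream.resolveClass with ClassNotFoundException for
>     // the old name -- which is exactly the savepoint failure quoted below.
>   }
> }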
>
> On Wed, Feb 21, 2024 at 1:35 PM Reuven Lax via dev <dev@beam.apache.org>
> wrote:
>
>> Is there a fundamental reason we serialize Java classes into Flink
>> savepoints?
>>
>> On Wed, Feb 21, 2024 at 9:51 AM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> We could consider merging the Gradle targets without renaming the
>>> classpaths as an intermediate step.
>>>
>>> Optimistically, perhaps there's only a small number of classes that we
>>> need to preserve. For example, SerializablePipelineOptions looks like it
>>> was something specifically intended to be serialized; maybe that and a
>>> handful of others (that implement Serializable) could be left in their
>>> original packages for backwards-compatibility reasons?
>>>
>>> On Wed, Feb 21, 2024 at 7:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >
>>> > Hi,
>>> >
>>> > while implementing the FlinkRunner for Flink 1.17, I tried to verify
>>> > that a running Pipeline is able to successfully upgrade from Flink 1.16
>>> > to Flink 1.17. Flink 1.17 requires a change to our serialization, so
>>> > this was a concern. Unfortunately, we recently merged
>>> > core-construction-java into the SDK, which resulted in some classes
>>> > being repackaged, and we serialize some of those classes into Flink's
>>> > check/savepoints. The renaming of the classes therefore results in the
>>> > following exception when trying to restore from a savepoint:
>>> >
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.beam.runners.core.construction.SerializablePipelineOptions
>>> >      at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> >      at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> >      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>>> >      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> >      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>>> >      at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>> >      at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>>> >      at java.base/java.lang.Class.forName0(Native Method)
>>> >      at java.base/java.lang.Class.forName(Class.java:398)
>>> >      at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>> >      at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:251)
>>> >
>>> >
>>> > This means that no Pipeline will be able to successfully upgrade from a
>>> > version prior to 2.55.0 to 2.55.0 (i.e. all Pipelines will have to be
>>> > restarted from scratch). I wanted to know how the community feels about
>>> > that; this consequence probably was not clear when we merged the
>>> > artifacts. The only alternative would be to revert the merge and then
>>> > try to figure out how to avoid Java serialization in Flink's savepoints.
>>> > That would definitely be costly in terms of implementation, and even
>>> > more so to provide ways to transfer old savepoints to the new format
>>> > (which might be possible using the State Processor API). I'm aware that
>>> > Beam provides no general guarantees about upgrade compatibility, so it
>>> > might be fine to just ignore this; I just wanted to shout this out loud
>>> > so that we can make a deliberate decision.
>>> >
>>> > Best,
>>> >
>>> >   Jan
>>> >
>>>
>>
