On 5/3/23 19:57, Kenneth Knowles wrote:
My big-picture hot take: this is useful, but the problem we really need to solve is topology change, which would make coder evolution obsolete.

I think the Beam model has a role in this. It isn't just a runner-specific thing. We need to ensure the model makes it possible/easy to replace one pipeline with another, and to define how that should look. For example, a composite PTransform with the same input/output types should, in some cases, have its internals replaced in a blue/green way. This would of course include lots of coder changes with no notion of compatibility possible. And for a pipeline that looks pretty much the same except for some encoding change, we should definitely see if we can define a more localized migration process.
Yes, a change of coder can be viewed as a simplistic change in topology (if we assume that a coder is a property of the edge connecting two nodes). In this sense, if we were to provide a way to upgrade topology, this would be solved as well. On the other hand, I'm a little afraid that a fully generic topology change is unachievable, because the state stored in the previous Pipeline might not have any relation to what should be stored in the upgraded Pipeline. That said, there are probably many cases where it would be possible.

I haven't thought about this in a while, so I don't have technical proposals for achieving it yet. Ultimately, as a user of Beam and/or any runner, I would consider the ability to run a brand new, unrelated pipeline and hot-swap it for the live one a prerequisite for production-readiness, but it has been a while since this was what I did day-to-day.
Agree, the ability to do this is indeed necessary. The concern is that it might be costly. It would help if we provided a way to bootstrap state for a streaming Pipeline via a batch Pipeline (and even better, in a runner-agnostic way, so that users could use different runners for the two). But that is a completely different topic. :-)

Returning to my original motivation - all of this would be a lot of work, so I think it is reasonable to propose a shortcut: deprecate KryoCoder, introduce Kryo5Coder (or wait for Kryo 6) as an alternative, and let users handle the transition themselves.

 Jan


Kenn

On Wed, May 3, 2023 at 8:07 AM Byron Ellis via dev <dev@beam.apache.org> wrote:

    I think I'm not understanding the use case here. Are we talking
    about the encoding of data in motion (e.g. between stages of a
    streaming pipeline) or data at rest (e.g. input formats and
    output formats)? Or maybe something else?

This is both data in motion (between stages of the Pipeline) and data at rest, but inside the Pipeline (state). Not input/output formats.


    On Wed, May 3, 2023 at 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi,

        I'd like to discuss a topic that appears from time to time in
        different contexts (e.g. [1]). I'd like to restate the problem
        in a slightly more generic way as: "Should we have a way to
        completely exchange the coder of a PCollection/state of a
        _running_ Pipeline?". First, my motivation for this question -
        Beam has an extension called beam-sdks-java-extensions-kryo,
        which contains a KryoCoder. This coder uses Kryo [2] to
        serialize virtually any Java class into a binary format.
        Unfortunately, this binary representation differs between Kryo
        versions, and it does not contain any way to recognize which
        version of Kryo was used to serialize the data. An attempt to
        deserialize bytes produced by an incompatible version of Kryo
        results in an exception. The version of Kryo currently used by
        the KryoCoder is already more than 5 years old, and an upgrade
        to a newer version is needed, because the current version does
        not work with JDK17+ [3]. Thus, the only option seems to be the
        creation of a different Coder (e.g. Kryo5Coder), but then we
        need the ability to transfer Pipelines using the old KryoCoder
        to the new one. That is, we need to completely switch the coder
        that encodes a PCollection and/or state.
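To make the root cause tangible: Kryo's binary output carries no version marker, so old and new payloads cannot be told apart at read time. A minimal sketch (purely hypothetical, not part of the existing KryoCoder; all names are made up) of what a version-tagged envelope would look like, which is exactly what the current format lacks:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a one-byte version tag in front of each payload,
// allowing a decoder to dispatch to the right deserializer. Kryo writes
// no such marker, which is why incompatible payloads just fail to decode.
public class VersionedEnvelope {

  // Stand-ins for "encoded with old Kryo" and "encoded with Kryo 5".
  static final byte FORMAT_V2 = 2;
  static final byte FORMAT_V5 = 5;

  static void encode(String value, byte formatVersion, OutputStream out) throws IOException {
    out.write(formatVersion); // version tag written before the payload
    byte[] payload = value.getBytes(StandardCharsets.UTF_8);
    out.write(payload.length); // single-byte length, enough for this toy example
    out.write(payload);
  }

  static String decode(InputStream in) throws IOException {
    int version = in.read();
    int len = in.read();
    byte[] payload = new byte[len];
    int off = 0;
    while (off < len) {
      off += in.read(payload, off, len - off);
    }
    // Dispatch on the tag; with real Kryo this is where the matching
    // Kryo major version would be selected.
    switch (version) {
      case FORMAT_V2:
      case FORMAT_V5:
        return new String(payload, StandardCharsets.UTF_8);
      default:
        throw new IOException("Unknown format version: " + version);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encode("hello", FORMAT_V5, out);
    System.out.println(decode(new ByteArrayInputStream(out.toByteArray()))); // prints "hello"
  }
}
```

Of course, this only helps data written after such an envelope is introduced; existing un-tagged Kryo state is precisely the migration problem discussed below.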

        We therefore have the following options:

          1) Simply ignore this and let users rerun the Pipeline from
        scratch. This is possible and generally applicable, but for
        some Pipelines it might be costly to reprocess all historical
        data.

          2) We can create the new Coder and let users use a
        runner-specific way to convert the Pipeline. E.g., in the case
        of Flink, this could be done by converting a savepoint into the
        new format. This requires knowledge of how Beam stores state
        (namespaces) and is rather involved on the user side. We could
        probably provide runner-specific tools for this, but some
        runners, in general, might not allow such state manipulation.

          3) We can include the information about a Coder update in the
        Pipeline, resubmit it to the runner, and let the runner handle
        it. Upon Pipeline restart, the runner would have to convert all
        state and all in-flight data from the old Coder to the new one
        before resuming the Pipeline.

        Option 3) seems the most natural, but it requires support on
        the runner side.

        I leave the details of how a runner would do this open; at the
        moment I'm interested in the community's position on this.
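To give a rough, non-authoritative picture of what the runner-side conversion in option 3) might involve, here is a self-contained sketch: on restart, walk every stored state cell, decode it with the old coder, and re-encode it with the new one. The `SimpleCoder` interface and all names below are hypothetical stand-ins for Beam's `Coder`, reduced to byte[] round-trips; a real runner would also need to cover timers and in-flight elements.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class StateMigration {

  // Hypothetical stand-in for Beam's Coder, reduced to byte[] round-trips.
  interface SimpleCoder<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
  }

  // Rewrites a snapshot of state (key -> encoded value) from the old
  // coder's format into the new coder's format.
  static <T> Map<String, byte[]> migrate(
      Map<String, byte[]> snapshot, SimpleCoder<T> oldCoder, SimpleCoder<T> newCoder) {
    Map<String, byte[]> migrated = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> cell : snapshot.entrySet()) {
      T value = oldCoder.decode(cell.getValue());
      migrated.put(cell.getKey(), newCoder.encode(value));
    }
    return migrated;
  }

  public static void main(String[] args) {
    // Two deliberately incompatible toy formats, standing in for the two
    // Kryo versions: decimal text vs. 32-bit big-endian.
    SimpleCoder<Integer> oldCoder = new SimpleCoder<Integer>() {
      public byte[] encode(Integer v) { return v.toString().getBytes(StandardCharsets.UTF_8); }
      public Integer decode(byte[] b) { return Integer.parseInt(new String(b, StandardCharsets.UTF_8)); }
    };
    SimpleCoder<Integer> newCoder = new SimpleCoder<Integer>() {
      public byte[] encode(Integer v) { return ByteBuffer.allocate(4).putInt(v).array(); }
      public Integer decode(byte[] b) { return ByteBuffer.wrap(b).getInt(); }
    };

    Map<String, byte[]> state = new LinkedHashMap<>();
    state.put("counter", oldCoder.encode(42));
    Map<String, byte[]> migrated = migrate(state, oldCoder, newCoder);
    System.out.println(newCoder.decode(migrated.get("counter"))); // prints 42
  }
}
```

The sketch shows why this must happen before the Pipeline resumes: once any worker reads a cell with the new coder, every cell it might touch has to already be in the new format.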

          Jan

        [1]
        https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps

        [2] https://github.com/EsotericSoftware/kryo

        [3] https://github.com/EsotericSoftware/kryo/issues/885
