My big-picture hot take: this is useful, but the problem we really need to
solve is topology change, which would make coder evolution obsolete.

I think the Beam model has a role in this; it isn't just a runner-specific
thing. We need to ensure the model makes it possible (ideally easy) to replace
one pipeline with another, and to define what that replacement should look
like. For example, in some cases a composite PTransform with the same
input/output types should be able to have its internals replaced in a
blue/green fashion. That would of course include many coder changes for which
no notion of compatibility is possible. And for a pipeline that looks pretty
much the same except for some encoding change, we should definitely see
whether we can define a more localized migration process.

I haven't thought about this in a while, so I don't have technical proposals
for achieving it yet. Ultimately, as a user of Beam and/or any runner, I
would consider the ability to run a brand-new, unrelated pipeline and hot-swap
it for the live one a prerequisite for production readiness, but it has been
a while since this was what I did day-to-day.
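For concreteness, here is a rough sketch of the conversion step that option 3 in the quoted message describes: every stored element is decoded with the old coder and re-encoded with the new one. It is only an illustration under stated assumptions: SimpleCoder, OldStringCoder, NewStringCoder, migrate and migrateState are hypothetical names, the interface is a simplified stand-in rather than Beam's actual Coder API, and the two toy coders (UTF-16BE vs UTF-8 strings) merely play the roles of KryoCoder and a Kryo5Coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class CoderMigrationSketch {

  /** Hypothetical, simplified stand-in for a coder; NOT Beam's org.apache.beam.sdk.coders.Coder. */
  interface SimpleCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
  }

  /** Hypothetical "old" coder (standing in for KryoCoder): writes strings as UTF-16BE. */
  static final class OldStringCoder implements SimpleCoder<String> {
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_16BE);
    }

    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_16BE);
    }
  }

  /** Hypothetical "new" coder (standing in for a Kryo5Coder): writes strings as UTF-8. */
  static final class NewStringCoder implements SimpleCoder<String> {
    public byte[] encode(String value) {
      return value.getBytes(StandardCharsets.UTF_8);
    }

    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  /** Converts one stored element: decode with the old coder, then encode with the new one. */
  static <T> byte[] migrate(byte[] oldBytes, SimpleCoder<T> from, SimpleCoder<T> to) {
    return to.encode(from.decode(oldBytes));
  }

  /** Applies migrate() to every value of a toy keyed-state snapshot; keys are left unchanged. */
  static <T> Map<String, byte[]> migrateState(
      Map<String, byte[]> snapshot, SimpleCoder<T> from, SimpleCoder<T> to) {
    Map<String, byte[]> migrated = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> entry : snapshot.entrySet()) {
      migrated.put(entry.getKey(), migrate(entry.getValue(), from, to));
    }
    return migrated;
  }

  public static void main(String[] args) {
    SimpleCoder<String> oldCoder = new OldStringCoder();
    SimpleCoder<String> newCoder = new NewStringCoder();

    // A toy "savepoint": state values that were written by the old coder.
    Map<String, byte[]> snapshot = new LinkedHashMap<>();
    snapshot.put("user-1", oldCoder.encode("hello"));
    snapshot.put("user-2", oldCoder.encode("world"));

    Map<String, byte[]> migrated = migrateState(snapshot, oldCoder, newCoder);
    for (Map.Entry<String, byte[]> entry : migrated.entrySet()) {
      // Prints "user-1 -> hello" and then "user-2 -> world".
      System.out.println(entry.getKey() + " -> " + newCoder.decode(entry.getValue()));
    }
  }
}
```

Note that this only works if the old coder can still be instantiated at migration time (e.g. both Kryo versions available side by side), and it says nothing about how a runner locates state namespaces or in-flight data, which is the runner-specific part.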


On Wed, May 3, 2023 at 8:07 AM Byron Ellis via dev <> wrote:

> I don't think I'm understanding the use case here. Are we talking about
> encoding of data in motion (e.g. between stages of a streaming pipeline) or
> data at rest (e.g. input and output formats)? Or maybe something
> else?
> On Wed, May 3, 2023 at 6:58 AM Jan Lukavský <> wrote:
>> Hi,
>> I'd like to discuss a topic that appears from time to time in different
>> contexts (e.g. [1]). I'd like to restate the problem in a slightly more
>> generic way as: "Should we have a way to completely exchange the coder of a
>> PCollection/state of a _running_ Pipeline?". First, my motivation for
>> this question: Beam has an extension called
>> beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder
>> uses Kryo [2] to serialize virtually any Java class into binary format.
>> Unfortunately, this binary representation differs between Kryo versions
>> and does not contain any way to recognize which version of Kryo was
>> used to serialize the data. An attempt to deserialize bytes produced by
>> an incompatible version of Kryo results in an exception. The current
>> version of Kryo used by the KryoCoder is already more than 5
>> years old, and an upgrade to a newer version is needed because the current
>> version does not work with JDK17+ [3]. Thus, the only option seems to be
>> the creation of a different Coder (e.g. Kryo5Coder), but then we need
>> the ability to migrate Pipelines using the old KryoCoder to the newer
>> one. That is, we need to completely switch the coder that encodes a
>> PCollection and/or state.
>> We therefore have the following options:
>>   1) Simply ignore this and let users rerun the Pipeline from scratch.
>> This is always possible and should generally be applicable, but for
>> some Pipelines it might be costly to reprocess all historical data.
>>   2) We can create the new Coder and let users use a runner-specific way
>> to convert the Pipeline. E.g. in the case of Flink, this could be done by
>> converting the savepoint into the new format. This requires knowledge of
>> how Beam stores state (namespaces) and is rather involved on the user side.
>> We could probably provide runner-specific tools for this, but some
>> runners, in general, might not allow such state manipulation.
>>   3) We can include the information about a Coder update in the Pipeline,
>> resubmit it to the runner, and let the runner handle it. Upon
>> Pipeline restart, a runner would have to convert all state and all
>> in-flight data from the old Coder to the new one before resuming the
>> Pipeline.
>> Option 3) seems the most natural, but it requires support on the
>> runner side.
>> I'm leaving open the details of how a runner would do this; for now I'm
>> interested in knowing the community's position on this.
>>   Jan
>> [1]
>> [2]
>> [3]
