Ah, yes. We have this field where an SDK can declare its capabilities:
https://github.com/apache/beam/blob/908d43e65ca8281f4a8d6188dac30a5964cb213c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1559
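The capabilities list an SDK declares lets a runner gate the new encoding per SDK, so older SDKs never see the extension bit. Here is a minimal sketch of that runner-side check. The capability URN is a hypothetical placeholder, not a real Beam URN, and the helper class is illustrative only.

```java
import java.util.Set;

// Hypothetical runner-side gate: only emit WindowedValue extensions to SDKs
// that advertised support for them in their declared capabilities.
final class ExtensionGate {
  // Hypothetical URN, for illustration only; not defined by Beam.
  static final String WV_EXTENSIONS_CAPABILITY = "beam:coder:windowed_value:extensions:v1";

  private ExtensionGate() {}

  /** True only if the SDK declared the (hypothetical) extensions capability. */
  static boolean mayEmitExtensions(Set<String> sdkCapabilities) {
    return sdkCapabilities.contains(WV_EXTENSIONS_CAPABILITY);
  }
}
```

With this kind of gate, a newer runner keeps producing the old three-encoding format for any SDK that did not opt in.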

Kenn

On Fri, Apr 25, 2025 at 10:44 AM Radek Stankiewicz <radosl...@google.com>
wrote:

> The breaking change happens when a portable runner (e.g. UW) running a newer
> version starts to add an extension to WindowedValue that the old SDK
> doesn't expect. The old SDK only expects FIRST, ONE_INDEX or TWO_INDICES,
> without an extension bit.
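To make the failure mode concrete, here is a hypothetical sketch of an "old" SDK tag decoder that knows only the three pane encodings. The tag values and the 0b10000 extension bit follow the masks discussed in this thread; this is not Beam's actual PaneInfoCoder code.

```java
// Hypothetical "old SDK" decoder: it recognizes only the three pane encodings,
// so any tag carrying an extra WITH_EXTENSION bit is rejected.
final class LegacyTagDecoder {
  enum Encoding { FIRST, ONE_INDEX, TWO_INDICES }

  private LegacyTagDecoder() {}

  static Encoding decode(int tag) {
    switch (tag) {
      case 0: return Encoding.FIRST;
      case 1: return Encoding.ONE_INDEX;
      case 2: return Encoding.TWO_INDICES;
      default:
        // A tag such as 1 | 0b10000 (ONE_INDEX plus an extension bit) lands here.
        throw new IllegalArgumentException("Unknown pane encoding tag: " + tag);
    }
  }
}
```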
>
> Radek
>
> On Fri, Apr 25, 2025 at 4:33 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I love using one WITH_EXTENSIONS bit and then adding a proto. Especially
>> for a full WindowedValue, where there are already ~10-12 bytes of overhead,
>> it doesn't make sense to create our own extensible encoding protocol.
>>
>>  - I think we can actually do this without a breaking change, too. Which
>> part were you thinking would be a breaking change?
>>  - Regarding "being a pain" to change, I think adding fields to
>> WindowedValue and adding fields to the extension *should* be about the same
>> code difficulty. In both cases the new fields must be optional and have
>> nulls or defaults. Putting it in a new object makes sense, though, since
>> it'll be encoded separately with proto, which is way easier and more robust.
>>
>> Anyhow, +100 to this new direction for the idea. There are even more
>> extensions that I've been thinking about, and this makes it very flexible.
>>
>> Kenn
>>
>> On Fri, Apr 25, 2025 at 9:17 AM Radek Stankiewicz <radosl...@google.com>
>> wrote:
>>
>>> Hi Kenn,
>>> I would add an option 3.
>>> Option 3: Keep the current tag-based encoding and carry drain in an
>>> additional proto object
>>>
>>>    - The tag & 0b0111 indicates whether it is using FIRST, ONE_INDEX or
>>>    TWO_INDICES encoding, plus future options if needed.
>>>    - tag & 0b10000 ("WITH_EXTENSION") indicates there is an extension.
>>>    - If the extension bit is set, an additional proto object is passed.
>>>    - Proto provides a natural way of versioning and allows adding more
>>>    fields in the future:
>>>       - Extension bit not set:
>>>          - Drain mode unspecified
>>>          - Tracing context is null
>>>       - Extension bit set:
>>>          - Drain mode can be set
>>>          - Tracing context and trace state can be set
>>>       - Extension bit set, some future fields are added:
>>>          - etc.
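A minimal sketch of the Option 3 header, assuming the bit values from the list above. It covers only the pane tag byte and an optional extension payload, not the rest of the WindowedValue encoding. The payload is treated as opaque bytes standing in for the serialized proto, and the fixed 4-byte length prefix is a simplification (a real coder would more likely use a varint length).

```java
import java.nio.ByteBuffer;

// Sketch: low bits carry the pane encoding; WITH_EXTENSION says a
// length-prefixed extension payload (the serialized proto) follows.
final class ExtensionHeader {
  static final int ENCODING_MASK = 0b0111;
  static final int WITH_EXTENSION = 0b10000;

  private ExtensionHeader() {}

  /** Pass an empty array when no extension fields are set: no extra bytes are written. */
  static byte[] encode(int encoding, byte[] extension) {
    boolean hasExtension = extension.length > 0;
    ByteBuffer buf = ByteBuffer.allocate(1 + (hasExtension ? 4 + extension.length : 0));
    buf.put((byte) ((encoding & ENCODING_MASK) | (hasExtension ? WITH_EXTENSION : 0)));
    if (hasExtension) {
      buf.putInt(extension.length); // length prefix so the decoder knows where it ends
      buf.put(extension);
    }
    return buf.array();
  }

  static int decodeEncoding(byte[] encoded) {
    return encoded[0] & ENCODING_MASK;
  }

  /** Returns the extension payload, or an empty array if the bit is not set. */
  static byte[] decodeExtension(byte[] encoded) {
    ByteBuffer buf = ByteBuffer.wrap(encoded);
    int tag = buf.get() & 0xFF;
    if ((tag & WITH_EXTENSION) == 0) {
      return new byte[0];
    }
    byte[] extension = new byte[buf.getInt()];
    buf.get(extension);
    return extension;
  }
}
```

When no extended field is set, the output is the single tag byte, which matches the "no overhead" point in the summary.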
>>>
>>> The Extension object within WindowedValue (WV) shouldn't be accessed
>>> directly by the user. Ideally, when a new field is added, e.g. drainMode,
>>> WV should get a getter getDrainMode() that returns the enum directly from
>>> the extension:
>>> public WindowedValueExtensions.DrainMode getDrainMode() {
>>>   return extensions.getDrain();
>>> }
>>>
>>> To set the drain mode:
>>> windowedElem = windowedElem.withExtension(
>>>     windowedElem.getExtensions().toBuilder()
>>>         .setDrain(WindowedValueExtensions.DrainMode.DRAIN_MODE_DRAINING)
>>>         .build());
>>> or:
>>> public WindowedValue<T> withDrainMode(DrainMode dm) {
>>>   return this.withExtension(this.getExtensions().toBuilder().setDrain(dm).build());
>>> }
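A self-contained sketch of the accessor and "wither" pattern described above, where users only call getDrainMode()/withDrainMode() and never touch the extensions object. Because generated proto classes are not available here, WindowedValueExtensions is a plain-Java stand-in mimicking an immutable proto message, and DRAIN_MODE_UNSPECIFIED is an assumed default value; both are hypothetical.

```java
// Plain-Java stand-in for a generated WindowedValueExtensions proto (hypothetical).
final class WindowedValueExtensions {
  enum DrainMode { DRAIN_MODE_UNSPECIFIED, DRAIN_MODE_DRAINING }

  static final WindowedValueExtensions DEFAULT =
      new WindowedValueExtensions(DrainMode.DRAIN_MODE_UNSPECIFIED);

  private final DrainMode drain;

  private WindowedValueExtensions(DrainMode drain) { this.drain = drain; }

  DrainMode getDrain() { return drain; }

  // Mimics toBuilder().setDrain(...).build() for this single field.
  WindowedValueExtensions withDrain(DrainMode drain) {
    return new WindowedValueExtensions(drain);
  }
}

// Minimal immutable WindowedValue: the extensions object stays private.
final class WindowedValue<T> {
  private final T value;
  private final WindowedValueExtensions extensions;

  private WindowedValue(T value, WindowedValueExtensions extensions) {
    this.value = value;
    this.extensions = extensions;
  }

  static <T> WindowedValue<T> of(T value) {
    return new WindowedValue<>(value, WindowedValueExtensions.DEFAULT);
  }

  T getValue() { return value; }

  WindowedValueExtensions.DrainMode getDrainMode() { return extensions.getDrain(); }

  // Returns a new WindowedValue; the original is left unchanged.
  WindowedValue<T> withDrainMode(WindowedValueExtensions.DrainMode dm) {
    return new WindowedValue<>(value, extensions.withDrain(dm));
  }
}
```

Keeping both objects immutable means a drain flag set on one element cannot leak into another element that shares the same extensions instance.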
>>>
>>> Adding new fields to WindowedValue is a pain, as it requires changes all
>>> over the codebase, so adding a single extensions field once should make
>>> future fields simpler to add.
>>>
>>> There may be situations where, for performance reasons, storing a value
>>> in the extension is not optimal. For example, a tracing context as an
>>> object is a parsed array of strings, but for interoperability it is usually
>>> propagated as two strings (traceparent, tracestate). In such a case:
>>>
>>>    - WV should get a new field traceContext of type Context.
>>>    - WV should get a getter getTraceContext() that accesses the field
>>>    directly.
>>>    - WV should get a withTraceContext() method that constructs a new WV
>>>    keeping the previous fields.
>>>    - When sending a WV over the wire, the WV coder should encode
>>>    traceContext into the extensions object.
>>>    - When receiving a WV over the wire, the WV coder should decode the
>>>    extension and construct an optimal traceContext if traceparent and
>>>    tracestate exist in the extension.
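A minimal sketch of the coder-side conversion in the list above: the WV holds a parsed context in memory, while the wire format carries only the two W3C strings. TraceContext and the map-based "extensions" are hypothetical stand-ins for the SDK's tracing Context type and the extensions proto. As a design choice, this sketch rebuilds the context whenever traceparent is present and treats tracestate as optional, since W3C Trace Context does not require it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical codec between an in-memory trace context and extension fields.
final class TraceContextCodec {
  static final String TRACEPARENT = "traceparent";
  static final String TRACESTATE = "tracestate";

  /** Stand-in for the SDK's tracing Context; a real SDK would hold parsed state. */
  static final class TraceContext {
    final String traceparent;
    final String tracestate; // may be null

    TraceContext(String traceparent, String tracestate) {
      this.traceparent = traceparent;
      this.tracestate = tracestate;
    }
  }

  private TraceContextCodec() {}

  /** Encode side: flatten the context into extension fields (empty if null). */
  static Map<String, String> toExtensions(TraceContext ctx) {
    Map<String, String> ext = new HashMap<>();
    if (ctx != null) {
      ext.put(TRACEPARENT, ctx.traceparent);
      if (ctx.tracestate != null) {
        ext.put(TRACESTATE, ctx.tracestate);
      }
    }
    return ext;
  }

  /** Decode side: rebuild the context only if traceparent was sent. */
  static Optional<TraceContext> fromExtensions(Map<String, String> ext) {
    String parent = ext.get(TRACEPARENT);
    if (parent == null) {
      return Optional.empty();
    }
    return Optional.of(new TraceContext(parent, ext.get(TRACESTATE)));
  }
}
```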
>>>
>>>
>>> Summary
>>> Pros:
>>>
>>>    - We are no longer limited to Pane bits as a way of extending
>>>    WindowedValue.
>>>    - There is no overhead if none of the extended fields is set.
>>>
>>> Cons:
>>>
>>>    - Memory overhead from keeping additional deserialized objects (here,
>>>    trace context).
>>>    - Confusion about where a field lives (wv.getTraceContext() or
>>>    wv.getExtensions().getTraceParent()).
>>>    - Breaking change.
>>>
>>>
>>>
>>> On Wed, Apr 2, 2025 at 11:17 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Just to organize my thoughts and to refresh myself on how this would
>>>> integrate with each SDK, I wrote this short follow-up on the API, PaneInfo
>>>> encoding, and proto changes. I'm especially interested in feedback from
>>>> people familiar with various SDKs about mistakes I've made in that SDK and
>>>> the most idiomatic ways to represent this.
>>>>
>>>>    https://s.apache.org/beam-drain-mode
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Mar 18, 2025 at 11:42 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks for all the detailed insightful comments! I've made some edits
>>>>> to reflect your insights and also settled on what I think is the most
>>>>> feasible plan, considering effort and backwards-compatibility.
>>>>>
>>>>> The work is actually now quite small (yay!). I've added these at the
>>>>> bottom of the doc:
>>>>>
>>>>>  - Add "is drain" bit for aggregations
>>>>>  - Add "is drain" bit for timer callbacks
>>>>>  - Fire processing time timers instantly during drain (likely getting
>>>>> dropped because their window is expired)
>>>>>  - Immediately drop already-expired processing time timers (since they
>>>>> are doomed to be dropped when they fire)
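The two processing-time timer items above can be sketched as a small decision helper. This is a hypothetical illustration, not runner code: it assumes a timer is "expired" when its window's max timestamp plus allowed lateness is behind the input watermark, and uses plain millisecond longs instead of Beam's time types.

```java
// Hypothetical policy for processing-time timers, with and without drain.
final class DrainTimerPolicy {
  enum Action { WAIT, FIRE_NOW, DROP }

  private DrainTimerPolicy() {}

  static Action onProcessingTimeTimer(
      boolean draining, long windowMaxTimestamp, long allowedLateness, long inputWatermark) {
    // Assumed expiry rule: the window's garbage-collection time is behind the watermark.
    boolean expired = windowMaxTimestamp + allowedLateness < inputWatermark;
    if (expired) {
      // It would be dropped when it fires anyway, so drop it right away.
      return Action.DROP;
    }
    // While draining, don't wait on wall-clock time; fire immediately.
    return draining ? Action.FIRE_NOW : Action.WAIT;
  }
}
```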
>>>>>
>>>>> This is a set of backwards-compatible changes consistent with the "option
>>>>> 1" change proposal in the doc.
>>>>>
>>>>> Let me know if you think this is not enough or too much.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Mar 12, 2025 at 6:33 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Kenn,
>>>>>>
>>>>>> thanks for putting this down on paper. This is a great initiative, as it
>>>>>> might touch some core parts of the model that we have actually been
>>>>>> circling around. I left some comments and I'm looking forward to the
>>>>>> broad discussion this definitely deserves.
>>>>>>
>>>>>>  Jan
>>>>>> On 3/11/25 15:46, Kenneth Knowles wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've spent some time on the rather hairy details of timers, batch,
>>>>>> drain, and timer loops lately. We have some inconsistencies in this area
>>>>>> that regularly bite users.
>>>>>>
>>>>>> I've written up what I *think* are the desired semantics, some notes
>>>>>> about the current status of implementation, and a few proposals for how
>>>>>> we can improve things.
>>>>>>
>>>>>> https://s.apache.org/beam-timers-and-drain
>>>>>>
>>>>>> Please take a look! This stuff is hard and I could really use as many
>>>>>> smart pairs of eyes on this as possible.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>