Hi Reuven,

I've put together a little POC of the Flink-side support [1] for
OrderedListState over the weekend, PTAL. Overall, I'd say it should be a pretty
straightforward and backward-compatible change.

This effort will definitely require creating a new FLIP and making a good
case to the Flink community. Would you be willing to participate in this
contribution?

[1]
https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0

Best,
D.

On Tue, Nov 16, 2021 at 8:34 PM David Morávek <[email protected]> wrote:

> Yes, my intuition is that it should be more or less the same. There might
> be some penalty from having more keys in RocksDB, e.g. on the delete code
> path you need to write more tombstones, which means more compaction, but
> other than that I can't think of anything.
>
> You'd still need to sort all the data in memory after reading them, but in
> this particular case it should save some cycles.
>
> On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax <[email protected]> wrote:
>
>> Right, but no worse than storing in ListState for that case, right?
>>
>> On Tue, Nov 16, 2021 at 10:44 AM David Morávek <[email protected]> wrote:
>>
>>> Based on what Aljoscha said, storing it in MapState would be better as
>>>> in practice the map will be sorted (as long as the key encoding is order
>>>> preserving, and on quick glance InstantCoder is order preserving), so I'll
>>>> change that
>>>
>>>
>>> This property only holds for the RocksDB-based state backend, so we
>>> unfortunately cannot leverage it. For example, HeapStateBackend is backed
>>> by a simple HashMap.
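Because a heap backend iterates in hash order, a runner-side workaround on top of MapState cannot rely on iteration order at all. Below is a minimal sketch of that workaround, assuming the state is modelled as a map from the int64 sort key to the elements stored at that key; a plain java.util.Map stands in for Flink's MapState, and the class and method names are hypothetical. It is correct on any backend, but every range read still visits every entry and sorts the matches in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a java.util.Map stands in for Flink's MapState<Long, List<T>>.
// No iteration order is assumed, so every read scans all entries and sorts explicitly.
class MapBackedOrderedList<T> {
    private final Map<Long, List<T>> state = new HashMap<>();

    // Insert one element under its sort key; several elements may share a key.
    void add(long sortKey, T value) {
        state.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    // Returns elements with fromKey <= key < toKey, in key order.
    List<T> readRange(long fromKey, long toKey) {
        TreeMap<Long, List<T>> inRange = new TreeMap<>();
        for (Map.Entry<Long, List<T>> e : state.entrySet()) { // visits every entry
            long key = e.getKey();
            if (key >= fromKey && key < toKey) {
                inRange.put(key, e.getValue());
            }
        }
        // The TreeMap imposes key order regardless of the backend's iteration order.
        List<T> result = new ArrayList<>();
        inRange.values().forEach(result::addAll);
        return result;
    }
}
```

The explicit sort is what makes this backend-agnostic; on RocksDB the data would usually already arrive in order, so the sort is cheap there, but the full scan remains.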
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Sounds great!
>>>>
>>>> I implemented the naive approach, but my current impl is based on
>>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>>> storing it in MapState would be better as in practice the map will be
>>>> sorted (as long as the key encoding is order preserving, and on quick
>>>> glance InstantCoder is order preserving), so I'll change that. This still
>>>> requires fetching the entire list every time, even if the user only issues
>>>> a range fetch, but probably good enough for now. Would be cool if Flink
>>>> were able to support this natively and efficiently.
>>>>
>>>> While trying to test this I realized that the Flink runner also didn't
>>>> support the OnWindowExpiration callback and all the relevant tests
>>>> used that callback. I'll first send a PR implementing OnWindowExpiration.
>>>>
>>>> To Robert's point - we should definitely add this to the protos and
>>>> support it on the portable runners as well!
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> sorry for the slow response! I was on vacation and only saw this now.
>>>>>
>>>>> I can't add much more than what David already said: right now, it's
>>>>> not possible to do an *efficient* implementation on Flink. The closest
>>>>> Flink state type would be MapState, which keeps state in multiple
>>>>> underlying RocksDB key/value cells (for the RocksDB state backend). That
>>>>> already keeps things sorted, but it's sorted by the serialized
>>>>> representation in RocksDB, which might just be the correct sorting for
>>>>> int64 timestamps. We don't expose that guarantee, though, because it might
>>>>> not be true for other state backends.
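To illustrate the caveat about serialized order: RocksDB's default comparator orders keys as unsigned byte strings, so a plain big-endian two's-complement encoding of an int64 sorts numerically only as long as negative and non-negative keys are not mixed; a negative key sorts after every non-negative one. A common remedy, sketched below, is to flip the sign bit before writing the big-endian bytes. This is an illustrative helper with hypothetical names, not a description of the serializer Flink or Beam actually uses.

```java
import java.nio.ByteBuffer;

// Illustrative only: shows why byte order and numeric order can disagree for int64 keys.
class OrderPreservingLongs {
    // Plain big-endian two's-complement encoding (ByteBuffer is big-endian by default).
    static byte[] plain(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Flipping the sign bit maps Long.MIN_VALUE to all-zero bytes and Long.MAX_VALUE
    // to all-0xFF bytes, so unsigned byte order matches numeric order.
    static byte[] orderPreserving(long v) {
        return plain(v ^ Long.MIN_VALUE);
    }

    // Unsigned lexicographic comparison, i.e. how a bytewise key comparator orders keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

With the plain encoding, -1 (all 0xFF bytes) compares greater than 1; with the sign-bit flip, the byte order agrees with the numeric order across the whole int64 range.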
>>>>>
>>>>> It would require convincing the Flink folks to add such a new state
>>>>> type, though.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>>> > OrderedListState
>>>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>>>>> > was added to Beam over a year ago. To date it is only supported by the
>>>>> > Dataflow runner and the DirectRunner. I want to see if it's possible to
>>>>> > support this well on the Flink runner (and eventually on other runners
>>>>> > as well).
>>>>> >
>>>>> > This is a state type that is indexed by an int64 sorting key (the Beam
>>>>> > API exposes this as a 64-bit timestamp, as that's the most common use
>>>>> > case, but fundamentally it's just an integer). Users can insert
>>>>> > elements, fetch ranges of elements, and delete ranges of elements.
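To pin down the semantics described above, here is a small in-memory model built on java.util.TreeMap. It is not Beam's OrderedListState interface (which works with TimestampedValue and Instant); the class and method names are hypothetical, and the sketch assumes half-open [from, to) ranges and allows several elements per sort key. A natively sorted state primitive would let a runner serve range reads and range deletes like this without touching keys outside the requested range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of ordered-list semantics: int64 sort key -> elements, kept sorted.
class SortedListModel<T> {
    private final NavigableMap<Long, List<T>> entries = new TreeMap<>();

    // Insert one element; several elements may share a sort key.
    void add(long sortKey, T value) {
        entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    // Elements with from <= key < to, in key order (insertion order within a key).
    List<T> readRange(long from, long to) {
        List<T> out = new ArrayList<>();
        entries.subMap(from, true, to, false).values().forEach(out::addAll);
        return out;
    }

    // Removes every element with from <= key < to; clearing the subMap view
    // removes those keys from the backing TreeMap.
    void clearRange(long from, long to) {
        entries.subMap(from, true, to, false).clear();
    }
}
```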
>>>>> >
>>>>> > Is there any way to implement this on Flink? I could of course add a
>>>>> > naive implementation: store everything in a ListState and have the
>>>>> > Flink runner sort the list every time it is fetched. This seems quite
>>>>> > inefficient, and the range deletes will be even less efficient.
>>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>>>>> > Flink has considered sorted state primitives, but I don't see any
>>>>> > activity on this issue.
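The naive fallback described above can be sketched as follows, assuming a plain java.util.List stands in for Flink's ListState (which offers appending and replacing the whole list); the class and method names are hypothetical. Appends stay cheap, but every read copies and sorts the full list, and a range delete has to read everything, filter it, and write the survivors back, which is why it is the most expensive operation here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical naive approach: an unsorted list (standing in for ListState) of keyed elements.
class NaiveOrderedList<T> {
    // One stored entry: the int64 sort key plus the user's element.
    static final class Entry<V> {
        final long key;
        final V value;

        Entry(long key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private List<Entry<T>> state = new ArrayList<>();

    // Cheap append, analogous to adding to a ListState.
    void add(long key, T value) {
        state.add(new Entry<>(key, value));
    }

    // Copies and sorts the whole list on every call, even for a narrow range.
    List<T> readRange(long from, long to) {
        List<Entry<T>> all = new ArrayList<>(state);
        all.sort(Comparator.comparingLong(e -> e.key)); // stable: equal keys keep insertion order
        List<T> out = new ArrayList<>();
        for (Entry<T> e : all) {
            if (e.key >= from && e.key < to) {
                out.add(e.value);
            }
        }
        return out;
    }

    // Range delete: read everything, keep what lies outside [from, to), rewrite the list.
    void clearRange(long from, long to) {
        List<Entry<T>> kept = new ArrayList<>();
        for (Entry<T> e : state) {
            if (e.key < from || e.key >= to) {
                kept.add(e);
            }
        }
        state = kept;
    }
}
```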
>>>>> >
>>>>> > Is there any way to do this, or should I just add the naive
>>>>> > implementation?
>>>>> >
>>>>> > Reuven
>>>>>
>>>>
