>
> Based on what Aljoscha said, storing it in MapState would be better as in
> practice the map will be sorted (as long as the key encoding is order
> preserving, and on quick glance InstantCoder is order preserving), so I'll
> change that


This property only holds for the RocksDB-based state backend, so
unfortunately we cannot rely on it. For example, HeapStateBackend is backed
by a simple HashMap.
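
To illustrate, here is a minimal sketch in plain Java (no Flink or Beam
dependencies) of a range read that does not depend on the iteration order
of the backing map. The class name, the method readRange, and the
Map<Long, List<String>> layout are hypothetical and only stand in for
whatever the runner would read out of MapState; this is not the actual
Flink runner code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: names and state layout are assumptions for illustration.
class SortedReadSketch {

    /** Returns the values whose timestamp key lies in [minTs, limitTs), in key order. */
    static List<String> readRange(Map<Long, List<String>> state, long minTs, long limitTs) {
        // Copy into a TreeMap so the result order never depends on the backing map.
        TreeMap<Long, List<String>> sorted = new TreeMap<>(state);
        List<String> out = new ArrayList<>();
        for (List<String> values : sorted.subMap(minTs, true, limitTs, false).values()) {
            out.addAll(values);
        }
        return out;
    }

    public static void main(String[] args) {
        // A HashMap stands in for a heap-based backend: no ordering guarantee.
        Map<Long, List<String>> heapLike = new HashMap<>();
        heapLike.computeIfAbsent(3000L, k -> new ArrayList<>()).add("c");
        heapLike.computeIfAbsent(-5L, k -> new ArrayList<>()).add("a");
        heapLike.computeIfAbsent(1000L, k -> new ArrayList<>()).add("b");
        System.out.println(readRange(heapLike, Long.MIN_VALUE, 3000L));
    }
}
```

This still reads and sorts the whole map on every fetch, so it is no more
efficient than sorting a ListState; it only removes the dependency on the
backend's ordering.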

Best,
D.

On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax <[email protected]> wrote:

> Sounds great!
>
> I implemented the naive approach, but my current impl is based on
> ListState, sorting it every time it's read. Based on what Aljoscha said,
> storing it in MapState would be better as in practice the map will be
> sorted (as long as the key encoding is order preserving, and on quick
> glance InstantCoder is order preserving), so I'll change that. This still
> requires fetching the entire list every time, even if the user only issues
> a range fetch, but probably good enough for now. Would be cool if Flink
> were able to support this natively and efficiently.
>
> While trying to test this I realized that the Flink runner also didn't
> support the OnWindowExpiration callback and all the relevant tests
> used that callback. I'll first send a PR implementing OnWindowExpiration.
>
> To Robert's point - we should definitely add this to the protos and
> support it on the portable runners as well!
>
> Reuven
>
> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi,
>>
>> sorry for the slow response! I was on vacation and only saw this now.
>>
>> I can't add much more than what David already said: right now, it's not
>> possible to do an *efficient* implementation on Flink. The closest Flink
>> state type would be MapState, which keeps state in multiple underlying
>> RocksDB key/value cells (for the RocksDB state backend). That already keeps
>> things sorted, but it's sorted by the serialized representation in RocksDB,
>> which might just be the correct sorting for int64 timestamps. We don't
>> expose that guarantee, though, because it might not be true for other state
>> backends.
>>
>> It would require convincing the Flink folks to add such a new state type,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>> > OrderedListState
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
>> >
>> > was
>> > added to Beam over a year ago. To date it is only supported by the
>> > Dataflow
>> > runner and the DirectRunner. I want to see if it's possible to support
>> > this
>> > well on the Flink runner (and eventually on other runners as well).
>> >
>> > This is a state type that is indexed by an int64 sorting key (the Beam
>> API
>> > exposes this as a 64-bit timestamp, as that's the most-common use case,
>> but
>> > fundamentally it's just an integer). Users can insert elements, fetch
>> ranges
>> > of elements, and delete ranges of elements.
>> >
>> > Is there any way to implement this on Flink? I could of course add a
>> > naive implementation - store everything in a ListState and have the
>> Flink
>> > runner sort the list every time it is fetched. This seems quite
>> > inefficient, and the range deletes will be even less efficient.
>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> Flink
>> > has considered sorted state primitives, but I don't see any activity on
>> > this issue.
>> >
>> > Is there any way to do this, or should I just add the naive
>> implementation?
>> >
>> > Reuven
>>
>
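
On the "order preserving" key encoding discussed in the thread: the sketch
below, in plain Java, shows why a signed int64 key needs more than a plain
big-endian encoding if the store compares keys as unsigned bytes (which, as
far as I know, is what RocksDB's default bytewise comparator does). The
class and method names are hypothetical. Whether Beam's InstantCoder
applies an equivalent shift should be checked against its source rather
than assumed from this example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: names are assumptions for illustration.
class OrderPreservingLongSketch {

    /** Plain big-endian two's-complement encoding (ByteBuffer's default byte order). */
    static byte[] encodePlain(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    /** Big-endian encoding with the sign bit flipped, so negative values sort first. */
    static byte[] encodeOrdered(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
    }

    public static void main(String[] args) {
        // Arrays.compareUnsigned compares lexicographically, treating bytes as unsigned.
        // Plain encoding: -1 is 0xFF..FF and sorts after 1, so this prints false.
        System.out.println(Arrays.compareUnsigned(encodePlain(-1L), encodePlain(1L)) < 0);
        // Sign-flipped encoding: -1 sorts before 1, so this prints true.
        System.out.println(Arrays.compareUnsigned(encodeOrdered(-1L), encodeOrdered(1L)) < 0);
    }
}
```

With non-negative timestamps only, both encodings give the same order; the
difference appears once keys can be negative.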
