Yes, my intuition is that it should be more or less the same. There might
be some penalty from having more keys in RocksDB, e.g. on the delete code
path you need to write more tombstones (and compact them later), but other
than that I can't think of anything.

You'd still need to sort all the data in memory after reading it, but in
this particular case it should save some cycles.
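
A minimal Java sketch of the trade-off described above, assuming a MapState-like store with no ordering guarantee (a plain `HashMap` stands in for it, as with the heap-based backend). `MapBackedOrderedList` and its methods are hypothetical names, not Beam or Flink APIs, and the half-open `[minKey, limitKey)` range is an assumption. Each read sorts the matching keys in memory, and a range delete issues one removal per distinct sort key, which on RocksDB would presumably mean one tombstone each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an ordered list emulated on an unordered key/value map
// keyed by the int64 sort key. Not a Flink or Beam API; HashMap stands in for
// a MapState whose iteration order is not guaranteed.
class MapBackedOrderedList<T> {
  private final Map<Long, List<T>> entries = new HashMap<>();

  // One map entry per distinct sort key (an assumption); values sharing a key
  // are appended to that key's list.
  void add(long sortKey, T value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  // Scans every entry, keeps those in [minKey, limitKey), and sorts them in
  // memory, because the map itself gives no ordering guarantee.
  List<T> readRange(long minKey, long limitKey) {
    List<Map.Entry<Long, List<T>>> hits = new ArrayList<>();
    for (Map.Entry<Long, List<T>> e : entries.entrySet()) {
      if (e.getKey() >= minKey && e.getKey() < limitKey) {
        hits.add(e);
      }
    }
    hits.sort(Map.Entry.comparingByKey());
    List<T> result = new ArrayList<>();
    for (Map.Entry<Long, List<T>> e : hits) {
      result.addAll(e.getValue());
    }
    return result;
  }

  // Removes each matching key individually and returns how many were removed;
  // with a RocksDB-backed map, each removal would be a separate delete.
  int clearRange(long minKey, long limitKey) {
    List<Long> doomed = new ArrayList<>();
    for (Long k : entries.keySet()) {
      if (k >= minKey && k < limitKey) {
        doomed.add(k);
      }
    }
    for (Long k : doomed) {
      entries.remove(k);
    }
    return doomed.size();
  }

  public static void main(String[] args) {
    MapBackedOrderedList<String> list = new MapBackedOrderedList<>();
    list.add(30L, "c");
    list.add(10L, "a");
    list.add(20L, "b");
    System.out.println(list.readRange(0L, 25L)); // [a, b]
    list.clearRange(10L, 20L);
    System.out.println(list.readRange(0L, 100L)); // [b, c]
  }
}
```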

On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax wrote:

> Right, but no worse than storing in ListState for that case, right?
>
> On Tue, Nov 16, 2021 at 10:44 AM David Morávek wrote:
>
>>> Based on what Aljoscha said, storing it in MapState would be better as in
>>> practice the map will be sorted (as long as the key encoding is order
>>> preserving, and on quick glance InstantCoder is order preserving), so I'll
>>> change that
>>
>>
>> This property only holds for the RocksDB-based state backend, so
>> unfortunately we cannot leverage it. For example, HeapStateBackend is
>> backed by a simple HashMap.
>>
>> Best,
>> D.
>>
>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax wrote:
>>
>>> Sounds great!
>>>
>>> I implemented the naive approach, but my current impl is based on
>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>> storing it in MapState would be better, as in practice the map will be
>>> sorted (as long as the key encoding is order-preserving, and at a quick
>>> glance InstantCoder is order-preserving), so I'll change that. This still
>>> requires fetching the entire list every time, even if the user only issues
>>> a range fetch, but it's probably good enough for now. It would be cool if
>>> Flink were able to support this natively and efficiently.
>>>
>>> While trying to test this, I realized that the Flink runner also didn't
>>> support the OnWindowExpiration callback, and all the relevant tests
>>> used that callback. I'll first send a PR implementing OnWindowExpiration.
>>>
>>> To Robert's point: we should definitely add this to the protos and
>>> support it on the portable runners as well!
>>>
>>> Reuven
>>>
>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek wrote:
>>>
>>>> Hi,
>>>>
>>>> sorry for the slow response! I was on vacation and only saw this now.
>>>>
>>>> I can't add much more than what David already said: right now, it's not
>>>> possible to do an *efficient* implementation on Flink. The closest Flink
>>>> state type would be MapState, which keeps state in multiple underlying
>>>> RocksDB key/value cells (for the RocksDB state backend). That already keeps
>>>> things sorted, but by the serialized representation in RocksDB, which may
>>>> happen to match the numeric order of int64 timestamps. We don't expose
>>>> that guarantee, though, because it might not hold for other state
>>>> backends.
>>>>
>>>> An efficient implementation would require convincing the Flink folks to
>>>> add such a new state type, though.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>> > OrderedListState
>>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>>>> > was added to Beam over a year ago. To date it is only supported by the
>>>> > Dataflow runner and the DirectRunner. I want to see if it's possible to
>>>> > support this well on the Flink runner (and eventually on other runners
>>>> > as well).
>>>> >
>>>> > This is a state type that is indexed by an int64 sorting key (the Beam
>>>> > API exposes this as a 64-bit timestamp, as that's the most common use
>>>> > case, but fundamentally it's just an integer). Users can insert
>>>> > elements, fetch ranges of elements, and delete ranges of elements.
>>>> >
>>>> > Is there any way to implement this on Flink? I could of course add a
>>>> > naive implementation: store everything in a ListState and have the
>>>> > Flink runner sort the list every time it is fetched. This seems quite
>>>> > inefficient, and range deletes will be even less efficient, since each
>>>> > one requires reading, filtering, and rewriting the whole list.
>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>>>> > Flink has considered sorted state primitives, but I don't see any
>>>> > activity on this issue.
>>>> >
>>>> > Is there any way to do this, or should I just add the naive
>>>> > implementation?
>>>> >
>>>> > Reuven
>>>>
>>>
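
For comparison, here is a hedged, in-memory Java model of the semantics described in Reuven's original message (insert by int64 key, range fetch, range delete), built on `java.util.TreeMap`. It is only meant to show what a natively sorted state primitive would buy: range operations touch just the matching entries instead of the whole list. `SortedListModel` is a hypothetical name, and the half-open `[minKey, limitKey)` convention is an assumption based on my reading of Beam's `OrderedListState` range methods; this is not how Beam or Flink actually implement the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of an ordered list keyed by an int64 sort key.
// A sorted map lets range reads and deletes visit only the keys in the range.
// This is an illustration, not Beam's or Flink's implementation.
class SortedListModel<T> {
  private final TreeMap<Long, List<T>> entries = new TreeMap<>();

  void add(long sortKey, T value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  // Returns values with keys in [minKey, limitKey), already in key order.
  List<T> readRange(long minKey, long limitKey) {
    List<T> result = new ArrayList<>();
    if (minKey >= limitKey) {
      return result;
    }
    for (List<T> values : entries.subMap(minKey, true, limitKey, false).values()) {
      result.addAll(values);
    }
    return result;
  }

  // Clearing the sub-map view removes the matching entries from the backing map.
  void clearRange(long minKey, long limitKey) {
    if (minKey < limitKey) {
      entries.subMap(minKey, true, limitKey, false).clear();
    }
  }

  public static void main(String[] args) {
    SortedListModel<String> list = new SortedListModel<>();
    list.add(20L, "b");
    list.add(10L, "a");
    list.add(30L, "c");
    System.out.println(list.readRange(15L, 35L)); // [b, c]
    list.clearRange(0L, 25L);
    System.out.println(list.readRange(Long.MIN_VALUE, Long.MAX_VALUE)); // [c]
  }
}
```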

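David and Aljoscha both point out that RocksDB orders MapState entries by their serialized bytes, which matches numeric order only when the key encoding is order-preserving. The sketch below uses only the JDK (`ByteBuffer`, and `Arrays.compareUnsigned`, which needs Java 9+) plus the hypothetical class `OrderPreservingLongs` to show a common trick for int64 keys: flip the sign bit and write big-endian. It illustrates the general technique only and makes no claim about how InstantCoder or any Flink serializer encodes values.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrates why byte-wise key ordering (as in RocksDB) only matches numeric
// order when the encoding is order-preserving. Generic technique; not a
// description of any particular Beam coder or Flink serializer.
class OrderPreservingLongs {
  // Plain big-endian two's complement: negative numbers have the top bit set
  // (first byte >= 0x80), so under unsigned byte comparison they sort after
  // every non-negative number.
  static byte[] rawBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // XOR with Long.MIN_VALUE flips only the sign bit, mapping
  // Long.MIN_VALUE..Long.MAX_VALUE monotonically onto 0x00..00..0xFF..FF,
  // so unsigned byte order equals numeric order.
  static byte[] orderPreserving(long v) {
    return rawBigEndian(v ^ Long.MIN_VALUE);
  }

  public static void main(String[] args) {
    long a = -1L;
    long b = 1L;
    // false: raw encoding puts -1 after 1
    System.out.println(Arrays.compareUnsigned(rawBigEndian(a), rawBigEndian(b)) < 0);
    // true: sign-flipped encoding keeps -1 before 1
    System.out.println(Arrays.compareUnsigned(orderPreserving(a), orderPreserving(b)) < 0);
  }
}
```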