Hi,

sorry for the slow response! I was on vacation and only saw this now.

I can't add much more than what David already said: right now, it's not 
possible to do an *efficient* implementation on Flink. The closest Flink state 
type would be MapState, which, for the RocksDB state backend, stores its 
entries in multiple underlying RocksDB key/value cells. RocksDB already keeps 
those entries sorted, but by the serialized representation of the map key, 
which might just happen to be the correct order for int64 timestamps. We 
don't expose that as a guarantee, though, because it might not hold for other 
state backends.
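
To illustrate the caveat about the serialized order, here is a minimal sketch 
in Java. It assumes the sort key is written as a big-endian 8-byte long and 
that the backend compares keys as unsigned bytes (which is what RocksDB's 
default bytewise comparator does). The class and method names are made up for 
illustration; they are not part of Flink or Beam.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not a Flink or Beam class.
final class SortKeyEncoding {

    // Plain big-endian encoding (ByteBuffer is big-endian by default).
    static byte[] encodeRaw(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Flip the sign bit so that unsigned byte-wise order matches
    // signed numeric order over the whole int64 range.
    static byte[] encodeOrdered(long key) {
        return encodeRaw(key ^ Long.MIN_VALUE);
    }

    // Inverse of encodeOrdered.
    static long decodeOrdered(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison of two byte arrays (Java 9+).
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

With the raw encoding, non-negative keys compare correctly, but -1 sorts after 
1 because its first byte is 0xFF. With the sign bit flipped, byte order and 
numeric order agree for negative keys as well.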

Supporting this efficiently would require convincing the Flink folks to add a 
sorted state type.

Best,
Aljoscha

On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
> OrderedListState
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
> was added to Beam over a year ago. To date it is only supported by the
> Dataflow runner and the DirectRunner. I want to see if it's possible to
> support this well on the Flink runner (and eventually on other runners as
> well).
>
> This is a state type that is indexed by an int64 sorting key (the Beam API
> exposes this as a 64-bit timestamp, as that's the most-common use case, but
> fundamentally it's just an integer). Users can insert elements, fetch ranges
> of elements, and delete ranges of elements.
>
> Is there any way to implement this on Flink? I could of course add a
> naive implementation - store everything in a ListState and have the Flink
> runner sort the list every time it is fetched. This seems quite
> inefficient, and the range deletes will be even less efficient.
> https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that Flink
> has considered sorted state primitives, but I don't see any activity on
> this issue.
>
> Is there any way to do this, or should I just add the naive implementation?
>
> Reuven
