Hi Reuven,

cc d...@flink.apache.org.

 Jan

On 11/12/21 04:35, Reuven Lax wrote:
OrderedListState <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java> was added to Beam over a year ago. To date it is only supported by the Dataflow runner and the DirectRunner. I want to see if it's possible to support this well on the Flink runner (and eventually on other runners as well).

This is a state type that is indexed by an int64 sorting key (the Beam API exposes this as a 64-bit timestamp, as that's the most common use case, but fundamentally it's just an integer). Users can insert elements, fetch ranges of elements, and delete ranges of elements.

Is there any way to implement this on Flink? I could of course add a naive implementation - store everything in a ListState and have the Flink runner sort the list every time it is fetched. This seems quite inefficient, and the range deletes will be even less efficient. https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that Flink has considered sorted state primitives, but I don't see any activity on this issue.
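
To make the cost of the naive approach concrete, here is a minimal sketch in plain Java. It does not use the Beam or Flink APIs: the class NaiveOrderedList and its methods add, readRange, and clearRange are hypothetical names, an in-memory ArrayList stands in for the ListState contents, and the half-open range [minKey, limitKey) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the naive approach; not Beam or Flink API.
class NaiveOrderedList<T> {
    // One entry per inserted element: the int64 sort key plus the value.
    private static final class Entry<T> {
        final long key;
        final T value;

        Entry(long key, T value) {
            this.key = key;
            this.value = value;
        }
    }

    // Plays the role of the unsorted ListState contents.
    private final List<Entry<T>> entries = new ArrayList<>();

    // Insert is a cheap append; no ordering is maintained on write.
    void add(long key, T value) {
        entries.add(new Entry<>(key, value));
    }

    // Fetch [minKey, limitKey) (assumed half-open): scan everything,
    // keep the matches, then sort them on every read.
    List<T> readRange(long minKey, long limitKey) {
        List<Entry<T>> hits = new ArrayList<>();
        for (Entry<T> e : entries) {
            if (e.key >= minKey && e.key < limitKey) {
                hits.add(e);
            }
        }
        // List.sort is stable, so entries with equal keys keep insertion order.
        hits.sort((a, b) -> Long.compare(a.key, b.key));
        List<T> values = new ArrayList<>(hits.size());
        for (Entry<T> e : hits) {
            values.add(e.value);
        }
        return values;
    }

    // Range delete also scans the full list, even if only a few entries match.
    void clearRange(long minKey, long limitKey) {
        entries.removeIf(e -> e.key >= minKey && e.key < limitKey);
    }
}
```

In this sketch both readRange and clearRange touch every stored element no matter how narrow the range is, which is the inefficiency described above. A backend that kept entries ordered by key could seek directly to the start of the range instead.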

Is there any way to do this, or should I just add the naive implementation?

Reuven
