Hi Reuven,

I've put together a little POC of the Flink-side support [1] for OrderedListState over the weekend, PTAL. Overall I'd say it should be a pretty straightforward and backward-compatible change.
This effort will definitely require creating a new FLIP and making a good case for the Flink community. Would you be willing to participate in this contribution?

[1] https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0

Best,
D.

On Tue, Nov 16, 2021 at 8:34 PM David Morávek <[email protected]> wrote:

> Yes, my intuition is that it should be more or less the same. There might
> be some penalty in terms of having more keys in the RocksDB, e.g. on the
> delete code path you need to write more tombstones + compaction, but other
> than that I can't think of anything.
>
> You'd still need to sort all the data in memory after reading them, but in
> this particular case it should save some cycles.
>
> On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax <[email protected]> wrote:
>
>> Right, but no worse than storing in ListState for that case, right?
>>
>> On Tue, Nov 16, 2021 at 10:44 AM David Morávek <[email protected]> wrote:
>>
>>>> Based on what Aljoscha said, storing it in MapState would be better as
>>>> in practice the map will be sorted (as long as the key encoding is order
>>>> preserving, and on quick glance InstantCoder is order preserving), so I'll
>>>> change that
>>>
>>> This property only holds for RocksDB based state backend, so we
>>> unfortunately cannot leverage that. For example HeapStateBackend is backed
>>> by a simple HashMap.
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Sounds great!
>>>>
>>>> I implemented the naive approach, but my current impl is based on
>>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>>> storing it in MapState would be better as in practice the map will be
>>>> sorted (as long as the key encoding is order preserving, and on quick
>>>> glance InstantCoder is order preserving), so I'll change that. This still
>>>> requires fetching the entire list every time, even if the user only issues
>>>> a range fetch, but probably good enough for now. Would be cool if Flink
>>>> were able to support this natively and efficiently.
>>>>
>>>> While trying to test this I realized that the Flink runner also didn't
>>>> support the OnWindowExpiration callback and all the relevant tests
>>>> used that callback. I'll first send a PR implementing OnWindowExpiration.
>>>>
>>>> To Robert's point - we should definitely add this to the protos and
>>>> support it on the portable runners as well!
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> sorry for the slow response! I was on vacation and only saw this now.
>>>>>
>>>>> I can't add much more than what David already said: right now, it's
>>>>> not possible to do an *efficient* implementation on Flink. The closest
>>>>> Flink state type would be MapState, which keeps state in multiple
>>>>> underlying RocksDB key/value cells (for the RocksDB state backend). That
>>>>> already keeps things sorted, but it's sorted by the serialized
>>>>> representation in RocksDB, which might just be the correct sorting for
>>>>> int64 timestamps. We don't expose that guarantee, though, because it might
>>>>> not be true for other state backends.
>>>>>
>>>>> It would require convincing the Flink folks to add such a new state
>>>>> type, though.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>>> > OrderedListState
>>>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>>>>> > was added to Beam over a year ago. To date it is only supported by
>>>>> > the Dataflow runner and the DirectRunner. I want to see if it's
>>>>> > possible to support this well on the Flink runner (and eventually on
>>>>> > other runners as well).
>>>>> >
>>>>> > This is a state type that is indexed by an int64 sorting key (the
>>>>> > Beam API exposes this as a 64-bit timestamp, as that's the most-common
>>>>> > use case, but fundamentally it's just an integer). Users can insert
>>>>> > elements, fetch ranges of elements, and delete ranges of elements.
>>>>> >
>>>>> > Is there any way to implement this on Flink? I could of course add a
>>>>> > naive implementation - store everything in a ListState and have the
>>>>> > Flink runner sort the list every time it is fetched. This seems quite
>>>>> > inefficient, and the range deletes will be even less efficient.
>>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>>>>> > Flink has considered sorted state primitives, but I don't see any
>>>>> > activity on this issue.
>>>>> >
>>>>> > Is there any way to do this, or should I just add the naive
>>>>> > implementation?
>>>>> >
>>>>> > Reuven
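
For readers following the thread, the naive approach discussed above (keep everything in a list, sort on every read) can be sketched roughly as follows. This is only an illustration, not Beam's or Flink's API: the class `NaiveOrderedList`, its `Entry` type, and the half-open range convention (minKey inclusive, limitKey exclusive) are assumptions made for this example, and the in-memory `ArrayList` merely stands in for a ListState.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; none of these names come from Beam or Flink.
final class NaiveOrderedList<T> {
    // One (sortKey, value) pair, mirroring the int64 sorting key from the thread.
    static final class Entry<T> {
        final long sortKey;
        final T value;

        Entry(long sortKey, T value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Stands in for the contents of a ListState: insertion order, unsorted.
    private final List<Entry<T>> entries = new ArrayList<>();

    // Insert is a plain append; no ordering is maintained on write.
    void add(long sortKey, T value) {
        entries.add(new Entry<>(sortKey, value));
    }

    // Scans the whole list, keeps minKey <= sortKey < limitKey (assumed
    // convention), then sorts the matches by key before returning the values.
    List<T> readRange(long minKey, long limitKey) {
        List<Entry<T>> hits = new ArrayList<>();
        for (Entry<T> e : entries) {
            if (e.sortKey >= minKey && e.sortKey < limitKey) {
                hits.add(e);
            }
        }
        hits.sort(Comparator.comparingLong((Entry<T> e) -> e.sortKey));
        List<T> out = new ArrayList<>();
        for (Entry<T> e : hits) {
            out.add(e.value);
        }
        return out;
    }

    // Range delete also touches every element, since the list is unsorted.
    void clearRange(long minKey, long limitKey) {
        entries.removeIf(e -> e.sortKey >= minKey && e.sortKey < limitKey);
    }
}
```

Every range read or range delete here walks the full list, which is the inefficiency that a natively sorted state type would be meant to avoid.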
