> The only thing I'm not sure of is firstKey, lastKey methods on SortedMap -
> could this be implemented efficiently in the rocksdb store?

I think some kind of skip scan should be enough to do this.

As for sorting by an arbitrary key, I'd be a bit afraid of unlocking this
door :) Just remember the long-standing discussion about introducing sorting
capabilities into the Beam Model. For years the community didn't want to hear
anything about sorting and then came to the conclusion that if you need
sorting, you should never need to sort by anything other than a timestamp
(e.g. for state machines, temporal joins).

Personally I can still imagine some use cases along the lines of building
HFiles, but for this case one sorting pass at the flush should be enough, so
we might just use some kind of external sorter instead.

D.

On Wed, Dec 1, 2021 at 6:03 PM Reuven Lax wrote:

> To randomize this even further, if Flink can support arbitrary keys, then
> another approach here would be to implement Java's SortedMap API.
> FetchRange could then be implemented via map.subMap(begin, end) and
> clearRange could be done via map.subMap(begin, end).clear(). The only thing
> I'm not sure of is firstKey, lastKey methods on SortedMap - could this be
> implemented efficiently in the rocksdb store?
>
> On Tue, Nov 30, 2021 at 8:43 PM Reuven Lax wrote:
>
>> Left a comment - wonder if we should let the list be sorted by an
>> arbitrary key instead of timestamp. Timestamp is likely the more common
>> use case, but I can imagine other use cases (e.g. imagine reading log
>> entries and wanting to keep the entries sorted by LSN).
>>
>> On Mon, Nov 22, 2021 at 1:12 PM David Morávek wrote:
>>
>>> Hi Reuven,
>>>
>>> I've put together a little POC of the Flink side support [1] for
>>> OrderedListState over the weekend, PTAL. Overall I'd say it should be a
>>> pretty straightforward and backward-compatible change.
>>>
>>> This effort will definitely require creating a new FLIP and making a
>>> good case for the Flink community.
>>> Would you be willing to participate in this contribution?
>>>
>>> [1]
>>> https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Nov 16, 2021 at 8:34 PM David Morávek wrote:
>>>
>>>> Yes, my intuition is that it should be more or less the same. There
>>>> might be some penalty in terms of having more keys in RocksDB, e.g. on
>>>> the delete code path you need to write more tombstones + compaction, but
>>>> other than that I can't think of anything.
>>>>
>>>> You'd still need to sort all the data in memory after reading it, but
>>>> in this particular case it should save some cycles.
>>>>
>>>> On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax wrote:
>>>>
>>>>> Right, but no worse than storing in ListState for that case, right?
>>>>>
>>>>> On Tue, Nov 16, 2021 at 10:44 AM David Morávek wrote:
>>>>>
>>>>>>> Based on what Aljoscha said, storing it in MapState would be better
>>>>>>> as in practice the map will be sorted (as long as the key encoding is
>>>>>>> order preserving, and on quick glance InstantCoder is order
>>>>>>> preserving), so I'll change that
>>>>>>
>>>>>> This property only holds for the RocksDB-based state backend, so we
>>>>>> unfortunately cannot leverage that. For example, HeapStateBackend is
>>>>>> backed by a simple HashMap.
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax wrote:
>>>>>>
>>>>>>> Sounds great!
>>>>>>>
>>>>>>> I implemented the naive approach, but my current impl is based on
>>>>>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>>>>>> storing it in MapState would be better as in practice the map will be
>>>>>>> sorted (as long as the key encoding is order preserving, and on quick
>>>>>>> glance InstantCoder is order preserving), so I'll change that.
>>>>>>> This still requires fetching the entire list every time, even if the
>>>>>>> user only issues a range fetch, but probably good enough for now.
>>>>>>> Would be cool if Flink were able to support this natively and
>>>>>>> efficiently.
>>>>>>>
>>>>>>> While trying to test this I realized that the Flink runner also
>>>>>>> didn't support the OnWindowExpiration callback and all the relevant
>>>>>>> tests used that callback. I'll first send a PR implementing
>>>>>>> OnWindowExpiration.
>>>>>>>
>>>>>>> To Robert's point - we should definitely add this to the protos and
>>>>>>> support it on the portable runners as well!
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> sorry for the slow response! I was on vacation and only saw this
>>>>>>>> now.
>>>>>>>>
>>>>>>>> I can't add much more than what David already said: right now, it's
>>>>>>>> not possible to do an *efficient* implementation on Flink. The closest
>>>>>>>> Flink state type would be MapState, which keeps state in multiple
>>>>>>>> underlying RocksDB key/value cells (for the RocksDB state backend).
>>>>>>>> That already keeps things sorted, but it's sorted by the serialized
>>>>>>>> representation in RocksDB, which might just be the correct sorting for
>>>>>>>> int64 timestamps. We don't expose that guarantee, though, because it
>>>>>>>> might not be true for other state backends.
>>>>>>>>
>>>>>>>> It would require convincing the Flink folks to add such a new state
>>>>>>>> type, though.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>>>>>> > OrderedListState
>>>>>>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>>>>>>>> > was added to Beam over a year ago.
>>>>>>>> > To date it is only supported by the Dataflow runner and the
>>>>>>>> > DirectRunner. I want to see if it's possible to support this well
>>>>>>>> > on the Flink runner (and eventually on other runners as well).
>>>>>>>> >
>>>>>>>> > This is a state type that is indexed by an int64 sorting key (the
>>>>>>>> > Beam API exposes this as a 64-bit timestamp, as that's the most
>>>>>>>> > common use case, but fundamentally it's just an integer). Users can
>>>>>>>> > insert elements, fetch ranges of elements, and delete ranges of
>>>>>>>> > elements.
>>>>>>>> >
>>>>>>>> > Is there any way to implement this on Flink? I could of course add a
>>>>>>>> > naive implementation - store everything in a ListState and have the
>>>>>>>> > Flink runner sort the list every time it is fetched. This seems quite
>>>>>>>> > inefficient, and the range deletes will be even less efficient.
>>>>>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>>>>>>>> > Flink has considered sorted state primitives, but I don't see any
>>>>>>>> > activity on this issue.
>>>>>>>> >
>>>>>>>> > Is there any way to do this, or should I just add the naive
>>>>>>>> > implementation?
>>>>>>>> >
>>>>>>>> > Reuven
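
For readers following the SortedMap idea from the thread, here is a minimal in-memory sketch of how fetchRange and clearRange could map onto subMap, as Reuven describes. The class name SortedMapOrderedList and its method signatures are hypothetical and chosen only for illustration; this is neither Beam's OrderedListState API nor anything Flink provides, and it says nothing about how a RocksDB-backed store would implement the same operations. Ranges are half-open, [begin, end), because that is what SortedMap.subMap gives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory sketch; this is neither Beam's nor Flink's API. */
final class SortedMapOrderedList<T> {
  // Values grouped by sort key; TreeMap (a SortedMap) keeps keys ascending.
  private final TreeMap<Long, List<T>> entries = new TreeMap<>();

  void add(long sortKey, T value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Values whose sort key lies in [begin, end), in key order. */
  List<T> fetchRange(long begin, long end) {
    List<T> result = new ArrayList<>();
    for (List<T> values : entries.subMap(begin, end).values()) {
      result.addAll(values);
    }
    return result;
  }

  /** Removes every value whose sort key lies in [begin, end). */
  void clearRange(long begin, long end) {
    // subMap returns a live view, so clearing it removes from the backing map.
    entries.subMap(begin, end).clear();
  }

  public static void main(String[] args) {
    SortedMapOrderedList<String> list = new SortedMapOrderedList<>();
    list.add(30L, "c");
    list.add(10L, "a");
    list.add(20L, "b");
    System.out.println(list.fetchRange(10L, 30L)); // [a, b]
    list.clearRange(10L, 20L);
    System.out.println(list.fetchRange(Long.MIN_VALUE, Long.MAX_VALUE)); // [b, c]
  }
}
```

Values that share a sort key are kept in insertion order here; whether a real implementation should guarantee that is a separate question that the thread does not settle.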

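Several messages above hinge on the key encoding being "order preserving": MapState on the RocksDB backend is sorted by serialized bytes, so range scans only line up with numeric order if the bytes sort the same way as the numbers. The sketch below shows one common way to get that for signed 64-bit keys, assuming the store compares keys as unsigned bytes in lexicographic order (RocksDB's default bytewise comparator). The class and method names are hypothetical, and this is not claimed to be how InstantCoder is implemented. It needs Java 9 or later for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of an order-preserving encoding for signed 64-bit keys. */
final class OrderPreservingLongKey {
  /**
   * Flips the sign bit and writes the result big-endian (ByteBuffer's default).
   * Negative values then start with a 0 bit and non-negative values with a 1
   * bit, so unsigned byte-wise comparison matches signed numeric order.
   */
  static byte[] encode(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  /** Inverse of encode. */
  static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  /** Compares two keys the way a bytewise comparator would see them. */
  static int compareEncoded(long a, long b) {
    return Arrays.compareUnsigned(encode(a), encode(b));
  }

  public static void main(String[] args) {
    System.out.println(compareEncoded(-1L, 0L) < 0); // true
    System.out.println(decode(encode(-42L))); // -42
  }
}
```

Without the sign-bit flip, a plain big-endian encoding would place every negative key after every non-negative one, which is the kind of mismatch Aljoscha's caveat about the serialized representation points at.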