> Based on what Aljoscha said, storing it in MapState would be better as in
> practice the map will be sorted (as long as the key encoding is order
> preserving, and on quick glance InstantCoder is order preserving), so I'll
> change that

This property only holds for the RocksDB-based state backend, so unfortunately we cannot rely on it. For example, HeapStateBackend is backed by a simple HashMap.

Best,
D.

On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax <[email protected]> wrote:

> Sounds great!
>
> I implemented the naive approach, but my current impl is based on
> ListState, sorting it every time it's read. Based on what Aljoscha said,
> storing it in MapState would be better as in practice the map will be
> sorted (as long as the key encoding is order preserving, and on quick
> glance InstantCoder is order preserving), so I'll change that. This still
> requires fetching the entire list every time, even if the user only issues
> a range fetch, but probably good enough for now. Would be cool if Flink
> were able to support this natively and efficiently.
>
> While trying to test this I realized that the Flink runner also didn't
> support the OnWindowExpiration callback and all the relevant tests
> used that callback. I'll first send a PR implementing OnWindowExpiration.
>
> To Robert's point - we should definitely add this to the protos and
> support it on the portable runners as well!
>
> Reuven
>
> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi,
>>
>> sorry for the slow response! I was on vacation and only saw this now.
>>
>> I can't add much more than what David already said: right now, it's not
>> possible to do an *efficient* implementation on Flink. The closest Flink
>> state type would be MapState, which keeps state in multiple underlying
>> RocksDB key/value cells (for the RocksDB state backend). That already keeps
>> things sorted, but it's sorted by the serialized representation in RocksDB,
>> which might just be the correct sorting for int64 timestamps. We don't
>> expose that guarantee, though, because it might not be true for other state
>> backends.
>>
>> It would require convincing the Flink folks to add such a new state type,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>> > OrderedListState
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
>> > was added to Beam over a year ago. To date it is only supported by the
>> > Dataflow runner and the DirectRunner. I want to see if it's possible to
>> > support this well on the Flink runner (and eventually on other runners
>> > as well).
>> >
>> > This is a state type that is indexed by an int64 sorting key (the Beam
>> > API exposes this as a 64-bit timestamp, as that's the most-common use
>> > case, but fundamentally it's just an integer). Users can insert
>> > elements, fetch ranges of elements, and delete ranges of elements.
>> >
>> > Is there any way to implement this on Flink? I could of course add a
>> > naive implementation - store everything in a ListState and have the
>> > Flink runner sort the list every time it is fetched. This seems quite
>> > inefficient, and the range deletes will be even less efficient.
>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> > Flink has considered sorted state primitives, but I don't see any
>> > activity on this issue.
>> >
>> > Is there any way to do this, or should I just add the naive
>> > implementation?
>> >
>> > Reuven
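
For illustration only, here is a rough sketch of the backend-independent approach discussed in this thread: keep the elements in a map keyed by the int64 sort key, but never trust the map's iteration order, and sort explicitly on every read. It uses plain Java collections rather than the Flink or Beam APIs. The class name NaiveOrderedListState, its method names, and the half-open range [minKey, limitKey) are assumptions made for this sketch, not the runner's actual code. A HashMap stands in for a heap-backed map state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical stand-in for an ordered list state kept in a map-style state.
 * A plain HashMap plays the role of a heap-backed map state, so its
 * iteration order is deliberately NOT assumed to be sorted.
 */
class NaiveOrderedListState<T> {
    // Sort key (for example a timestamp in millis) -> values added under that key.
    private final Map<Long, List<T>> cells = new HashMap<>();

    /** Appends a value under the given sort key. */
    void add(long sortKey, T value) {
        cells.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    /**
     * Returns all values with minKey <= key < limitKey, ordered by key.
     * Every entry is scanned and the matches are sorted explicitly, so the
     * cost is proportional to the whole state, not to the requested range.
     */
    List<T> readRange(long minKey, long limitKey) {
        TreeMap<Long, List<T>> sorted = new TreeMap<>();
        for (Map.Entry<Long, List<T>> entry : cells.entrySet()) {
            long key = entry.getKey();
            if (key >= minKey && key < limitKey) {
                sorted.put(key, entry.getValue());
            }
        }
        List<T> result = new ArrayList<>();
        for (List<T> values : sorted.values()) {
            result.addAll(values);
        }
        return result;
    }

    /** Removes all values with minKey <= key < limitKey (again a full scan). */
    void clearRange(long minKey, long limitKey) {
        cells.keySet().removeIf(key -> key >= minKey && key < limitKey);
    }
}
```

Both readRange and clearRange visit every key, which matches the "fetching the entire list every time" cost mentioned above. A backend that guaranteed sorted iteration could skip the explicit sort, but that guarantee is not exposed.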
