> The only thing I'm not sure of is firstKey, lastKey methods on
> SortedMap - could this be implemented efficiently in the rocksdb
> store?
I think some kind of skip scan should be enough to do this.
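One plausible reading of the skip-scan idea, assuming each logical map is stored as a contiguous key range: a shared prefix (for example key group, key and namespace) followed by an order-preserving encoding of the sort key. firstKey is then one forward seek to the prefix, and lastKey is one backward seek from the prefix's exclusive upper bound, so neither needs a full scan. The sketch below models the RocksDB key space with a TreeMap under unsigned byte-wise comparison, with ceilingKey and lowerKey standing in for iterator seeks. It is not the RocksDB or Flink API; PrefixSeekSketch and its key layout are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

class PrefixSeekSketch {
    // Stand-in for RocksDB's default bytewise comparator: unsigned lexicographic order.
    static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    // Stand-in for the RocksDB key space: one sorted map shared by all logical maps.
    static final TreeMap<byte[], String> store = new TreeMap<>(BYTEWISE);

    static boolean hasPrefix(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
            && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // An exclusive upper bound for all keys starting with prefix,
    // or null if the prefix is all 0xFF bytes (no such bound exists).
    static byte[] prefixUpperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++; // unsigned increment of the last non-0xFF byte
                return bound;
            }
        }
        return null;
    }

    // firstKey: a single forward seek to the first key >= prefix.
    static byte[] firstKey(byte[] prefix) {
        byte[] k = store.ceilingKey(prefix);
        return (k != null && hasPrefix(k, prefix)) ? k : null;
    }

    // lastKey: a single backward seek to the last key below the prefix's upper bound.
    static byte[] lastKey(byte[] prefix) {
        byte[] upper = prefixUpperBound(prefix);
        byte[] k = (upper != null) ? store.lowerKey(upper)
                : (store.isEmpty() ? null : store.lastKey());
        return (k != null && hasPrefix(k, prefix)) ? k : null;
    }
}
```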
As for sorting by an arbitrary key, I'd be a bit afraid of unlocking
this door :) Just remember the long-standing discussion about
introducing sorting capabilities into the Beam Model. For years the
community didn't want to hear anything about sorting, and then came to
the conclusion that if you need sorting, you should never need to
sort by anything other than a timestamp (e.g. for state machines or
temporal joins).
Personally, I can still imagine some use cases along the lines of
building HFiles, but for that case a single sorting pass at flush
time should be enough, so we might just use some kind of external
sorter instead.
D.
On Wed, Dec 1, 2021 at 6:03 PM Reuven Lax wrote:
To randomize this even further, if Flink can support arbitrary
keys, then another approach here would be to implement Java's
SortedMap API. FetchRange could then be implemented via
map.subMap(begin, end) and clearRange could be done via
map.subMap(begin, end).clear(). The only thing I'm not sure of is
firstKey, lastKey methods on SortedMap - could this be implemented
efficiently in the rocksdb store?
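A minimal sketch of the SortedMap idea, with java.util.TreeMap standing in for a sorted state backend. OrderedListState can hold several elements with the same sort key, so this sketch (as an assumption) maps each key to a list of values. The class and method names are hypothetical. fetchRange and clearRange cover the half-open range [begin, end), which is exactly what subMap(begin, end) returns, and clearing the subMap view removes the entries from the backing map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class SortedMapOrderedList<T> {
    // Sort key -> all values inserted under that key, in insertion order.
    private final NavigableMap<Long, List<T>> map = new TreeMap<>();

    void add(long sortKey, T value) {
        map.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    // fetchRange: values in [begin, end), in key order.
    // subMap throws IllegalArgumentException if begin > end.
    List<T> fetchRange(long begin, long end) {
        List<T> out = new ArrayList<>();
        for (List<T> values : map.subMap(begin, end).values()) {
            out.addAll(values);
        }
        return out;
    }

    // clearRange: clearing the subMap view deletes those entries from the backing map.
    void clearRange(long begin, long end) {
        map.subMap(begin, end).clear();
    }

    // SortedMap.firstKey/lastKey throw NoSuchElementException when empty, so guard them.
    Long firstKey() { return map.isEmpty() ? null : map.firstKey(); }

    Long lastKey() { return map.isEmpty() ? null : map.lastKey(); }
}
```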
On Tue, Nov 30, 2021 at 8:43 PM Reuven Lax wrote:
Left a comment - I wonder if we should let the list be sorted by
an arbitrary key instead of a timestamp. Timestamp is likely
the more common use case, but I can imagine other use cases
(e.g. reading log entries and wanting to keep the
entries sorted by LSN).
On Mon, Nov 22, 2021 at 1:12 PM David Morávek wrote:
Hi Reuven,
I've put together a little POC of the Flink-side support
[1] for OrderedListState over the weekend, PTAL. Overall I'd
say it should be a pretty straightforward,
backward-compatible change.
This effort will definitely require creating a new FLIP
and making a good case to the Flink community. Would you
be willing to participate in this contribution?
[1]
https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0
Best,
D.
On Tue, Nov 16, 2021 at 8:34 PM David Morávek wrote:
Yes, my intuition is that it should be more or less
the same. There might be some penalty in terms of
having more keys in RocksDB, e.g. on the delete
code path you need to write more tombstones (plus the
compaction needed to clean them up), but other than
that I can't think of anything.
You'd still need to sort all the data in memory after
reading it, but in this particular case it should
save some cycles.
On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax wrote:
Right, but no worse than storing in ListState for
that case, right?
On Tue, Nov 16, 2021 at 10:44 AM David Morávek wrote:
> Based on what Aljoscha said, storing it in
> MapState would be better as in practice
> the map will be sorted (as long as the
> key encoding is order preserving, and on
> quick glance InstantCoder is order
> preserving), so I'll change that
This property only holds for the RocksDB-based
state backend, so unfortunately we cannot
leverage it. For example, the HeapStateBackend is
backed by a simple HashMap.
Best,
D.
On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax wrote:
Sounds great!
I implemented the naive approach; my
current impl is based on
ListState, sorting it every time it's
read. Based on what Aljoscha said, storing
it in MapState would be better as in
practice the map will be sorted (as long
as the key encoding is order preserving,
and on quick glance InstantCoder is order
preserving), so I'll change that. This
still requires fetching the entire list
every time, even if the user only issues a
range fetch, but it's probably good enough
for now. It would be cool if Flink were able
to support this natively and efficiently.
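For comparison, a sketch of the naive approach described above, with a plain java.util.List standing in for Flink's ListState (NaiveOrderedList and its methods are hypothetical, not the runner's actual code). Each range read copies and sorts the whole list, O(n log n) per read, and clearRange has to rewrite every surviving element; both costs are what a natively sorted state type would avoid.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class NaiveOrderedList<T> {
    // One stored element: sort key (e.g. timestamp millis) plus value.
    static final class Entry<V> {
        final long sortKey;
        final V value;

        Entry(long sortKey, V value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Stand-in for ListState: an unordered, append-only list.
    private List<Entry<T>> list = new ArrayList<>();

    void add(long sortKey, T value) {
        list.add(new Entry<>(sortKey, value));
    }

    // Every read fetches and sorts the whole list, even for a narrow range [minKey, limitKey).
    List<T> readRange(long minKey, long limitKey) {
        List<Entry<T>> sorted = new ArrayList<>(list);
        // List.sort is stable, so equal keys keep their insertion order.
        sorted.sort(Comparator.comparingLong(e -> e.sortKey));
        List<T> out = new ArrayList<>();
        for (Entry<T> e : sorted) {
            if (e.sortKey >= minKey && e.sortKey < limitKey) {
                out.add(e.value);
            }
        }
        return out;
    }

    // A range delete reads everything, keeps the survivors and writes the list back.
    void clearRange(long minKey, long limitKey) {
        List<Entry<T>> kept = new ArrayList<>();
        for (Entry<T> e : list) {
            if (e.sortKey < minKey || e.sortKey >= limitKey) {
                kept.add(e);
            }
        }
        list = kept;
    }
}
```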
While trying to test this, I realized that
the Flink runner also didn't support the
OnWindowExpiration callback, and all the
relevant tests used that callback. I'll
first send a PR implementing
OnWindowExpiration.
To Robert's point - we should definitely
add this to the protos and support it on
the portable runners as well!
Reuven
On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek wrote:
Hi,
sorry for the slow response! I was on
vacation and only saw this now.
I can't add much more than what David
already said: right now, it's not
possible to do an *efficient*
implementation on Flink. The closest
Flink state type would be MapState,
which keeps state in multiple
underlying RocksDB key/value cells
(for the RocksDB state backend). That
already keeps things sorted, but it's
sorted by the serialized
representation in RocksDB, which might
just be the correct sorting for int64
timestamps. We don't expose that
guarantee, though, because it might
not be true for other state backends.
It would require convincing the Flink
folks to add such a new state type,
though.
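To illustrate why sorting by the serialized representation only might match numeric order: RocksDB's default comparator orders keys byte by byte as unsigned values. A plain big-endian two's-complement encoding of an int64 sorts correctly for non-negative values, but every negative value sorts after every positive one because its top bit is set. Flipping the sign bit before encoding restores numeric order. This is a sketch under the assumption that the serializer writes the raw 8 bytes; OrderPreservingLongKey is hypothetical, and this is not a claim about how Flink's serializers or Beam's InstantCoder actually lay out their bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class OrderPreservingLongKey {
    // Plain big-endian two's complement: bytewise order is wrong for negative values.
    static byte[] encodeRaw(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Flip the sign bit so that unsigned bytewise order matches signed numeric order.
    static byte[] encodeOrdered(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
    }

    static long decodeOrdered(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison, modelling RocksDB's default bytewise comparator.
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

For epoch-millis timestamps after 1970, which are all non-negative, the raw encoding already sorts correctly, which may be the case Aljoscha had in mind.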
Best,
Aljoscha
On Fri, Nov 12, 2021, at 04:35, Reuven
Lax wrote:
> OrderedListState
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
> was added to Beam over a year ago. To date it is only supported by the
> Dataflow runner and the DirectRunner. I want to see if it's possible to
> support this well on the Flink runner (and eventually on other runners
> as well).
>
> This is a state type that is indexed by an int64 sorting key (the Beam
> API exposes this as a 64-bit timestamp, as that's the most common use
> case, but fundamentally it's just an integer). Users can insert
> elements, fetch ranges of elements, and delete ranges of elements.
>
> Is there any way to implement this on Flink? I could of course add a
> naive implementation: store everything in a ListState and have the
> Flink runner sort the list every time it is fetched. This seems quite
> inefficient, and the range deletes will be even less efficient.
>
> https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
> Flink has considered sorted state primitives, but I don't see any
> activity on this issue.
>
> Is there any way to do this, or should I just add the naive
> implementation?
>
> Reuven