I think that, assuming it would be reasonably easy to implement, we should enable sorting by a non-timestamp field. There are cases where we would like to sort by a value that is not a timestamp itself but _correlates_ with the timestamp, and especially with the watermark. A typical use case would be a sequential ID assigned to elements (e.g. Kafka offsets within a partition), which can provide stable sorting even for elements with identical timestamps. The sequential ID assigns an observer-dependent unique ordering to the elements and as such can be valuable in situations where such stable ordering matters.
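
To make the tie-breaking point concrete, here is a minimal Java sketch. The OffsetRecord and StableOrdering names are hypothetical and exist only for this illustration; nothing here is Beam or Flink API. It shows that when several elements share a timestamp, sorting by a sequential ID yields the same order no matter how the elements arrived.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical element type, used only for illustration.
final class OffsetRecord {
    final long timestampMillis; // may be identical for many elements
    final long offset;          // sequential ID, e.g. an offset within a partition
    final String payload;

    OffsetRecord(long timestampMillis, long offset, String payload) {
        this.timestampMillis = timestampMillis;
        this.offset = offset;
        this.payload = payload;
    }
}

final class StableOrdering {
    // Sort key is the sequential ID rather than the timestamp.
    static final Comparator<OffsetRecord> BY_OFFSET =
        Comparator.comparingLong(r -> r.offset);

    // Returns the payloads ordered by offset; the input list is not modified.
    static List<String> sortedPayloads(List<OffsetRecord> records) {
        List<OffsetRecord> copy = new ArrayList<>(records);
        copy.sort(BY_OFFSET);
        List<String> out = new ArrayList<>();
        for (OffsetRecord r : copy) {
            out.add(r.payload);
        }
        return out;
    }
}
```

A timestamp-only sort would leave the relative order of these elements dependent on arrival order, which is exactly the instability the sequential ID removes.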

 Jan

On 12/22/21 12:56, David Morávek wrote:

    The only thing I'm not sure of is firstKey, lastKey methods on
    SortedMap - could this be implemented efficiently in the rocksdb
    store?


I think some kind of skip scan should be enough to do this.

As for sorting by an arbitrary key, I'd be a bit afraid of unlocking this door :) Just remember the long-standing discussion about introducing sorting capabilities into the Beam Model. For years the community didn't want to hear anything about sorting, and then came to the conclusion that if you need sorting, you should never need to sort by anything other than a timestamp (e.g. for state machines, temporal joins).

Personally I can still imagine some use cases along the lines of building HFiles, but for this case one sorting pass at the flush should be enough, so we might just use some kind of external sorter instead.

D.

On Wed, Dec 1, 2021 at 6:03 PM Reuven Lax <[email protected]> wrote:

    To randomize this even further, if Flink can support arbitrary
    keys, then another approach here would be to implement Java's
    SortedMap API. FetchRange could then be implemented via
    map.subMap(begin, end) and clearRange could be done via
    map.subMap(begin, end).clear(). The only thing I'm not sure of is
    firstKey, lastKey methods on SortedMap - could this be implemented
    efficiently in the rocksdb store?
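
As an illustration of the SortedMap idea above, here is a minimal in-memory sketch built on java.util.TreeMap. The class name SortedMapOrderedList and its method names are hypothetical; a real Flink-backed version would sit on a state backend rather than a TreeMap, and whether firstKey/lastKey can be served efficiently there is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a sorted state primitive (hypothetical names).
final class SortedMapOrderedList<V> {
    // One bucket per sort key, so duplicate keys are allowed.
    private final NavigableMap<Long, List<V>> entries = new TreeMap<>();

    void add(long sortKey, V value) {
        entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
    }

    // Values whose key satisfies begin <= key < end, in ascending key order.
    List<V> fetchRange(long begin, long end) {
        List<V> out = new ArrayList<>();
        for (List<V> bucket : entries.subMap(begin, end).values()) {
            out.addAll(bucket);
        }
        return out;
    }

    // subMap returns a live view, so clearing it removes the range from the map.
    void clearRange(long begin, long end) {
        entries.subMap(begin, end).clear();
    }

    // Smallest and largest keys, or null when the map is empty.
    Long firstKey() {
        return entries.isEmpty() ? null : entries.firstKey();
    }

    Long lastKey() {
        return entries.isEmpty() ? null : entries.lastKey();
    }
}
```

With a TreeMap all four operations are cheap; the sketch only shows the shape of the API, not the cost on any particular state backend.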

    On Tue, Nov 30, 2021 at 8:43 PM Reuven Lax <[email protected]> wrote:

        Left a comment - wonder if we should let the list be sorted by
        an arbitrary key instead of timestamp. Timestamp is likely
        the more common use case, but I can imagine other use cases
        (e.g. reading log entries and wanting to keep the
        entries sorted by LSN).

        On Mon, Nov 22, 2021 at 1:12 PM David Morávek
        <[email protected]> wrote:

            Hi Reuven,

            I've put together a little POC of the Flink side support
            [1] for OrderedListState over the weekend, PTAL. Overall I'd
            say it should be a pretty straightforward and
            backward-compatible change.

            This effort will definitely require creating a new FLIP
            and making a good case for the Flink community. Would you
            be willing to participate in this contribution?

            [1] https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0

            Best,
            D.

            On Tue, Nov 16, 2021 at 8:34 PM David Morávek
            <[email protected]> wrote:

                Yes, my intuition is that it should be more or less
                the same. There might be some penalty in terms of
                having more keys in RocksDB, e.g. on the delete
                code path you need to write more tombstones +
                compaction, but other than that I can't think of anything.

                You'd still need to sort all the data in memory after
                reading them, but in this particular case it should
                save some cycles.

                On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax
                <[email protected]> wrote:

                    Right, but no worse than storing in ListState for
                    that case, right?

                    On Tue, Nov 16, 2021 at 10:44 AM David Morávek
                    <[email protected]> wrote:

                            Based on what Aljoscha said, storing it in
                            MapState would be better as in practice
                            the map will be sorted (as long as the
                            key encoding is order preserving, and on
                            quick glance InstantCoder is order
                            preserving), so I'll change that


                        This property only holds for the RocksDB-based
                        state backend, so we unfortunately cannot
                        leverage that. For example HeapStateBackend is
                        backed by a simple HashMap.

                        Best,
                        D.

                        On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax
                        <[email protected]> wrote:

                            Sounds great!

                            I implemented the naive approach, but my
                            current impl is based on
                            ListState, sorting it every time it's
                            read. Based on what Aljoscha said, storing
                            it in MapState would be better as in
                            practice the map will be sorted (as long
                            as the key encoding is order preserving,
                            and on quick glance InstantCoder is order
                            preserving), so I'll change that. This
                            still requires fetching the entire list
                            every time, even if the user only issues a
                            range fetch, but probably good enough for
                            now. Would be cool if Flink were able to
                            support this natively and efficiently.

                            While trying to test this I realized that
                            the Flink runner also didn't support the
                            OnWindowExpiration callback and all the
                            relevant tests used that callback. I'll
                            first send a PR implementing
                            OnWindowExpiration.

                            To Robert's point - we should definitely
                            add this to the protos and support it on
                            the portable runners as well!

                            Reuven

                            On Tue, Nov 16, 2021 at 6:02 AM Aljoscha
                            Krettek <[email protected]> wrote:

                                Hi,

                                sorry for the slow response! I was on
                                vacation and only saw this now.

                                I can't add much more than what David
                                already said: right now, it's not
                                possible to do an *efficient*
                                implementation on Flink. The closest
                                Flink state type would be MapState,
                                which keeps state in multiple
                                underlying RocksDB key/value cells
                                (for the RocksDB state backend). That
                                already keeps things sorted, but it's
                                sorted by the serialized
                                representation in RocksDB, which might
                                just be the correct sorting for int64
                                timestamps. We don't expose that
                                guarantee, though, because it might
                                not be true for other state backends.
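
To illustrate what "order-preserving key encoding" means for int64 keys, here is a minimal Java sketch. The OrderPreservingLong class is hypothetical and is not a copy of InstantCoder or of any Flink serializer. It assumes the store compares keys as unsigned bytes, left to right, and shows one encoding (flip the sign bit, write big-endian) under which that byte order matches signed numeric order.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: one possible order-preserving encoding for signed longs.
final class OrderPreservingLong {
    // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
    // 0x00..00 .. 0xFF..FF, and ByteBuffer writes big-endian by default,
    // so the most significant byte comes first.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison, the assumed ordering of the key/value store.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

A plain two's-complement big-endian encoding without the sign-bit flip would place negative values after positive ones under the same comparison, which is why the encoding matters and why the ordering cannot be assumed for arbitrary key coders.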

                                It would require convincing the Flink
                                folks to add such a new state type,
                                though.

                                Best,
                                Aljoscha

                                On Fri, Nov 12, 2021, at 04:35, Reuven
                                Lax wrote:
                                > OrderedListState
                                > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java>
                                > was added to Beam over a year ago. To date it is only supported by the
                                > Dataflow runner and the DirectRunner. I want to see if it's possible to
                                > support this well on the Flink runner (and eventually on other runners
                                > as well).
                                >
                                > This is a state type that is indexed by an int64 sorting key (the Beam API
                                > exposes this as a 64-bit timestamp, as that's the most common use case, but
                                > fundamentally it's just an integer). Users can insert elements, fetch ranges
                                > of elements, and delete ranges of elements.
                                >
                                > Is there any way to implement this on Flink? I could of course add a
                                > naive implementation - store everything in a ListState and have the Flink
                                > runner sort the list every time it is fetched. This seems quite
                                > inefficient, and the range deletes will be even less efficient.
                                > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that Flink
                                > has considered sorted state primitives, but I don't see any activity on
                                > this issue.
                                >
                                > Is there any way to do this, or should I just add the naive implementation?
                                >
                                > Reuven
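
For reference, the naive approach described in the original message can be sketched roughly as follows. This is only an in-memory illustration with hypothetical names (NaiveOrderedList, readRange, clearRange): a plain ArrayList stands in for the ListState, and nothing here is the actual Flink runner code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Naive ordered list: unsorted backing list, sorted on every read (hypothetical names).
final class NaiveOrderedList<V> {
    private static final class Entry<V> {
        final long sortKey;
        final V value;

        Entry(long sortKey, V value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Stand-in for the ListState: append-only, in arrival order.
    private final List<Entry<V>> backing = new ArrayList<>();

    void add(long sortKey, V value) {
        backing.add(new Entry<>(sortKey, value));
    }

    // Reads everything, sorts it, then filters to begin <= key < end.
    // The whole list is touched even when only a small range is requested.
    List<V> readRange(long begin, long end) {
        List<Entry<V>> all = new ArrayList<>(backing);
        all.sort(Comparator.comparingLong((Entry<V> e) -> e.sortKey));
        List<V> out = new ArrayList<>();
        for (Entry<V> e : all) {
            if (e.sortKey >= begin && e.sortKey < end) {
                out.add(e.value);
            }
        }
        return out;
    }

    // Range delete also scans the whole list; with a real list state this
    // would likely mean reading and rewriting the entire contents.
    void clearRange(long begin, long end) {
        backing.removeIf(e -> e.sortKey >= begin && e.sortKey < end);
    }
}
```

Both readRange and clearRange cost time proportional to the full list size, which is the inefficiency the message is worried about.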
