It might help for me to describe what I have in mind. I'm still proposing that we build multimap, just not a globally-sorted multimap.
My previous proposal was that we provide a Multimap<Key, Value> state type, sorted by key. this would have two additional operations - multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey, endKey). The primary use case was timestamp sorting, but I felt that a sorted multimap provided a nice generalization - after all, you can simply key the multimap by timestamp to get timestamp sorting. This approach had some issues immediately that would take some work to solve. Since a multimap key can have any type and a runner will only be able to sort by encoded type, we would need to introduce a concept of order-preserving coders into Beam and plumb that through. Robert pointed out that even our existing standard coders for simple integral types don't preserve order, so there will likely be surprises here. My current proposal is for a multimap that is not sorted by key, but that can support.ordered values for a single key. Remember that a multimap maps K -> Iterable<V>, so this means that each individual Iterable<V> is ordered, but the keys have no specific order relative to each other. This is not too different from many multimap implementations where the keys are unordered, but the list of values for a single key at least has a stable order. The interface would look like this: public interface MultimapState<K, V> extends State { // Add a value with a default timestamp. void put(K key, V value); // Add a timestamped value. void put(K, key, TimestampedValue<V> value); // Remove all values for a key. void remove (K key); // Remove all values for a key with timestamps within the specified range. void removeRange(K key, Instant startTs, Instant endTs); // Get an Iterable of values for V. The Iterable will be returned sorted by timestamp. ReadableState<Iterable<TimestampedValue<V>>> get(K key); // Get an Iterable of values for V in the specified range. The Iterable will be returned sorted by timestamp. ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant startTs, Instant endTs); ReadableState<Iterable<K>> keys(); ReadableState<Iterable<TimestampedValue<V>>> values(); ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>> entries; } We can of course provide helper functions that allow using MultimapState without deailing with TimestampValue for users who only want a multimap and don't want sorting. I think many users will only need a single sorted list - not a full multimap. It's worth offering this as well, and we can simply build it on top of MultimapState. It will look like an extension of BagState public interface TimestampSortedListState<T> extends State { void add(TimestampedValue<T> value); Iterable<TimestampedValue<T>> read(); Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs); void clearRange(Instant startTs, Instant endTs); } On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote: > The portability layer is meant to live across multiple versions of Beam > and I don't think it should be treated by doing the simple and useful thing > now since I believe it will lead to a proliferation of the API. > > On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <k...@apache.org> wrote: > >> I have thoughts on the subject of whether to have APIs just for the >> lowest-level building blocks versus having APIs for higher-level >> constructs. Specifically this applies to providing only unsorted multimap >> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on >> time-ordered buffer; if it turns out to be easy to go all the way to sorted >> multimap that's nice-to-have; if it turns out to be easy to implement on >> top of unsorted map state that should probably be under the hood >> >> Reasons to build low-level multimap in the runner & fn api and layer >> higher-level things in the SDK: >> >> - It is less implementation for runners if they have to only provide >> fewer lower-level building blocks like multimap state. >> - There are many more runners than SDKs (and will be even more and more) >> so this saves overall. >> >> Reasons to build higher-level constructs directly in the runner and fn >> api: >> >> - Having multiple higher-level state types may actually be less >> implementation than one complex state type, especially if they map to >> runner primitives. >> - The runner may have better specialized implementations, especially for >> something like a time-ordered buffer. >> - The particular access patterns in an SDK-based implementation may not >> be ideal for each runner's underlying implementation of the low-level >> building block. >> - There may be excessive gRPC overhead even for optimal access patterns. >> >> There are ways to have best of both worlds, like: >> >> 1. Define multiple state types according to fundamental access patterns, >> like we did this before portability. >> 2. If it is easy to layer one on top of the other, do that inside the >> runner. Provide shared code so for runners providing the lowest-level >> primitive they get all the types for free. >> >> I understand that this is an oversimplification. It still creates some >> more work. And APIs are a burden so it is good to introduce as few as >> possible for maintenance. But it has performance benefits and also unblocks >> "just doing the simple and useful thing now" which I always like to do as >> long as it is compatible with future changes. If the APIs are fundamental, >> like sets, maps, timestamp ordering, then it is safe to guess that they >> will change rarely and be useful forever. >> >> Kenn >> >> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> wrote: >> >>> I would be glad to take a stab at how to provide sorting on top of >>> unsorted multimap state. >>> Based upon your description, you want integer keys representing >>> timestamps and arbitrary user value for the values, is that correct? >>> What kinds of operations do you need on the sorted map state in order of >>> efficiency requirements? >>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)) >>> What kinds of operations do we expect the underlying unsorted map state >>> to be able to provide? >>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g. >>> enumerate(K)?) >>> >>> I went through a similar exercise of how to provide a list like side >>> input view over a multimap[1] side input which efficiently allowed >>> computation of size and provided random access while only having access to >>> get(K) and enumerate K's. >>> >>> 1: >>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568 >>> >>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> wrote: >>> >>>> Bringing this subject up again, >>>> >>>> I've spent some time looking into implementing this for the Dataflow >>>> runner. I'm unable to find a way to implement the arbitrary sorted multimap >>>> efficiently for the case where there are large numbers of unique keys. >>>> Since the primary driving use case is timestamp ordering (i.e. key is event >>>> timestamp), you would expect to have nearly a new key per element. I >>>> considered Luke's suggestion above, but unfortunately it doesn't really >>>> solve this issue. >>>> >>>> The primary use case for sorting always seems to be sorting by >>>> timestamp. I want to propose that instead of building the fully-general >>>> sorted multimap, we instead focus on a state type where the sort key is an >>>> integral type (like a timestamp or an integer). There is still a valid use >>>> case for multimap, but we can provide that as an unordered state. At least >>>> for Dataflow, it will be much easier >>>> >>>> While my difficulties here may be specific to the Dataflow runner, any >>>> such support would have to be built into other runners as well, and >>>> limiting to integral sorting likely makes it easier for other runners to >>>> implement this. Also, if you look at this >>>> <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> >>>> Flink >>>> comment pointed out by Aljoscha, for Flink the main use case identified was >>>> also timestamp sorting. This will also simplify the API design for this >>>> feature: Sorted multimap with arbitrary keys would require us to introduce >>>> a way of mapping natural ordering to encoded ordering (i.e. a new >>>> OrderPreservingCoder), but if we limit sort keys to integral types, the API >>>> design is simpler as integral types can be represented directly. >>>> >>>> Reuven >>>> >>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote: >>>> >>>>> This sounds to me like a potential runner strategy. However if a >>>>> runner can natively support sorted maps (e.g. we expect the Dataflow >>>>> runner >>>>> to be able to do so, and I think it would be useful for other runners as >>>>> well), then it's probably preferable to allow the runner to use its native >>>>> capabilities. >>>>> >>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com> wrote: >>>>> >>>>>> For the API that you proposed, the map key is always "void" and the >>>>>> sort key == user key. So in my example of >>>>>> key: dummy value >>>>>> key.000: token, (0001, value4) >>>>>> key.001: token, (0010, value1), (0011, value2) >>>>>> key.01: token >>>>>> key.1: token, (1011, value3) >>>>>> you would have: >>>>>> "void": dummy value >>>>>> "void".000: token, (0001, value4) >>>>>> "void".001: token, (0010, value1), (0011, value2) >>>>>> "void".01: token >>>>>> "void".1: token, (1011, value3) >>>>>> >>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the >>>>>> the prefixes until you find a common prefix for K and then filter for >>>>>> values where they have a sort key <= K. Using the example above, to find >>>>>> entriesUntil(0010) you would: >>>>>> look for key."", miss >>>>>> look for key.0, miss >>>>>> look for key.00, miss >>>>>> look for key.000, hit, sort all contained values using secondary key, >>>>>> provide value4 to user >>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort >>>>>> all contained values using secondary key, filter out value2 and provide >>>>>> value1 >>>>>> >>>>>> void removeUntil(K limit) also translates into walking the prefixes >>>>>> but instead we will clear them when we have a "hit" with some special >>>>>> logic >>>>>> for when the sort key is a prefix of the key. Used the example, to >>>>>> removeUntil(0010) you would: >>>>>> look for key."", miss >>>>>> look for key.0, miss >>>>>> look for key.00, miss >>>>>> look for key.000, hit, clear >>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort >>>>>> all contained values using secondary key, store in memory all values >>>>>> that > >>>>>> 0010, clear and append values stored in memory. >>>>>> >>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote: >>>>>> >>>>>>> Can you explain how fetching and deleting ranges of keys would work >>>>>>> with this data structure? >>>>>>> >>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Reuven, for the example, I assume that we never want to store more >>>>>>>> then 2 values at a given sort key prefix, and if we do then we will >>>>>>>> create >>>>>>>> a new longer prefix splitting up the values based upon the sort key. >>>>>>>> >>>>>>>> Tuple representation in examples below is (key, sort key, value) >>>>>>>> and . is a character outside of the alphabet which can be represented >>>>>>>> by >>>>>>>> using an escaping encoding that wraps the key + sort key encoding. >>>>>>>> >>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the prefixes >>>>>>>> of 0010 finding one that is not empty. In this case its 0, so we append >>>>>>>> value to the map at key.0 ending up with (we also set the key to any >>>>>>>> dummy >>>>>>>> value to know that it it contains values): >>>>>>>> key: dummy value >>>>>>>> key."": token, (0010, value1) >>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all the >>>>>>>> prefixes of 0010, finding "", so we append value2 to key."" ending up >>>>>>>> with: >>>>>>>> key: dummy value >>>>>>>> key."": token, (0010, value1), (0011, value2) >>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all the >>>>>>>> prefixes of 1011 finding "" but notice that it is full, so we >>>>>>>> partition all >>>>>>>> the values into two prefixes 0 and 1. We also clear the "" prefix >>>>>>>> ending up >>>>>>>> with: >>>>>>>> key: dummy value >>>>>>>> key.0: token, (0010, value1), (0011, value2) >>>>>>>> key.1: token, (1011, value3) >>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + all the >>>>>>>> prefixes of the value finding 0 but notice that it is full, so we >>>>>>>> partition >>>>>>>> all the values into two prefixes 00 and 01 but notice this doesn't >>>>>>>> help us >>>>>>>> since 00 will be too full so we split 00 again to 000, 001. We also >>>>>>>> clear >>>>>>>> the 0 prefix ending up with: >>>>>>>> key: dummy value >>>>>>>> key.000: token, (0001, value4) >>>>>>>> key.001: token, (0010, value1), (0011, value2) >>>>>>>> key.01: token >>>>>>>> key.1: token, (1011, value3) >>>>>>>> >>>>>>>> We are effectively building a trie[1] where we only have values at >>>>>>>> the leaves and control how full each leaf can be. There are other trie >>>>>>>> representations like a radix tree that may be better. >>>>>>>> >>>>>>>> Looking up the values in sorted order for "key" would go like this: >>>>>>>> Is key set, yes >>>>>>>> look for key."", miss >>>>>>>> look for key.0, miss >>>>>>>> look for key.00, miss >>>>>>>> look for key.000, hit, sort all contained values using secondary >>>>>>>> key, provide value4 to user >>>>>>>> look for key.001, hit, sort all contained values using secondary >>>>>>>> key, provide value1 followed by value2 to user >>>>>>>> look for key.01, hit, empty, return no values to user >>>>>>>> look for key.1, hit, sort all contained values using secondary key, >>>>>>>> provide value3 to user >>>>>>>> we have walked the entire prefix space, signal end of iterable >>>>>>>> >>>>>>>> Some notes for the above: >>>>>>>> * The dummy value is used to know that the key contains values and >>>>>>>> the token is to know whether there are any values deeper in the trie so >>>>>>>> when we know when to stop searching. >>>>>>>> * If we can recalculate the sort key from the combination of the >>>>>>>> key and value, then we don't need to store it. >>>>>>>> * Keys with lots of values will perform worse then keys with less >>>>>>>> values since we have to look up more keys but they will be empty >>>>>>>> reads. The >>>>>>>> number of misses can be controlled by how many elements we are willing >>>>>>>> to >>>>>>>> store at a given node before we subdivide. >>>>>>>> >>>>>>>> In reality you could build a lot of structures (e.g. red black >>>>>>>> tree, binary tree) using the sort key, the issue is the cost of >>>>>>>> rebalancing/re-organizing the structure in map form and whether it has >>>>>>>> a >>>>>>>> convenient pre-order traversal for lookups. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Some great comments! >>>>>>>>> >>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by >>>>>>>>> runners to be efficient. We can of course provide a default >>>>>>>>> (inefficient) >>>>>>>>> implementation, but ideally runners would provide better ones. >>>>>>>>> >>>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by this. >>>>>>>>> E.g. >>>>>>>>> >>>>>>>>> *Robert* Great point about standard coders not satisfying this. >>>>>>>>> That's why I suggested that we provide a way to tag the coders that do >>>>>>>>> preserve order, and only accept those as key coders Alternatively we >>>>>>>>> could >>>>>>>>> present a more limited API - e.g. only allowing a hard-coded set of >>>>>>>>> types >>>>>>>>> to be used as keys - but that seems counter to the direction Beam >>>>>>>>> usually >>>>>>>>> goes. So users will have two ways .of creating multimap state specs: >>>>>>>>> >>>>>>>>> private final StateSpec<MultimapState<Long, String>> state = >>>>>>>>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of()); >>>>>>>>> >>>>>>>>> or >>>>>>>>> private final StateSpec<MultimapState<Long, String>> state = >>>>>>>>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of()); >>>>>>>>> >>>>>>>>> The second one will validate that the key coder preserves order, >>>>>>>>> and fails otherwise (similar to coder determinism checking in >>>>>>>>> GroupByKey). >>>>>>>>> (BTW we would also have versions of these functions that use coder >>>>>>>>> inference to "guess" the coder, but those will do the same checking) >>>>>>>>> >>>>>>>>> Also the API I proposed did support random access! We could >>>>>>>>> separate out OrderedBagState again if we think the use cases are >>>>>>>>> fundamentally different. I merged the proposal into that of >>>>>>>>> MultimapState >>>>>>>>> because there seemed be 99% overlap. >>>>>>>>> >>>>>>>>> Reuven >>>>>>>>> >>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw < >>>>>>>>> rober...@google.com> wrote: >>>>>>>>> >>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> > >>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <al...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <lc...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <ruw...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> A few obvious problems with this code: >>>>>>>>>> >>>>> 1. Removing the elements already processed from the bag >>>>>>>>>> requires clearing and rewriting the entire bag. This is O(n^2) in the >>>>>>>>>> number of input trades. >>>>>>>>>> >>>> >>>>>>>>>> >>>> why it's not O(2 * n) to clearing and rewriting trade state? >>>>>>>>>> >>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State { >>>>>>>>>> >>>>> // Add a value to the map. >>>>>>>>>> >>>>> void put(K key, V value); >>>>>>>>>> >>>>> // Get all values for a given key. >>>>>>>>>> >>>>> ReadableState<Iterable<V>> get(K key); >>>>>>>>>> >>>>> // Return all entries in the map. >>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> allEntries(); >>>>>>>>>> >>>>> // Return all entries in the map with keys <= limit. >>>>>>>>>> returned elements are sorted by the key. >>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit); >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> // Remove all values with the given key; >>>>>>>>>> >>>>> void remove(K key); >>>>>>>>>> >>>>> // Remove all entries in the map with keys <= limit. >>>>>>>>>> >>>>> void removeUntil(K limit); >>>>>>>>>> >>>> >>>>>>>>>> >>>> Will removeUntilExcl(K limit) also useful? It will remove >>>>>>>>>> all entries in the map with keys < limit. >>>>>>>>>> >>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In >>>>>>>>>> order to make this easier for users, I propose that we introduce a >>>>>>>>>> new tag >>>>>>>>>> on Coders PreservesOrder. A Coder that contains this tag guarantees >>>>>>>>>> that >>>>>>>>>> the encoded value preserves the same ordering as the base Java type. >>>>>>>>>> >>>> >>>>>>>>>> >>>> >>>>>>>>>> >>>> Could you clarify what is "encoded value preserves the same >>>>>>>>>> ordering as the base Java type"? >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> Lets say A and B represent two different instances of the >>>>>>>>>> same Java type like a double, then A < B (using the languages >>>>>>>>>> comparison >>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are >>>>>>>>>> compared >>>>>>>>>> lexicographically) >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> Since coders are shared across SDKs, do we expect A < B iff >>>>>>>>>> e(A) < e(P) property to hold for all languages we support? What >>>>>>>>>> happens A, >>>>>>>>>> B sort differently in different languages? >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > That would have to be the property of the coder (which means >>>>>>>>>> that this property probably needs to be represented in the >>>>>>>>>> portability >>>>>>>>>> representation of the coder). I imagine the common use cases will be >>>>>>>>>> for >>>>>>>>>> simple coders like int, long, string, etc., which are likely to sort >>>>>>>>>> the >>>>>>>>>> same in most languages. >>>>>>>>>> >>>>>>>>>> The standard coders for both double and integral types do not >>>>>>>>>> respect >>>>>>>>>> the natural ordering (consider negative values). KV coders >>>>>>>>>> violate the >>>>>>>>>> "natural" lexicographic ordering on components as well. I think >>>>>>>>>> implicitly sorting on encoded value would yield many surprises. >>>>>>>>>> (The >>>>>>>>>> state, of course, could take a order-preserving, bytes >>>>>>>>>> (string?)-producing callable as a parameter of course). (As for >>>>>>>>>> naming, I'd probably call this OrderedBagState or something like >>>>>>>>>> that...rather than Map which tends to imply random access.) >>>>>>>>>> >>>>>>>>>