I have thoughts on the subject of whether to have APIs just for the lowest-level building blocks versus having APIs for higher-level constructs. Specifically, this applies to providing only an unsorted multimap vs. what I will call a "time-ordered buffer". TL;DR: I'd vote to focus on the time-ordered buffer; if it turns out to be easy to go all the way to a sorted multimap, that's nice-to-have; if it turns out to be easy to implement on top of unsorted map state, that should probably happen under the hood.
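Neither the "time-ordered buffer" above nor the integral-sort-key proposal later in the thread is spelled out as a concrete API. As a rough illustration only, here is an in-memory Java model of the intended semantics, assuming values keyed by a long timestamp and read or cleared over half-open ranges. `TimeOrderedBuffer`, `add`, `readRange` and `clearRange` are hypothetical names chosen for this sketch, not Beam state API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of a "time-ordered buffer": values keyed by an integral
 * timestamp, read and cleared over half-open ranges [from, to). Not a Beam API.
 */
class TimeOrderedBuffer<T> {
  // Several values may share a timestamp, so each timestamp maps to a list.
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestamp, T value) {
    byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  /** Values with from <= timestamp < to, in timestamp order (assumes from <= to). */
  List<T> readRange(long from, long to) {
    List<T> out = new ArrayList<>();
    for (List<T> values : byTimestamp.subMap(from, true, to, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  /** Removes values with from <= timestamp < to; the subMap view writes through. */
  void clearRange(long from, long to) {
    byTimestamp.subMap(from, true, to, false).clear();
  }
}
```

Because the sort key is a plain long, entries can be ordered numerically without any order-preserving coder, which is the API simplification argued for later in the thread.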
Reasons to build low-level multimap in the runner & Fn API and layer higher-level things in the SDK:
 - It is less implementation for runners if they only have to provide a few lower-level building blocks like multimap state.
 - There are many more runners than SDKs (and there will be even more), so this saves work overall.

Reasons to build higher-level constructs directly in the runner and Fn API:
 - Having multiple higher-level state types may actually be less implementation than one complex state type, especially if they map to runner primitives.
 - The runner may have better specialized implementations, especially for something like a time-ordered buffer.
 - The particular access patterns in an SDK-based implementation may not be ideal for each runner's underlying implementation of the low-level building block.
 - There may be excessive gRPC overhead even for optimal access patterns.

There are ways to have the best of both worlds, like:
 1. Define multiple state types according to fundamental access patterns, like we did before portability.
 2. If it is easy to layer one on top of the other, do that inside the runner. Provide shared code so that runners providing the lowest-level primitive get all the types for free.

I understand that this is an oversimplification. It still creates some more work. And APIs are a burden, so it is good to introduce as few as possible for maintenance. But it has performance benefits and also unblocks "just doing the simple and useful thing now", which I always like to do as long as it is compatible with future changes. If the APIs are fundamental, like sets, maps, timestamp ordering, then it is safe to guess that they will change rarely and be useful forever.

Kenn

On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> wrote:

> I would be glad to take a stab at how to provide sorting on top of unsorted multimap state.
> Based upon your description, you want integer keys representing timestamps and arbitrary user values for the values, is that correct?
> What kinds of operations do you need on the sorted map state, in order of efficiency requirements?
> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
> What kinds of operations do we expect the underlying unsorted map state to be able to provide?
> (at a minimum Get(K), Append(K), Clear(K), but what else, e.g. enumerate(K)?)
>
> I went through a similar exercise of how to provide a list-like side input view over a multimap[1] side input, which efficiently allowed computation of size and provided random access while only having access to get(K) and enumerating the K's.
>
> 1: https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>
> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> wrote:
>
>> Bringing this subject up again,
>>
>> I've spent some time looking into implementing this for the Dataflow runner. I'm unable to find a way to implement the arbitrary sorted multimap efficiently for the case where there are large numbers of unique keys. Since the primary driving use case is timestamp ordering (i.e. key is event timestamp), you would expect to have nearly a new key per element. I considered Luke's suggestion above, but unfortunately it doesn't really solve this issue.
>>
>> The primary use case for sorting always seems to be sorting by timestamp. I want to propose that instead of building the fully-general sorted multimap, we instead focus on a state type where the sort key is an integral type (like a timestamp or an integer). There is still a valid use case for multimap, but we can provide that as an unordered state.
>> At least for Dataflow, it will be much easier.
>>
>> While my difficulties here may be specific to the Dataflow runner, any such support would have to be built into other runners as well, and limiting to integral sorting likely makes it easier for other runners to implement this. Also, if you look at this Flink comment <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> pointed out by Aljoscha, for Flink the main use case identified was also timestamp sorting. This will also simplify the API design for this feature: sorted multimap with arbitrary keys would require us to introduce a way of mapping natural ordering to encoded ordering (i.e. a new OrderPreservingCoder), but if we limit sort keys to integral types, the API design is simpler, as integral types can be represented directly.
>>
>> Reuven
>>
>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> wrote:
>>
>>> This sounds to me like a potential runner strategy. However, if a runner can natively support sorted maps (e.g. we expect the Dataflow runner to be able to do so, and I think it would be useful for other runners as well), then it's probably preferable to allow the runner to use its native capabilities.
>>>
>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> For the API that you proposed, the map key is always "void" and the sort key == user key.
>>>> So in my example of
>>>> key: dummy value
>>>> key.000: token, (0001, value4)
>>>> key.001: token, (0010, value1), (0011, value2)
>>>> key.01: token
>>>> key.1: token, (1011, value3)
>>>> you would have:
>>>> "void": dummy value
>>>> "void".000: token, (0001, value4)
>>>> "void".001: token, (0010, value1), (0011, value2)
>>>> "void".01: token
>>>> "void".1: token, (1011, value3)
>>>>
>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the prefixes until you find a common prefix with limit, and then filtering for values that have a sort key <= limit. Using the example above, to find entriesUntil(0010) you would:
>>>> look for key."", miss
>>>> look for key.0, miss
>>>> look for key.00, miss
>>>> look for key.000, hit, sort all contained values using secondary key, provide value4 to user
>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all contained values using secondary key, filter out value2 and provide value1
>>>>
>>>> void removeUntil(K limit) also translates into walking the prefixes, but instead we clear them when we have a "hit", with some special logic for when the node's prefix is a prefix of limit. Using the example, to removeUntil(0010) you would:
>>>> look for key."", miss
>>>> look for key.0, miss
>>>> look for key.00, miss
>>>> look for key.000, hit, clear
>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all contained values using secondary key, store in memory all values that are > 0010, clear, and append the values stored in memory.
>>>>
>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain how fetching and deleting ranges of keys would work with this data structure?
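To make the trie scheme discussed in this thread concrete (insertion with leaf splitting, plus the entriesUntil/removeUntil walks), here is a minimal in-memory Java sketch. It assumes fixed-width bit-string sort keys such as "0010", a single user key (the "void" case), a `HashMap` standing in for unsorted map state keyed by prefix, and a leaf capacity of 2 as in the example. It also assumes that removeUntil leaves an emptied leaf in place (like key.01) rather than deleting it. `TrieSortedBuffer` and its methods are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a sorted view built on an unsorted map keyed by trie prefixes.
 * Assumes fixed-width bit-string sort keys and a single user key (the "void" case).
 */
class TrieSortedBuffer {
  static final class Entry {
    final String sortKey;
    final String value;

    Entry(String sortKey, String value) {
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  private static final int LEAF_CAPACITY = 2; // max values per leaf before it is split
  private final int width; // number of bits in every sort key (and in every limit)
  // Stand-in for unsorted map state: a present prefix is a leaf (the "token"),
  // an absent prefix is an interior node that must be searched deeper.
  private final Map<String, List<Entry>> state = new HashMap<>();

  TrieSortedBuffer(int width) {
    this.width = width;
    state.put("", new ArrayList<>()); // start with one empty leaf at the root
  }

  void insert(String sortKey, String value) {
    // Probe prefixes of the sort key, shortest first, until a leaf is found.
    String prefix = "";
    while (!state.containsKey(prefix)) {
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    state.get(prefix).add(new Entry(sortKey, value));
    splitIfFull(prefix);
  }

  private void splitIfFull(String prefix) {
    List<Entry> entries = state.get(prefix);
    if (entries.size() <= LEAF_CAPACITY || prefix.length() == width) {
      return; // small enough, or the prefix cannot be extended any further
    }
    state.remove(prefix); // the old leaf becomes an interior node
    List<Entry> zeros = new ArrayList<>();
    List<Entry> ones = new ArrayList<>();
    for (Entry e : entries) {
      (e.sortKey.charAt(prefix.length()) == '0' ? zeros : ones).add(e);
    }
    state.put(prefix + "0", zeros);
    state.put(prefix + "1", ones);
    splitIfFull(prefix + "0"); // a child may still be too full, as with 00 in the example
    splitIfFull(prefix + "1");
  }

  /** Values with sortKey <= limit, in sort-key order. */
  List<String> entriesUntil(String limit) {
    List<String> out = new ArrayList<>();
    walk("", limit, out, false);
    return out;
  }

  /** Drops values with sortKey <= limit; emptied leaves stay as markers (like key.01). */
  void removeUntil(String limit) {
    walk("", limit, new ArrayList<>(), true);
  }

  // Pre-order walk that skips every subtree whose prefix is already greater than limit.
  // Fixed-width bit strings compare lexicographically in the same order as numerically.
  private void walk(String prefix, String limit, List<String> out, boolean remove) {
    if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) {
      return;
    }
    List<Entry> leaf = state.get(prefix);
    if (leaf == null) { // miss: interior node, so visit the 0 child, then the 1 child
      if (prefix.length() < width) {
        walk(prefix + "0", limit, out, remove);
        walk(prefix + "1", limit, out, remove);
      }
      return;
    }
    leaf.sort(Comparator.comparing(e -> e.sortKey)); // hit: sort by the secondary key
    List<Entry> kept = new ArrayList<>();
    for (Entry e : leaf) {
      if (e.sortKey.compareTo(limit) <= 0) {
        out.add(e.value);
      } else {
        kept.add(e);
      }
    }
    if (remove) {
      state.put(prefix, kept); // clear the leaf, then re-append the values > limit
    }
  }
}
```

Feeding in the four inserts from the example (0010/value1, 0011/value2, 1011/value3, 0001/value4) yields the leaves 000, 001, 01 and 1, and entriesUntil("0010") returns value4 then value1, the same result as the walk above. In a real runner each probe would be a separate state read, which is where the cost of empty reads mentioned in the thread comes from.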
>>>>>
>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Reuven, for the example, I assume that we never want to store more than 2 values at a given sort key prefix, and if we do, then we will create a new longer prefix, splitting up the values based upon the sort key.
>>>>>>
>>>>>> Tuple representation in the examples below is (key, sort key, value), and . is a character outside of the alphabet, which can be represented by using an escaping encoding that wraps the key + sort key encoding.
>>>>>>
>>>>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010, finding one that is not empty. In this case it's "" (the empty prefix), so we append value1 to the map at key."", ending up with (we also set the key to any dummy value to know that it contains values):
>>>>>> key: dummy value
>>>>>> key."": token, (0010, value1)
>>>>>> Now we insert (key, 0011, value2). We again look up "key" + all the prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>>>>> key: dummy value
>>>>>> key."": token, (0010, value1), (0011, value2)
>>>>>> Now we insert (key, 1011, value3). We again look up "key" + all the prefixes of 1011, finding "", but notice that it is full, so we partition all the values into two prefixes, 0 and 1. We also clear the "" prefix, ending up with:
>>>>>> key: dummy value
>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>>>> key.1: token, (1011, value3)
>>>>>> Now we insert (key, 0001, value4). We again look up "key" + all the prefixes of 0001, finding 0, but notice that it is full, so we partition all the values into two prefixes, 00 and 01. This doesn't help us, since 00 would still be too full, so we split 00 again into 000 and 001. We also clear the 0 prefix, ending up with:
>>>>>> key: dummy value
>>>>>> key.000: token, (0001, value4)
>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>> key.01: token
>>>>>> key.1: token, (1011, value3)
>>>>>>
>>>>>> We are effectively building a trie[1] where we only have values at the leaves and control how full each leaf can be. There are other trie representations, like a radix tree, that may be better.
>>>>>>
>>>>>> Looking up the values in sorted order for "key" would go like this:
>>>>>> Is key set? yes
>>>>>> look for key."", miss
>>>>>> look for key.0, miss
>>>>>> look for key.00, miss
>>>>>> look for key.000, hit, sort all contained values using secondary key, provide value4 to user
>>>>>> look for key.001, hit, sort all contained values using secondary key, provide value1 followed by value2 to user
>>>>>> look for key.01, hit, empty, return no values to user
>>>>>> look for key.1, hit, sort all contained values using secondary key, provide value3 to user
>>>>>> we have walked the entire prefix space, signal end of iterable
>>>>>>
>>>>>> Some notes on the above:
>>>>>> * The dummy value is used to know that the key contains values, and the token is used to know whether there are any values deeper in the trie, so that we know when to stop searching.
>>>>>> * If we can recalculate the sort key from the combination of the key and value, then we don't need to store it.
>>>>>> * Keys with lots of values will perform worse than keys with fewer values, since we have to look up more keys, but those extra lookups will be empty reads. The number of misses can be controlled by how many elements we are willing to store at a given node before we subdivide.
>>>>>>
>>>>>> In reality you could build a lot of structures (e.g. red-black tree, binary tree) using the sort key; the issue is the cost of rebalancing/reorganizing the structure in map form and whether it has a convenient pre-order traversal for lookups.
>>>>>>
>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Some great comments!
>>>>>>>
>>>>>>> *Aljoscha*: absolutely, this would have to be implemented by runners to be efficient. We can of course provide a default (inefficient) implementation, but ideally runners would provide better ones.
>>>>>>>
>>>>>>> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>>>>>>>
>>>>>>> *Robert* Great point about standard coders not satisfying this. That's why I suggested that we provide a way to tag the coders that do preserve order, and only accept those as key coders. Alternatively, we could present a more limited API (e.g. only allowing a hard-coded set of types to be used as keys), but that seems counter to the direction Beam usually goes. So users will have two ways of creating multimap state specs:
>>>>>>>
>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>     StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>
>>>>>>> or
>>>>>>>
>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>     StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>
>>>>>>> The second one will validate that the key coder preserves order, and fail otherwise (similar to coder determinism checking in GroupByKey). (BTW, we would also have versions of these functions that use coder inference to "guess" the coder, but those will do the same checking.)
>>>>>>>
>>>>>>> Also, the API I proposed did support random access!
>>>>>>> We could separate out OrderedBagState again if we think the use cases are fundamentally different. I merged the proposal into that of MultimapState because there seemed to be 99% overlap.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>> >
>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <al...@google.com> wrote:
>>>>>>>> >>
>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> A few obvious problems with this code:
>>>>>>>> >>>>> 1. Removing the elements already processed from the bag requires clearing and rewriting the entire bag. This is O(n^2) in the number of input trades.
>>>>>>>> >>>>
>>>>>>>> >>>> Why is it not O(2 * n) to clear and rewrite trade state?
>>>>>>>> >>>>
>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>>>> >>>>>   // Add a value to the map.
>>>>>>>> >>>>>   void put(K key, V value);
>>>>>>>> >>>>>   // Get all values for a given key.
>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>>>> >>>>>   // Return all entries in the map.
>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit. Returned elements are sorted by the key.
>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>>>> >>>>>
>>>>>>>> >>>>>   // Remove all values with the given key.
>>>>>>>> >>>>>   void remove(K key);
>>>>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>>>>>>> >>>>>   void removeUntil(K limit);
>>>>>>>> >>>>> }
>>>>>>>> >>>>
>>>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would remove all entries in the map with keys < limit.
>>>>>>>> >>>>
>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In order to make this easier for users, I propose that we introduce a new tag on Coders: PreservesOrder. A Coder that contains this tag guarantees that the encoded value preserves the same ordering as the base Java type.
>>>>>>>> >>>>
>>>>>>>> >>>> Could you clarify what "encoded value preserves the same ordering as the base Java type" means?
>>>>>>>> >>>
>>>>>>>> >>> Let's say A and B represent two different instances of the same Java type, like a double. Then A < B (using the language's comparison operator) iff encode(A) < encode(B) (note that the encoded versions are compared lexicographically).
>>>>>>>> >>
>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B) property to hold for all languages we support? What happens if A and B sort differently in different languages?
>>>>>>>> >
>>>>>>>> > That would have to be a property of the coder (which means that this property probably needs to be represented in the portability representation of the coder). I imagine the common use cases will be for simple coders like int, long, string, etc., which are likely to sort the same in most languages.
>>>>>>>>
>>>>>>>> The standard coders for both double and integral types do not respect the natural ordering (consider negative values). KV coders violate the "natural" lexicographic ordering on components as well. I think implicitly sorting on encoded value would yield many surprises. (The state could, of course, take an order-preserving, bytes- (string?) producing callable as a parameter.) (As for naming, I'd probably call this OrderedBagState or something like that, rather than Map, which tends to imply random access.)