The portability layer is meant to live across multiple versions of Beam, and I don't think we should approach it by "just doing the simple and useful thing now", since I believe that will lead to a proliferation of APIs.
On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles wrote:
> I have thoughts on the subject of whether to have APIs just for the lowest-level building blocks versus having APIs for higher-level constructs. Specifically, this applies to providing only unsorted multimap vs. what I will call "time-ordered buffer". TL;DR: I'd vote to focus on time-ordered buffer; if it turns out to be easy to go all the way to sorted multimap, that's nice-to-have; if it turns out to be easy to implement on top of unsorted map state, that should probably be under the hood.
>
> Reasons to build low-level multimap in the runner & Fn API and layer higher-level things in the SDK:
>
> - It is less implementation for runners if they only have to provide fewer, lower-level building blocks like multimap state.
> - There are many more runners than SDKs (and there will be more and more), so this saves work overall.
>
> Reasons to build higher-level constructs directly in the runner and Fn API:
>
> - Having multiple higher-level state types may actually be less implementation than one complex state type, especially if they map to runner primitives.
> - The runner may have better specialized implementations, especially for something like a time-ordered buffer.
> - The particular access patterns in an SDK-based implementation may not be ideal for each runner's underlying implementation of the low-level building block.
> - There may be excessive gRPC overhead even for optimal access patterns.
>
> There are ways to have the best of both worlds, like:
>
> 1. Define multiple state types according to fundamental access patterns, like we did before portability.
> 2. If it is easy to layer one on top of the other, do that inside the runner. Provide shared code so that runners providing the lowest-level primitive get all the types for free.
>
> I understand that this is an oversimplification. It still creates some more work.
> And APIs are a burden, so it is good to introduce as few as possible for maintenance. But it has performance benefits and also unblocks "just doing the simple and useful thing now", which I always like to do as long as it is compatible with future changes. If the APIs are fundamental, like sets, maps, timestamp ordering, then it is safe to guess that they will change rarely and be useful forever.
>
> Kenn
>
> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik wrote:
>> I would be glad to take a stab at how to provide sorting on top of unsorted multimap state.
>> Based upon your description, you want integer keys representing timestamps and arbitrary user values for the values, is that correct?
>> What kinds of operations do you need on the sorted map state, in order of efficiency requirements?
>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
>> What kinds of operations do we expect the underlying unsorted map state to be able to provide?
>> (at a minimum Get(K), Append(K), Clear(K), but what else, e.g. enumerate(K)?)
>>
>> I went through a similar exercise of how to provide a list-like side input view over a multimap[1] side input, which efficiently allowed computation of size and provided random access while only having access to get(K) and enumerating the Ks.
>>
>> 1: https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>
>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax wrote:
>>> Bringing this subject up again,
>>>
>>> I've spent some time looking into implementing this for the Dataflow runner. I'm unable to find a way to implement the arbitrary sorted multimap efficiently for the case where there are large numbers of unique keys. Since the primary driving use case is timestamp ordering (i.e.
>>> key is event timestamp), you would expect to have nearly a new key per element. I considered Luke's suggestion above, but unfortunately it doesn't really solve this issue.
>>>
>>> The primary use case for sorting always seems to be sorting by timestamp. I want to propose that instead of building the fully general sorted multimap, we instead focus on a state type where the sort key is an integral type (like a timestamp or an integer). There is still a valid use case for multimap, but we can provide that as an unordered state. At least for Dataflow, it will be much easier.
>>>
>>> While my difficulties here may be specific to the Dataflow runner, any such support would have to be built into other runners as well, and limiting to integral sorting likely makes it easier for other runners to implement this. Also, if you look at this Flink comment pointed out by Aljoscha <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>, for Flink the main use case identified was also timestamp sorting. This will also simplify the API design for this feature: sorted multimap with arbitrary keys would require us to introduce a way of mapping natural ordering to encoded ordering (i.e. a new OrderPreservingCoder), but if we limit sort keys to integral types, the API design is simpler, as integral types can be represented directly.
>>>
>>> Reuven
>>>
>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax wrote:
>>>> This sounds to me like a potential runner strategy. However, if a runner can natively support sorted maps (e.g. we expect the Dataflow runner to be able to do so, and I think it would be useful for other runners as well), then it's probably preferable to allow the runner to use its native capabilities.
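To pin down the semantics of the integral-keyed "time-ordered buffer" discussed above, here is a minimal in-memory reference model. It is a sketch, not a Beam API: the class name `TimestampOrderedBuffer` is hypothetical, and the meanings chosen for Luke's Next(x), Previous(x), GetAll(Range[x, y)) and ClearAll(Range[x, y)) (strictly greater, strictly less, half-open ranges) are assumptions. A runner would back this with its own storage; `java.util.TreeMap` is used only to state the expected behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

/**
 * Hypothetical reference model of an integral-keyed ordered state ("time-ordered buffer").
 * Not a Beam API; illustrates the assumed semantics of next/previous/range read/range clear.
 */
class TimestampOrderedBuffer<V> {
  // Sort key (e.g. an event timestamp) -> values appended at that key, in insertion order.
  private final NavigableMap<Long, List<V>> entries = new TreeMap<>();

  public void add(long sortKey, V value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Assumed Next(x): the smallest sort key strictly greater than x, if any. */
  public OptionalLong next(long x) {
    Long k = entries.higherKey(x);
    return k == null ? OptionalLong.empty() : OptionalLong.of(k);
  }

  /** Assumed Previous(x): the largest sort key strictly less than x, if any. */
  public OptionalLong previous(long x) {
    Long k = entries.lowerKey(x);
    return k == null ? OptionalLong.empty() : OptionalLong.of(k);
  }

  /** GetAll(Range[from, to)): all values with from <= key < to, ordered by key. */
  public List<V> readRange(long from, long to) {
    List<V> out = new ArrayList<>();
    for (List<V> vs : entries.subMap(from, true, to, false).values()) {
      out.addAll(vs);
    }
    return out;
  }

  /** ClearAll(Range[from, to)): removes all values with from <= key < to. */
  public void clearRange(long from, long to) {
    // subMap is a live view, so clearing it removes the entries from the backing map.
    entries.subMap(from, true, to, false).clear();
  }

  public static void main(String[] args) {
    TimestampOrderedBuffer<String> buf = new TimestampOrderedBuffer<>();
    buf.add(30L, "c");
    buf.add(10L, "a");
    buf.add(20L, "b1");
    buf.add(20L, "b2");
    System.out.println(buf.readRange(10L, 30L)); // [a, b1, b2]
    System.out.println(buf.next(10L).getAsLong()); // 20
    buf.clearRange(Long.MIN_VALUE, 25L);
    System.out.println(buf.readRange(Long.MIN_VALUE, Long.MAX_VALUE)); // [c]
  }
}
```

Because the sort key is a plain `long`, the map can order entries directly with no order-preserving coder involved, which is the API simplification Reuven describes.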
>>>>
>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik wrote:
>>>>> For the API that you proposed, the map key is always "void" and the sort key == user key. So in my example of
>>>>> key: dummy value
>>>>> key.000: token, (0001, value4)
>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>> key.01: token
>>>>> key.1: token, (1011, value3)
>>>>> you would have:
>>>>> "void": dummy value
>>>>> "void".000: token, (0001, value4)
>>>>> "void".001: token, (0010, value1), (0011, value2)
>>>>> "void".01: token
>>>>> "void".1: token, (1011, value3)
>>>>>
>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the prefixes until you find a common prefix for K and then filtering for values that have a sort key <= K. Using the example above, to find entriesUntil(0010) you would:
>>>>> look for key."", miss
>>>>> look for key.0, miss
>>>>> look for key.00, miss
>>>>> look for key.000, hit, sort all contained values using secondary key, provide value4 to user
>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using secondary key, filter out value2 and provide value1
>>>>>
>>>>> void removeUntil(K limit) also translates into walking the prefixes, but instead we clear them when we have a "hit", with some special logic for when the node's prefix is a prefix of the limit. Using the example, to removeUntil(0010) you would:
>>>>> look for key."", miss
>>>>> look for key.0, miss
>>>>> look for key.00, miss
>>>>> look for key.000, hit, clear
>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using secondary key, store in memory all values that are > 0010, clear, and append the values stored in memory.
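The prefix-trie layout Lukasz describes in his 9:50 AM message (quoted further down) and the entriesUntil/removeUntil walks above can be sketched as follows. This is a hypothetical in-memory model, not Beam or runner code, and it assumes: fixed-width bit-string sort keys such as "0010", at most two values per leaf as in his example, a `HashMap` standing in for the runner's unsorted map state, and a single user key (so the "dummy value" marker is omitted). A present map entry plays the role of the leaf token; an absent prefix is an interior node, so a "miss" means "descend into both children".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical prefix-trie over an unsorted map, for one user key (illustration only). */
class PrefixTrieSketch {
  static final int MAX_PER_LEAF = 2; // assumed leaf capacity, as in the example

  static final class Entry {
    final String sortKey;
    final String value;
    Entry(String sortKey, String value) { this.sortKey = sortKey; this.value = value; }
  }

  private final int width; // assumed fixed length of every sort key
  // prefix -> values at that leaf; a present key is a leaf ("token"), absent means interior.
  private final Map<String, List<Entry>> leaves = new HashMap<>();

  PrefixTrieSketch(int width) {
    this.width = width;
    leaves.put("", new ArrayList<>()); // start with one empty leaf at the empty prefix
  }

  void put(String sortKey, String value) {
    // Probe "", then each longer prefix of sortKey, until we hit the leaf covering it.
    String leaf = null;
    for (int len = 0; len <= width && leaf == null; len++) {
      String p = sortKey.substring(0, len);
      if (leaves.containsKey(p)) leaf = p;
    }
    leaves.get(leaf).add(new Entry(sortKey, value));
    split(leaf);
  }

  // Replace an over-full leaf p by children p+"0" and p+"1", splitting them again if needed.
  private void split(String p) {
    List<Entry> vals = leaves.get(p);
    if (vals.size() <= MAX_PER_LEAF || p.length() == width) return;
    leaves.remove(p); // clear the parent prefix; it becomes an interior node
    List<Entry> zero = new ArrayList<>();
    List<Entry> one = new ArrayList<>();
    for (Entry e : vals) {
      if (e.sortKey.charAt(p.length()) == '0') zero.add(e); else one.add(e);
    }
    leaves.put(p + "0", zero);
    leaves.put(p + "1", one);
    split(p + "0");
    split(p + "1");
  }

  List<String> sortedValues() { return entriesUntil(null); }

  // Values with sort key <= limit (all values if limit is null), in sort-key order.
  List<String> entriesUntil(String limit) {
    List<String> out = new ArrayList<>();
    walk("", limit, out);
    return out;
  }

  private void walk(String p, String limit, List<String> out) {
    if (limit != null && p.compareTo(limit.substring(0, p.length())) > 0) return; // past limit
    List<Entry> vals = leaves.get(p);
    if (vals == null) { // miss: interior node, visit children in order
      walk(p + "0", limit, out);
      walk(p + "1", limit, out);
      return;
    }
    vals.stream() // hit: sort the leaf's values by sort key, then filter against the limit
        .sorted(Comparator.comparing((Entry e) -> e.sortKey))
        .filter(e -> limit == null || e.sortKey.compareTo(limit) <= 0)
        .forEach(e -> out.add(e.value));
  }

  // Remove values with sort key <= limit. Leaves are emptied, not deleted, so walks terminate.
  void removeUntil(String limit) { removeWalk("", limit); }

  private void removeWalk(String p, String limit) {
    int cmp = p.compareTo(limit.substring(0, p.length()));
    if (cmp > 0) return; // whole subtree is above the limit
    List<Entry> vals = leaves.get(p);
    if (vals == null) {
      removeWalk(p + "0", limit);
      removeWalk(p + "1", limit);
    } else if (cmp < 0) {
      vals.clear(); // whole leaf is below the limit
    } else {
      vals.removeIf(e -> e.sortKey.compareTo(limit) <= 0); // p is a prefix of limit: keep > limit
    }
  }

  public static void main(String[] args) {
    PrefixTrieSketch t = new PrefixTrieSketch(4);
    t.put("0010", "value1");
    t.put("0011", "value2");
    t.put("1011", "value3");
    t.put("0001", "value4");
    System.out.println(new TreeMap<>(t.leaves).keySet()); // [000, 001, 01, 1]
    System.out.println(t.sortedValues()); // [value4, value1, value2, value3]
    System.out.println(t.entriesUntil("0010")); // [value4, value1]
    t.removeUntil("0010");
    System.out.println(t.sortedValues()); // [value2, value3]
  }
}
```

Running `main` reproduces the worked example: the leaves end up as 000, 001, 01 and 1, and `entriesUntil("0010")` yields value4 then value1. Emptying leaves instead of deleting them keeps every root-to-leaf path ending at a present entry, which is what lets the "miss, descend" walk terminate.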
>>>>>
>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax wrote:
>>>>>> Can you explain how fetching and deleting ranges of keys would work with this data structure?
>>>>>>
>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik wrote:
>>>>>>> Reuven, for the example, I assume that we never want to store more than 2 values at a given sort key prefix, and if we do, then we will create a new, longer prefix, splitting up the values based upon the sort key.
>>>>>>>
>>>>>>> The tuple representation in the examples below is (key, sort key, value), and . is a character outside of the alphabet, which can be represented by using an escaping encoding that wraps the key + sort key encoding.
>>>>>>>
>>>>>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010, finding one that is not empty. In this case it's "" (the empty prefix), so we append value1 to the map at key."", ending up with (we also set the key to any dummy value to know that it contains values):
>>>>>>> key: dummy value
>>>>>>> key."": token, (0010, value1)
>>>>>>> Now we insert (key, 0011, value2); we again look up "key" + all the prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>>>>>> key: dummy value
>>>>>>> key."": token, (0010, value1), (0011, value2)
>>>>>>> Now we insert (key, 1011, value3); we again look up "key" + all the prefixes of 1011, finding "", but notice that it is full, so we partition all the values into two prefixes, 0 and 1.
>>>>>>> We also clear the "" prefix, ending up with:
>>>>>>> key: dummy value
>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>>>>>> key.1: token, (1011, value3)
>>>>>>> Now we insert (key, 0001, value4); we again look up "key" + all the prefixes of 0001, finding 0, but notice that it is full, so we partition all the values into two prefixes, 00 and 01. This doesn't help us, since 00 would still be too full, so we split 00 again into 000 and 001. We also clear the 0 prefix, ending up with:
>>>>>>> key: dummy value
>>>>>>> key.000: token, (0001, value4)
>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>>>>>> key.01: token
>>>>>>> key.1: token, (1011, value3)
>>>>>>>
>>>>>>> We are effectively building a trie[1] where we only have values at the leaves and control how full each leaf can be. There are other trie representations, like a radix tree, that may be better.
>>>>>>>
>>>>>>> Looking up the values in sorted order for "key" would go like this:
>>>>>>> Is key set? Yes.
>>>>>>> look for key."", miss
>>>>>>> look for key.0, miss
>>>>>>> look for key.00, miss
>>>>>>> look for key.000, hit, sort all contained values using secondary key, provide value4 to user
>>>>>>> look for key.001, hit, sort all contained values using secondary key, provide value1 followed by value2 to user
>>>>>>> look for key.01, hit, empty, return no values to user
>>>>>>> look for key.1, hit, sort all contained values using secondary key, provide value3 to user
>>>>>>> we have walked the entire prefix space, signal end of iterable
>>>>>>>
>>>>>>> Some notes on the above:
>>>>>>> * The dummy value is used to know that the key contains values, and the token is used to know whether there are any values deeper in the trie, so we know when to stop searching.
>>>>>>> * If we can recalculate the sort key from the combination of the key and value, then we don't need to store it.
>>>>>>> * Keys with lots of values will perform worse than keys with fewer values, since we have to look up more keys, but those will be empty reads. The number of misses can be controlled by how many elements we are willing to store at a given node before we subdivide.
>>>>>>>
>>>>>>> In reality you could build a lot of structures (e.g. red-black tree, binary tree) using the sort key; the issue is the cost of rebalancing/reorganizing the structure in map form and whether it has a convenient pre-order traversal for lookups.
>>>>>>>
>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax wrote:
>>>>>>>> Some great comments!
>>>>>>>>
>>>>>>>> *Aljoscha*: absolutely, this would have to be implemented by runners to be efficient. We can of course provide a default (inefficient) implementation, but ideally runners would provide better ones.
>>>>>>>>
>>>>>>>> *Jan*: Exactly. I think MapState can be dropped or backed by this. E.g.
>>>>>>>>
>>>>>>>> *Robert*: Great point about standard coders not satisfying this. That's why I suggested that we provide a way to tag the coders that do preserve order, and only accept those as key coders. Alternatively, we could present a more limited API, e.g. only allowing a hard-coded set of types to be used as keys, but that seems counter to the direction Beam usually goes.
>>>>>>>> So users will have two ways of creating multimap state specs:
>>>>>>>>
>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>     StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>
>>>>>>>> or
>>>>>>>>
>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>>>>>>>     StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>>>>>>>
>>>>>>>> The second one will validate that the key coder preserves order, and fail otherwise (similar to coder determinism checking in GroupByKey). (BTW, we would also have versions of these functions that use coder inference to "guess" the coder, but those will do the same checking.)
>>>>>>>>
>>>>>>>> Also, the API I proposed did support random access! We could separate out OrderedBagState again if we think the use cases are fundamentally different. I merged the proposal into that of MultimapState because there seemed to be 99% overlap.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw wrote:
>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax wrote:
>>>>>>>>> >
>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay wrote:
>>>>>>>>> >>
>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> A few obvious problems with this code:
>>>>>>>>> >>>>> 1. Removing the elements already processed from the bag requires clearing and rewriting the entire bag. This is O(n^2) in the number of input trades.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Why isn't it O(2 * n) to clear and rewrite the trade state?
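Rui's question can be answered with a rough count, under an assumption the quoted proposal does not spell out here: that the bag is cleared and rewritten once per incoming trade, and that it still holds on the order of i elements when the i-th trade arrives (for example, when few buffered trades can be emitted yet). The total rewrite cost is then

```latex
\sum_{i=1}^{n} i \;=\; \frac{n(n+1)}{2} \;=\; O(n^2)
```

The O(2 * n) figure would apply only if the bag were read and rewritten a single time over the whole input, rather than once per element.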
>>>>>>>>> >>>>
>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>>>>> >>>>>   // Add a value to the map.
>>>>>>>>> >>>>>   void put(K key, V value);
>>>>>>>>> >>>>>   // Get all values for a given key.
>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>>>>> >>>>>   // Return all entries in the map.
>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit. Returned elements are sorted by the key.
>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>   // Remove all values with the given key.
>>>>>>>>> >>>>>   void remove(K key);
>>>>>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>>>>>>>> >>>>>   void removeUntil(K limit);
>>>>>>>>> >>>>> }
>>>>>>>>> >>>>
>>>>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would remove all entries in the map with keys < limit.
>>>>>>>>> >>>>
>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In order to make this easier for users, I propose that we introduce a new tag on Coders, PreservesOrder. A Coder that contains this tag guarantees that the encoded value preserves the same ordering as the base Java type.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Could you clarify what "encoded value preserves the same ordering as the base Java type" means?
>>>>>>>>> >>>
>>>>>>>>> >>> Let's say A and B represent two different instances of the same Java type, like a double; then A < B (using the language's comparison operator) iff encode(A) < encode(B) (note the encoded versions are compared lexicographically).
>>>>>>>>> >>
>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B) property to hold for all languages we support?
>>>>>>>>> >> What happens if A and B sort differently in different languages?
>>>>>>>>> >
>>>>>>>>> > That would have to be a property of the coder (which means that this property probably needs to be represented in the portability representation of the coder). I imagine the common use cases will be for simple coders like int, long, string, etc., which are likely to sort the same in most languages.
>>>>>>>>>
>>>>>>>>> The standard coders for both double and integral types do not respect the natural ordering (consider negative values). KV coders violate the "natural" lexicographic ordering on components as well. I think implicitly sorting on encoded value would yield many surprises. (The state could, of course, take an order-preserving, bytes (string?)-producing callable as a parameter.) (As for naming, I'd probably call this OrderedBagState or something like that, rather than Map, which tends to imply random access.)
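Robert's point about negative values, and the "A < B iff encode(A) < encode(B)" property Lukasz states, can be illustrated with a small sketch. The byte layouts below are illustrative assumptions, not the wire formats of Beam's coders: a plain fixed-width big-endian two's-complement encoding breaks the property for negative numbers, while flipping the sign bit (and, for doubles, flipping all bits of negative values) is a common order-preserving alternative. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Illustration only: naive vs. order-preserving fixed-width encodings (not Beam coders). */
class OrderPreservingEncoding {

  /** Naive: big-endian two's complement. Negative values start with 0xFF.. bytes. */
  static byte[] naiveLong(long v) {
    return ByteBuffer.allocate(8).putLong(v).array();
  }

  /** Order-preserving for longs: flip the sign bit so negatives sort before non-negatives. */
  static byte[] orderedLong(long v) {
    return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
  }

  /**
   * Order-preserving for doubles: invert all bits of negatives, only the sign bit otherwise.
   * The result follows Double.compare (-0.0 before 0.0, NaN last), not the < operator.
   */
  static byte[] orderedDouble(double d) {
    long bits = Double.doubleToLongBits(d); // canonicalizes NaN
    bits ^= (bits >> 63) | Long.MIN_VALUE;
    return ByteBuffer.allocate(8).putLong(bits).array();
  }

  /** Lexicographic comparison of bytes as unsigned values, as a runner sorting raw keys might. */
  static int compareBytes(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) return c;
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // -1 < 1, but the naive encoding of -1 (0xFF...) sorts after that of 1 (0x00...01).
    System.out.println(compareBytes(naiveLong(-1), naiveLong(1)) > 0); // true
    System.out.println(compareBytes(orderedLong(-1), orderedLong(1)) < 0); // true
    System.out.println(compareBytes(orderedDouble(-2.5), orderedDouble(0.5)) < 0); // true
  }
}
```

Fixed-width encodings are used here because a variable-length encoding adds a second source of mis-ordering (byte length) on top of the sign problem.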
