On Wed, Sep 16, 2020 at 8:48 AM Tyson Hamilton <[email protected]> wrote:
> The use case is to support an unbounded stream-stream join, where the elements are arriving in roughly time-sorted order. Removing a specific element from the timestamp-indexed collection is necessary when a match is found.
>

Just checking: is this an optimization for when you already know that the join is 1:1?

Kenn

> Having clearRange is helpful to expire elements that are no longer relevant according to a user-provided time-based join predicate (e.g. WHEN ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
>
> I'll think a bit more about how to use MapState instead if a remove()-like method for a single element isn't an option.
>
> On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <[email protected]> wrote:
>
>> Hi,
>>
>> Currently we only support removing a timestamp range. You can of course remove a single timestamp by removing [ts, ts+1); however, if there are multiple elements with the same timestamp, this will remove all of them.
>>
>> Does this fit your use case? If not, I wonder if MapState is closer to what you are looking for?
>>
>> Reuven
>>
>> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> I noticed that an implementation of the in-memory OrderedListState was introduced [1]. Where can I find out more about the plan and design? Is there a design doc? I'd like to know more about the implementation to see whether it fits my use case. I was hoping it would have a remove(TimestampedValue<T> e) method.
>>>
>>> Thanks,
>>> -Tyson
>>>
>>>
>>> [1]: https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>>
>>>
>>> On 2020/08/03 21:41:46, Catlyn Kong <[email protected]> wrote:
>>> > Hey folks,
>>> >
>>> > Sorry I'm late to this thread, but this might be very helpful for the problem we're dealing with. Do we have a design doc or a JIRA ticket I can follow?
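The range-removal semantics Reuven describes, and why they fall short for Tyson's join, can be sketched in plain Java. `JoinBuffer` below is a hypothetical in-memory stand-in, not a Beam class: timestamps are epoch milliseconds, `clearRange` removes the half-open range [startTs, endTs), `removeOne` shows the single-element removal Tyson asks for, and `candidates` applies an ABS(t - ts) < window predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a timestamp-ordered list state (not a Beam class).
// Timestamps are epoch millis; several elements may share one timestamp.
class JoinBuffer {
  private final NavigableMap<Long, List<String>> byTs = new TreeMap<>();

  void add(long ts, String value) {
    byTs.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
  }

  // Half-open [startTs, endTs): clearRange(ts, ts + 1) drops *every* element at ts.
  void clearRange(long startTs, long endTs) {
    byTs.subMap(startTs, true, endTs, false).clear();
  }

  // Single-element removal: other elements sharing ts survive.
  boolean removeOne(long ts, String value) {
    List<String> values = byTs.get(ts);
    if (values == null || !values.remove(value)) {
      return false;
    }
    if (values.isEmpty()) {
      byTs.remove(ts);
    }
    return true;
  }

  // Buffered elements with ABS(t - ts) < maxDeltaMillis, in timestamp order.
  List<String> candidates(long ts, long maxDeltaMillis) {
    List<String> out = new ArrayList<>();
    for (List<String> values
        : byTs.subMap(ts - maxDeltaMillis, false, ts + maxDeltaMillis, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  int size() {
    int n = 0;
    for (List<String> values : byTs.values()) {
      n += values.size();
    }
    return n;
  }
}
```

With the 5-minute predicate (300,000 ms), elements that can no longer match anything arriving at or after a watermark W could be expired conservatively with `clearRange(Long.MIN_VALUE, W - 300_000L)`; the exact bound is an assumption that depends on how late data is handled.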
>>> >
>>> > Cheers,
>>> > Catlyn
>>> >
>>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <[email protected]> wrote:
>>> >
>>> > > My questions were just an example. I fully agree there is a fundamental need for a sorted state (of some form; I also think this links to an efficient implementation of retractions). I was reacting to Kenn's question about a BIP. This would be a pretty nice example of why it would be good to have such a "process": not everything can be solved on the ML, and there are fundamental decisions that might need closer attention.
>>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>>> > >
>>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately, TimeSortedListState), though I went a bit further and also proposed a way to have a dynamic number of tagged TimeSortedBagStates.
>>> > >
>>> > > You are correct that the runner doesn't really have to store the data time-sorted; what's actually needed is the ability to fetch and remove timestamp ranges of data (though that does include fetching the entire list). TimeOrderedState is probably a more accurate name than TimeSortedState. I don't think we could get away with operations that only act on the smallest timestamp; however, we could limit the API to only being able to fetch and remove prefixes of data (ordered by timestamp). However, if we support prefixes, we might as well support arbitrary subranges.
>>> > >
>>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <[email protected]> wrote:
>>> > >
>>> > >> Big +1 for a BIP, as this might really help clarify the pros and cons of all possibilities. There seem to be questions that need answering, and motivating use cases: do we need sorted map state, or can we solve our use cases with something simpler, e.g. the mentioned TimeSortedBagState?
>>> > >> Does that really have to be a time-sorted structure, or does it "only" have to have operations that can efficiently find and remove the element with the smallest timestamp (like a PriorityQueue)?
>>> > >>
>>> > >> Jan
>>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>> > >>
>>> > >> Zooming in from generic philosophy to be clear: adding a time-ordered buffer to the Fn state API is *not* a shortcut. It has benefits that will not be achieved by an SDK-side implementation on top of either ordered or unordered multimap. Are those benefits worth expanding the API? I don't know.
>>> > >>
>>> > >> A change to allow a runner to have a specialized implementation for time-buffered state would be one or more StateKey types, right? Reuven, maybe put this and your Java API in a doc? A BIP? It seems like there's at least the following to explore:
>>> > >>
>>> > >> - how that Java API would map to an SDK-side implementation on top of the multimap state key
>>> > >> - how that Java API would map to a new StateKey
>>> > >> - whether there's actually more than one relevant implementation of that StateKey
>>> > >> - whether an SDK-side implementation on some other state key would be performant enough in all SDK languages (present and future)
>>> > >>
>>> > >> Zooming back out to generic philosophy: a proliferation of StateKey types tuned by runners (which can very easily still share implementation) is probably better than a proliferation of complex SDK-side implementations with varying completeness and performance.
>>> > >>
>>> > >> Kenn
>>> > >>
>>> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <[email protected]> wrote:
>>> > >>
>>> > >>> It might help for me to describe what I have in mind. I'm still proposing that we build multimap, just not a globally sorted multimap.
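Jan's PriorityQueue alternative can be sketched with `java.util.PriorityQueue`. The hypothetical `MinTimestampBuffer` below (not a Beam class) only makes "peek at / remove the smallest timestamps" cheap, which is enough for prefix removal but not for arbitrary subranges.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical heap-backed buffer (not a Beam class): only the smallest timestamp is cheap to reach.
class MinTimestampBuffer<T> {
  private static final class Entry<T> {
    final long ts;
    final T value;

    Entry(long ts, T value) {
      this.ts = ts;
      this.value = value;
    }
  }

  private final PriorityQueue<Entry<T>> heap =
      new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.ts));

  void add(long ts, T value) {
    heap.add(new Entry<>(ts, value)); // O(log n)
  }

  // Smallest buffered timestamp, or null when empty: O(1).
  Long minTimestamp() {
    return heap.isEmpty() ? null : heap.peek().ts;
  }

  // Removes and returns values with ts < limit in timestamp order: a prefix removal, O(k log n).
  List<T> pollBefore(long limit) {
    List<T> out = new ArrayList<>();
    while (!heap.isEmpty() && heap.peek().ts < limit) {
      out.add(heap.poll().value);
    }
    return out;
  }

  int size() {
    return heap.size();
  }
}
```

Removing a middle range [x, y) from a heap needs a scan over all n entries, while an ordered structure handles prefixes and subranges at the same cost; that is the trade-off behind Reuven's reply above that, once prefixes are supported, arbitrary subranges might as well be.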
>>> > >>>
>>> > >>> My previous proposal was that we provide a Multimap<Key, Value> state type, sorted by key. This would have two additional operations: multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey, endKey). The primary use case was timestamp sorting, but I felt that a sorted multimap provided a nice generalization; after all, you can simply key the multimap by timestamp to get timestamp sorting.
>>> > >>>
>>> > >>> This approach immediately had some issues that would take some work to solve. Since a multimap key can have any type and a runner will only be able to sort by encoded value, we would need to introduce a concept of order-preserving coders into Beam and plumb that through. Robert pointed out that even our existing standard coders for simple integral types don't preserve order, so there will likely be surprises here.
>>> > >>>
>>> > >>> My current proposal is for a multimap that is not sorted by key, but that can support ordered values for a single key. Remember that a multimap maps K -> Iterable<V>, so this means that each individual Iterable<V> is ordered, but the keys have no specific order relative to each other. This is not too different from many multimap implementations where the keys are unordered, but the list of values for a single key at least has a stable order.
>>> > >>>
>>> > >>> The interface would look like this:
>>> > >>>
>>> > >>> public interface MultimapState<K, V> extends State {
>>> > >>>   // Add a value with a default timestamp.
>>> > >>>   void put(K key, V value);
>>> > >>>
>>> > >>>   // Add a timestamped value.
>>> > >>>   void put(K key, TimestampedValue<V> value);
>>> > >>>
>>> > >>>   // Remove all values for a key.
>>> > >>>   void remove(K key);
>>> > >>>
>>> > >>>   // Remove all values for a key with timestamps within the specified range.
>>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>>> > >>>
>>> > >>>   // Get an Iterable of values for the key. The Iterable will be returned sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>> > >>>
>>> > >>>   // Get an Iterable of values for the key in the specified range. The Iterable will be returned sorted by timestamp.
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant startTs, Instant endTs);
>>> > >>>
>>> > >>>   ReadableState<Iterable<K>> keys();
>>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>>> > >>> }
>>> > >>>
>>> > >>> We can of course provide helper functions that allow using MultimapState without dealing with TimestampedValue for users who only want a multimap and don't want sorting.
>>> > >>>
>>> > >>> I think many users will only need a single sorted list, not a full multimap. It's worth offering this as well, and we can simply build it on top of MultimapState.
>>> > >>> It will look like an extension of BagState:
>>> > >>>
>>> > >>> public interface TimestampSortedListState<T> extends State {
>>> > >>>   void add(TimestampedValue<T> value);
>>> > >>>   Iterable<TimestampedValue<T>> read();
>>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
>>> > >>>   void clearRange(Instant startTs, Instant endTs);
>>> > >>> }
>>> > >>>
>>> > >>>
>>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <[email protected]> wrote:
>>> > >>>
>>> > >>>> The portability layer is meant to live across multiple versions of Beam, and I don't think it should be approached by doing the simple and useful thing now, since I believe that will lead to a proliferation of the API.
>>> > >>>>
>>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <[email protected]> wrote:
>>> > >>>>
>>> > >>>>> I have thoughts on the subject of whether to have APIs just for the lowest-level building blocks versus having APIs for higher-level constructs. Specifically, this applies to providing only an unsorted multimap vs. what I will call a "time-ordered buffer". TL;DR: I'd vote to focus on the time-ordered buffer; if it turns out to be easy to go all the way to a sorted multimap, that's nice-to-have; if it turns out to be easy to implement on top of unsorted map state, that should probably be under the hood.
>>> > >>>>>
>>> > >>>>> Reasons to build a low-level multimap in the runner & Fn API and layer higher-level things in the SDK:
>>> > >>>>>
>>> > >>>>> - It is less implementation for runners if they only have to provide fewer, lower-level building blocks like multimap state.
>>> > >>>>> - There are many more runners than SDKs (and there will be more and more), so this saves overall.
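Reuven's two interfaces above, a MultimapState whose per-key values are timestamp-ordered and a TimestampSortedListState built on top of it, can be modeled in memory to show how the layering works. Both classes are hypothetical sketches, not Beam's API: timestamps are epoch milliseconds rather than `Instant`, ranges are half-open [startTs, endTs), and the single sorted list simply reuses one fixed multimap key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of the proposed MultimapState (not a Beam class):
// keys are unordered, but each key's values are kept sorted by timestamp (epoch millis).
class SortedValuesMultimap<K, V> {
  private final Map<K, NavigableMap<Long, List<V>>> data = new HashMap<>();

  void put(K key, long ts, V value) {
    data.computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(ts, t -> new ArrayList<>())
        .add(value);
  }

  void remove(K key) {
    data.remove(key);
  }

  // Removes the key's values with startTs <= ts < endTs.
  void removeRange(K key, long startTs, long endTs) {
    NavigableMap<Long, List<V>> values = data.get(key);
    if (values == null) {
      return;
    }
    values.subMap(startTs, true, endTs, false).clear();
    if (values.isEmpty()) {
      data.remove(key);
    }
  }

  // All of the key's values, in timestamp order.
  List<V> get(K key) {
    return flatten(data.get(key));
  }

  // The key's values with startTs <= ts < endTs, in timestamp order.
  List<V> getRange(K key, long startTs, long endTs) {
    NavigableMap<Long, List<V>> values = data.get(key);
    return flatten(values == null ? null : values.subMap(startTs, true, endTs, false));
  }

  private static <E> List<E> flatten(NavigableMap<Long, List<E>> values) {
    List<E> out = new ArrayList<>();
    if (values != null) {
      for (List<E> sameTs : values.values()) {
        out.addAll(sameTs);
      }
    }
    return out;
  }
}

// Hypothetical single sorted list layered on the multimap by always using one fixed key.
class SortedListOnMultimap<T> {
  private static final String ONLY_KEY = "";
  private final SortedValuesMultimap<String, T> multimap = new SortedValuesMultimap<>();

  void add(long ts, T value) { multimap.put(ONLY_KEY, ts, value); }

  List<T> read() { return multimap.get(ONLY_KEY); }

  List<T> readRange(long startTs, long endTs) { return multimap.getRange(ONLY_KEY, startTs, endTs); }

  void clearRange(long startTs, long endTs) { multimap.removeRange(ONLY_KEY, startTs, endTs); }
}
```

The layering is the point of the sketch: the sorted list adds no storage logic of its own, which is the kind of shared, on-top-of implementation Kenn suggests runners could reuse.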
>>> > >>>>>
>>> > >>>>> Reasons to build higher-level constructs directly in the runner and Fn API:
>>> > >>>>>
>>> > >>>>> - Having multiple higher-level state types may actually be less implementation than one complex state type, especially if they map to runner primitives.
>>> > >>>>> - The runner may have better specialized implementations, especially for something like a time-ordered buffer.
>>> > >>>>> - The particular access patterns in an SDK-based implementation may not be ideal for each runner's underlying implementation of the low-level building block.
>>> > >>>>> - There may be excessive gRPC overhead even for optimal access patterns.
>>> > >>>>>
>>> > >>>>> There are ways to have the best of both worlds, like:
>>> > >>>>>
>>> > >>>>> 1. Define multiple state types according to fundamental access patterns, as we did before portability.
>>> > >>>>> 2. If it is easy to layer one on top of the other, do that inside the runner. Provide shared code so that runners providing the lowest-level primitive get all the types for free.
>>> > >>>>>
>>> > >>>>> I understand that this is an oversimplification. It still creates some more work. And APIs are a burden, so it is good to introduce as few as possible for maintenance. But it has performance benefits and also unblocks "just doing the simple and useful thing now", which I always like to do as long as it is compatible with future changes. If the APIs are fundamental, like sets, maps, and timestamp ordering, then it is safe to guess that they will change rarely and be useful forever.
>>> > >>>>>
>>> > >>>>> Kenn
>>> > >>>>>
>>> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <[email protected]> wrote:
>>> > >>>>>
>>> > >>>>>> I would be glad to take a stab at how to provide sorting on top of unsorted multimap state.
>>> > >>>>>> Based upon your description, you want integer keys representing timestamps and arbitrary user values for the values; is that correct?
>>> > >>>>>> What kinds of operations do you need on the sorted map state, in order of efficiency requirements?
>>> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
>>> > >>>>>> What kinds of operations do we expect the underlying unsorted map state to be able to provide?
>>> > >>>>>> (at a minimum Get(K), Append(K), Clear(K), but what else, e.g. enumerate(K)?)
>>> > >>>>>>
>>> > >>>>>> I went through a similar exercise of how to provide a list-like side input view over a multimap [1] side input, which efficiently allowed computation of size and provided random access while only having access to get(K) and enumerating the K's.
>>> > >>>>>>
>>> > >>>>>> 1: https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>> > >>>>>>
>>> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <[email protected]> wrote:
>>> > >>>>>>
>>> > >>>>>>> Bringing this subject up again,
>>> > >>>>>>>
>>> > >>>>>>> I've spent some time looking into implementing this for the Dataflow runner. I'm unable to find a way to implement the arbitrary sorted multimap efficiently for the case where there are large numbers of unique keys. Since the primary driving use case is timestamp ordering (i.e.
>>> > >>>>>>> key is event timestamp), you would expect to have nearly a new key per element. I considered Luke's suggestion above, but unfortunately it doesn't really solve this issue.
>>> > >>>>>>>
>>> > >>>>>>> The primary use case for sorting always seems to be sorting by timestamp. I want to propose that instead of building the fully general sorted multimap, we focus on a state type where the sort key is an integral type (like a timestamp or an integer). There is still a valid use case for multimap, but we can provide that as an unordered state. At least for Dataflow, this will be much easier.
>>> > >>>>>>>
>>> > >>>>>>> While my difficulties here may be specific to the Dataflow runner, any such support would have to be built into other runners as well, and limiting it to integral sorting likely makes it easier for other runners to implement. Also, if you look at this Flink comment <https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> pointed out by Aljoscha, for Flink the main use case identified was also timestamp sorting. This will also simplify the API design for this feature: a sorted multimap with arbitrary keys would require us to introduce a way of mapping natural ordering to encoded ordering (i.e. a new OrderPreservingCoder), but if we limit sort keys to integral types, the API design is simpler, as integral types can be represented directly.
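Restricting the sort key to an integral type, as proposed here, lets the operations Luke lists earlier (Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y))) map directly onto `java.util.NavigableMap`. This in-memory sketch assumes `long` keys and one value per key; `LongKeyedSortedState` is a hypothetical name, not a Beam class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sorted state with long (integral) sort keys, one value per key (not a Beam class).
class LongKeyedSortedState<V> {
  private final NavigableMap<Long, V> map = new TreeMap<>();

  void put(long key, V value) {
    map.put(key, value);
  }

  // Next(x): smallest key strictly greater than x, or null if none.
  Long next(long x) {
    return map.higherKey(x);
  }

  // Previous(x): largest key strictly less than x, or null if none.
  Long previous(long x) {
    return map.lowerKey(x);
  }

  // GetAll(Range[x, y)): values with x <= key < y, in key order. Requires x <= y.
  List<V> getAll(long x, long y) {
    return new ArrayList<>(map.subMap(x, true, y, false).values());
  }

  // ClearAll(Range[x, y)): removes every entry with x <= key < y. Requires x <= y.
  void clearAll(long x, long y) {
    map.subMap(x, true, y, false).clear();
  }

  int size() {
    return map.size();
  }
}
```

Because the key is already a number, no order-preserving coder is involved at this level; the encoding question only comes back if a runner has to sort the keys as raw bytes.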
>>> > >>>>>>>
>>> > >>>>>>> Reuven
>>> > >>>>>>>
>>> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <[email protected]> wrote:
>>> > >>>>>>>
>>> > >>>>>>>> This sounds to me like a potential runner strategy. However, if a runner can natively support sorted maps (e.g. we expect the Dataflow runner to be able to do so, and I think it would be useful for other runners as well), then it's probably preferable to allow the runner to use its native capabilities.
>>> > >>>>>>>>
>>> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <[email protected]> wrote:
>>> > >>>>>>>>
>>> > >>>>>>>>> For the API that you proposed, the map key is always "void" and the sort key == user key. So in my example of
>>> > >>>>>>>>> key: dummy value
>>> > >>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> key.01: token
>>> > >>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>> you would have:
>>> > >>>>>>>>> "void": dummy value
>>> > >>>>>>>>> "void".000: token, (0001, value4)
>>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>> "void".01: token
>>> > >>>>>>>>> "void".1: token, (1011, value3)
>>> > >>>>>>>>>
>>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking the prefixes until you find a common prefix with limit, and then filtering for values that have a sort key <= limit.
>>> > >>>>>>>>> Using the example above, to find entriesUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, sort all contained values using the secondary key, provide value4 to the user
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using the secondary key, filter out value2, and provide value1
>>> > >>>>>>>>>
>>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the prefixes, but instead we clear them when we have a "hit", with some special logic for when the node's prefix is a prefix of limit. Using the example, to removeUntil(0010) you would:
>>> > >>>>>>>>> look for key."", miss
>>> > >>>>>>>>> look for key.0, miss
>>> > >>>>>>>>> look for key.00, miss
>>> > >>>>>>>>> look for key.000, hit, clear
>>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010, so we sort all contained values using the secondary key, store in memory all values that are > 0010, clear, and append the values stored in memory.
>>> > >>>>>>>>>
>>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <[email protected]> wrote:
>>> > >>>>>>>>>
>>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would work with this data structure?
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <[email protected]> wrote:
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to store more than 2 values at a given sort key prefix, and if we do, then we will create a new, longer prefix, splitting up the values based upon the sort key.
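The prefix trie described in Lukasz's messages (the entriesUntil walk above and the insertion steps that follow) can be modeled in memory. This is a simplified sketch, not his implementation: a `HashMap` keyed by prefix stands in for the unsorted map state, a `Set` of split prefixes plays the role of his "token", there is a single user key, sort keys are fixed-width bit strings, and a leaf splits on the next bit once it holds more than two values. `PrefixTrie` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the prefix-trie layout for a single user key (not a Beam class).
class PrefixTrie {
  static final int CAPACITY = 2; // max values per leaf before it is split

  private final Map<String, List<String[]>> leaves = new HashMap<>(); // prefix -> (sortKey, value)
  private final Set<String> internal = new HashSet<>(); // prefixes that were split

  PrefixTrie() {
    leaves.put("", new ArrayList<>());
  }

  void insert(String sortKey, String value) {
    String prefix = "";
    while (internal.contains(prefix)) { // descend one bit at a time to the covering leaf
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    leaves.get(prefix).add(new String[] {sortKey, value});
    splitIfFull(prefix);
  }

  private void splitIfFull(String prefix) {
    List<String[]> leaf = leaves.get(prefix);
    if (leaf.size() <= CAPACITY || prefix.length() == leaf.get(0)[0].length()) {
      return; // fits, or the prefix is already a full sort key and cannot be split further
    }
    leaves.remove(prefix);
    internal.add(prefix);
    leaves.put(prefix + "0", new ArrayList<>());
    leaves.put(prefix + "1", new ArrayList<>());
    for (String[] entry : leaf) {
      leaves.get(prefix + entry[0].charAt(prefix.length())).add(entry);
    }
    splitIfFull(prefix + "0"); // a child may still be over capacity (e.g. 00 in the example)
    splitIfFull(prefix + "1");
  }

  // All values in sort-key order: the "0" subtree is visited before the "1" subtree.
  List<String> readAll() {
    List<String> out = new ArrayList<>();
    visit("", null, out);
    return out;
  }

  // Values whose sort key is <= limit, skipping subtrees that lie entirely above limit.
  List<String> entriesUntil(String limit) {
    List<String> out = new ArrayList<>();
    visit("", limit, out);
    return out;
  }

  private void visit(String prefix, String limit, List<String> out) {
    if (limit != null && prefix.compareTo(limit.substring(0, prefix.length())) > 0) {
      return; // every key under this prefix is greater than limit
    }
    if (internal.contains(prefix)) {
      visit(prefix + "0", limit, out);
      visit(prefix + "1", limit, out);
      return;
    }
    List<String[]> leaf = new ArrayList<>(leaves.get(prefix));
    leaf.sort(Comparator.comparing((String[] e) -> e[0])); // sort a leaf by its sort keys
    for (String[] e : leaf) {
      if (limit == null || e[0].compareTo(limit) <= 0) {
        out.add(e[1]);
      }
    }
  }
}
```

Inserting the four tuples from his example in order (0010, 0011, 1011, 0001) ends with leaves 000, 001, 01 and 1, as in his final layout, and `entriesUntil("0010")` yields value4 then value1, matching his walk.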
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> The tuple representation in the examples below is (key, sort key, value), and . is a character outside of the alphabet, which can be represented by using an escaping encoding that wraps the key + sort key encoding.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010, finding one that is not empty. In this case it's "", so we append value1 to the map at key."", ending up with (we also set the key to a dummy value to know that it contains values):
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1)
>>> > >>>>>>>>>>> Now we insert (key, 0011, value2). We again look up "key" + all the prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> Now we insert (key, 1011, value3). We again look up "key" + all the prefixes of 1011, finding "", but notice that it is full, so we partition all the values into two prefixes, 0 and 1. We also clear the "" prefix, ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>> Now we insert (key, 0001, value4). We again look up "key" + all the prefixes of 0001, finding 0, but notice that it is full, so we partition all the values into two prefixes, 00 and 01. Notice that this doesn't help us, since 00 will be too full, so we split 00 again into 000 and 001.
>>> > >>>>>>>>>>> We also clear the 0 prefix, ending up with:
>>> > >>>>>>>>>>> key: dummy value
>>> > >>>>>>>>>>> key.000: token, (0001, value4)
>>> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>>> > >>>>>>>>>>> key.01: token
>>> > >>>>>>>>>>> key.1: token, (1011, value3)
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> We are effectively building a trie [1] where we only have values at the leaves and control how full each leaf can be. There are other trie representations, like a radix tree, that may be better.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go like this:
>>> > >>>>>>>>>>> is key set? yes
>>> > >>>>>>>>>>> look for key."", miss
>>> > >>>>>>>>>>> look for key.0, miss
>>> > >>>>>>>>>>> look for key.00, miss
>>> > >>>>>>>>>>> look for key.000, hit, sort all contained values using the secondary key, provide value4 to the user
>>> > >>>>>>>>>>> look for key.001, hit, sort all contained values using the secondary key, provide value1 followed by value2 to the user
>>> > >>>>>>>>>>> look for key.01, hit, empty, return no values to the user
>>> > >>>>>>>>>>> look for key.1, hit, sort all contained values using the secondary key, provide value3 to the user
>>> > >>>>>>>>>>> we have walked the entire prefix space, so signal the end of the iterable
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> Some notes on the above:
>>> > >>>>>>>>>>> * The dummy value is used to know that the key contains values, and the token is used to know whether there are any values deeper in the trie, so we know when to stop searching.
>>> > >>>>>>>>>>> * If we can recalculate the sort key from the combination of the key and value, then we don't need to store it.
>>> > >>>>>>>>>>> * Keys with lots of values will perform worse than keys with fewer values, since we have to look up more keys, but those will be empty reads. The number of misses can be controlled by how many elements we are willing to store at a given node before we subdivide.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> In reality, you could build a lot of structures (e.g. a red-black tree or a binary tree) using the sort key; the issue is the cost of rebalancing/reorganizing the structure in map form, and whether it has a convenient pre-order traversal for lookups.
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <[email protected]> wrote:
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>> Some great comments!
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Aljoscha*: absolutely, this would have to be implemented by runners to be efficient. We can of course provide a default (inefficient) implementation, but ideally runners would provide better ones.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Jan*: Exactly. I think MapState can be dropped or backed by this. E.g.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> *Robert*: Great point about standard coders not satisfying this. That's why I suggested that we provide a way to tag the coders that do preserve order, and only accept those as key coders. Alternatively, we could present a more limited API (e.g. only allowing a hard-coded set of types to be used as keys), but that seems counter to the direction Beam usually goes.
>>> > >>>>>>>>>>>> So users will have two ways of creating multimap state specs:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>> > >>>>>>>>>>>>     StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> or
>>> > >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>>> > >>>>>>>>>>>>     StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> The second one will validate that the key coder preserves order, and fail otherwise (similar to coder determinism checking in GroupByKey). (BTW, we would also have versions of these functions that use coder inference to "guess" the coder, but those will do the same checking.)
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Also, the API I proposed did support random access! We could separate out OrderedBagState again if we think the use cases are fundamentally different. I merged the proposal into that of MultimapState because there seemed to be 99% overlap.
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> Reuven
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw <[email protected]> wrote:
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax <[email protected]> wrote:
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <[email protected]> wrote:
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <[email protected]> wrote:
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang <[email protected]> wrote:
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code:
>>> > >>>>>>>>>>>>> >>>>> 1. Removing the elements already processed from the bag requires clearing and rewriting the entire bag. This is O(n^2) in the number of input trades.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Why is it not O(2 * n) to clear and rewrite the trade state?
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends State {
>>> > >>>>>>>>>>>>> >>>>>   // Add a value to the map.
>>> > >>>>>>>>>>>>> >>>>>   void put(K key, V value);
>>> > >>>>>>>>>>>>> >>>>>   // Get all values for a given key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<V>> get(K key);
>>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>> > >>>>>>>>>>>>> >>>>>   // Return all entries in the map with keys <= limit. Returned elements are sorted by the key.
>>> > >>>>>>>>>>>>> >>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>>   // Remove all values with the given key.
>>> > >>>>>>>>>>>>> >>>>>   void remove(K key);
>>> > >>>>>>>>>>>>> >>>>>   // Remove all entries in the map with keys <= limit.
>>> > >>>>>>>>>>>>> >>>>>   void removeUntil(K limit);
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Would removeUntilExcl(K limit) also be useful? It would remove all entries in the map with keys < limit.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>>>
>>> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the key. In order to make this easier for users, I propose that we introduce a new tag on Coders, PreservesOrder. A Coder that contains this tag guarantees that the encoded value preserves the same ordering as the base Java type.
>>> > >>>>>>>>>>>>> >>>>
>>> > >>>>>>>>>>>>> >>>> Could you clarify what "encoded value preserves the same ordering as the base Java type" means?
>>> > >>>>>>>>>>>>> >>>
>>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of the same Java type, like a double; then A < B (using the language's comparison operator) iff encode(A) < encode(B) (note that the encoded versions are compared lexicographically).
>>> > >>>>>>>>>>>>> >>
>>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B) property to hold for all languages we support? What happens if A and B sort differently in different languages?
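Rui's O(2 * n) is the cost of a single clear-and-rewrite. The O(n^2) figure follows if, as assumed here, that rewrite happens once per input trade while the bag keeps growing (the code under discussion is not part of this excerpt). The hypothetical `BufferWriteCost` below counts element writes under that worst case, in which nothing can be released until the end, and compares it with a state that supports range removal.

```java
// Hypothetical cost model for the question above (not measured Beam behavior): count element
// writes when n trades arrive and, in the worst case, none can be released until the end.
class BufferWriteCost {
  // BagState has no partial removal, so each arrival clears the bag and re-adds every element
  // still buffered: i writes at arrival i.
  static long bagClearAndRewrite(int n) {
    long writes = 0;
    for (int i = 1; i <= n; i++) {
      writes += i;
    }
    return writes; // 1 + 2 + ... + n = n(n + 1) / 2
  }

  // A state with range removal writes each element once and deletes it once.
  static long sortedStateAddAndRemove(int n) {
    return 2L * n;
  }
}
```

For n = 1,000 trades the bag strategy performs 500,500 element writes against 2,000 operations for the range-removable state, which is the gap Rui's question is probing.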
>>> > >>>>>>>>>>>>> >
>>> > >>>>>>>>>>>>> > That would have to be a property of the coder (which means that this property probably needs to be represented in the portability representation of the coder). I imagine the common use cases will be for simple coders like int, long, string, etc., which are likely to sort the same in most languages.
>>> > >>>>>>>>>>>>>
>>> > >>>>>>>>>>>>> The standard coders for both double and integral types do not respect the natural ordering (consider negative values). KV coders violate the "natural" lexicographic ordering on components as well. I think implicitly sorting on encoded value would yield many surprises. (The state could, of course, take an order-preserving, bytes (string?)-producing callable as a parameter.) (As for naming, I'd probably call this OrderedBagState or something like that, rather than Map, which tends to imply random access.)
>>> > >>>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>
>>
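Robert's point about negative values can be checked directly. The sketch below compares a plain big-endian two's-complement encoding of a `long` with one that flips the sign bit first, using unsigned lexicographic byte comparison as a stand-in for how a runner would order encoded keys. `LongKeyEncoding` and its methods are illustrative helpers, not Beam coders, and a sample-based check like `preservesOrder` can only find violations; it cannot prove the property.

```java
import java.nio.ByteBuffer;
import java.util.function.LongFunction;

// Illustrative encoders for a signed 64-bit sort key (hypothetical helpers, not Beam coders).
class LongKeyEncoding {
  // Plain big-endian two's complement: negative numbers start with a 1 bit, so as unsigned
  // bytes they sort *after* every non-negative number.
  static byte[] twosComplement(long x) {
    return ByteBuffer.allocate(8).putLong(x).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto 0x00..00..0xFF..FF in order.
  static byte[] signFlipped(long x) {
    return ByteBuffer.allocate(8).putLong(x ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison of unsigned bytes, i.e. how raw encoded keys would be ordered.
  static int compareBytes(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  // Checks "A < B iff encode(A) < encode(B)" on every pair of a sample (a test, not a proof).
  static boolean preservesOrder(long[] sample, LongFunction<byte[]> encode) {
    for (long a : sample) {
      for (long b : sample) {
        int natural = Integer.signum(Long.compare(a, b));
        int encoded = Integer.signum(compareBytes(encode.apply(a), encode.apply(b)));
        if (natural != encoded) {
          return false;
        }
      }
    }
    return true;
  }
}
```

A similar transform works for doubles: take `Double.doubleToLongBits`, flip all bits for negative values and only the sign bit otherwise, and unsigned byte order then matches numeric order (with -0.0 sorting before 0.0).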
