The use case is an unbounded stream-stream join in which elements arrive in roughly time-sorted order. When a match is found, a specific element has to be removed from the timestamp-indexed collection. clearRange is helpful for expiring elements that are no longer relevant according to a user-provided time-based join predicate (e.g. WHEN ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
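To make the two operations concrete, here is a minimal in-memory sketch of the access pattern described above. It is not Beam's OrderedListState; the class name TimestampOrderedBuffer and its methods are hypothetical, timestamps are plain epoch milliseconds, and a TreeMap stands in for whatever the runner would actually use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a timestamp-ordered list state; not Beam's API. */
class TimestampOrderedBuffer<T> {
  // Timestamp (millis) -> values sharing that timestamp, in insertion order.
  private final NavigableMap<Long, List<T>> byTimestamp = new TreeMap<>();

  void add(long timestampMillis, T value) {
    byTimestamp.computeIfAbsent(timestampMillis, ts -> new ArrayList<>()).add(value);
  }

  /** Returns values with timestamps in [startTs, endTs), ordered by timestamp. */
  List<T> readRange(long startTs, long endTs) {
    List<T> out = new ArrayList<>();
    for (List<T> values : byTimestamp.subMap(startTs, true, endTs, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  /** Removes every value with a timestamp in [startTs, endTs). */
  void clearRange(long startTs, long endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  /** Removes one matching value at the timestamp; other values there are kept. */
  boolean remove(long timestampMillis, T value) {
    List<T> values = byTimestamp.get(timestampMillis);
    if (values == null || !values.remove(value)) {
      return false;
    }
    if (values.isEmpty()) {
      byTimestamp.remove(timestampMillis);
    }
    return true;
  }
}

class TimestampOrderedBufferDemo {
  public static void main(String[] args) {
    TimestampOrderedBuffer<String> left = new TimestampOrderedBuffer<>();
    left.add(1_000L, "a");
    left.add(1_000L, "b");
    left.add(400_000L, "c");
    left.remove(1_000L, "a");                 // a match was found: drop only this element
    left.clearRange(Long.MIN_VALUE, 2_000L);  // expire everything older than 2 seconds
    System.out.println(left.readRange(0L, Long.MAX_VALUE)); // prints [c]
  }
}
```

The single-element remove is the part that a range-only API cannot express when several elements share a timestamp: clearing [ts, ts+1) would drop "b" along with "a".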
I'll think a bit more about how to use MapState instead, if a remove()-like method for a single element isn't an option.

On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:

> Hi,
>
> Currently we only support removing a timestamp range. You can remove a
> single timestamp of course by removing [ts, ts+1), however if there are
> multiple elements with the same timestamp this will remove all of those
> elements.
>
> Does this fit your use case? If not, I wonder if MapState is closer to
> what you are looking for?
>
> Reuven
>
> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton <tyso...@google.com> wrote:
>
>> Hi Reuven,
>>
>> I noticed that there was an implementation of the in-memory
>> OrderedListState introduced [1]. Where can I find out more regarding the
>> plan and design? Is there a design doc? I'd like to know more details about
>> the implementation to see if it fits my use case. I was hoping it would
>> have a remove(TimestampedValue<T> e) method.
>>
>> Thanks,
>> -Tyson
>>
>>
>> [1]:
>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>
>>
>> On 2020/08/03 21:41:46, Catlyn Kong <catl...@yelp.com> wrote:
>> > Hey folks,
>> >
>> > Sry I'm late to this thread but this might be very helpful for the problem
>> > we're dealing with. Do we have a design doc or a jira ticket I can follow?
>> >
>> > Cheers,
>> > Catlyn
>> >
>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > > My questions were just an example. I fully agree there is a fundamental
>> > > need for a sorted state (of some form, and I also think this links to
>> > > efficient implementation of retractions) - I was reacting to Kenn's question
>> > > about BIP. This one would be a pretty nice example of why it would be good to
>> > > have such a "process" - not everything can be solved on the ML and there are
>> > > fundamental decisions that might need closer attention.
>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>> > >
>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>> > > TimeSortedListState), though I went a bit further and also proposed a way
>> > > to have a dynamic number of tagged TimeSortedBagStates.
>> > >
>> > > You are correct that the runner doesn't really have to store the data time
>> > > sorted - what's actually needed is the ability to fetch and remove
>> > > timestamp ranges of data (though that does include fetching the entire
>> > > list); TimeOrderedState is probably a more accurate name than
>> > > TimeSortedState. I don't think we could get away with operations that only
>> > > act on the smallest timestamp, however we could limit the API to only being
>> > > able to fetch and remove prefixes of data (ordered by timestamp). However
>> > > if we support prefixes, we might as well support arbitrary subranges.
>> > >
>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>> > >
>> > >> Big +1 for a BIP, as this might really help clarify all the pros and cons
>> > >> of all possibilities. There seem to be questions that need answering and
>> > >> motivating use cases - do we need sorted map state or can we solve our use
>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
>> > >> that really have to be a time-sorted structure, or does it "only" have to
>> > >> have operations that can efficiently find and remove the element with the
>> > >> smallest timestamp (like a PriorityQueue)?
>> > >>
>> > >> Jan
>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>> > >>
>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that will
>> > >> not be achieved by SDK-side implementation on top of either ordered or
>> > >> unordered multimap. Are those benefits worth expanding the API? I don't
>> > >> know.
>> > >> >> > >> A change to allow a runner to have a specialized implementation for >> > >> time-buffered state would be one or more StateKey types, right? >> Reuven, >> > >> maybe put this and your Java API in a doc? A BIP? Seems like there's >> at >> > >> least the following to explore: >> > >> >> > >> - how that Java API would map to an SDK-side implementation on top >> of >> > >> multimap state key >> > >> - how that Java API would map to a new StateKey >> > >> - whether there's actually more than one relevant implementation of >> that >> > >> StateKey >> > >> - whether SDK-side implementation on some other state key would be >> > >> performant enough in all SDK languages (present and future) >> > >> >> > >> Zooming back out to generic philosophy: Proliferation of StateKey >> > >> types tuned by runners (which can very easily still share >> implementation) >> > >> is probably better than proliferation of complex SDK-side >> implementations >> > >> with varying completeness and performance. >> > >> >> > >> Kenn >> > >> >> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com> wrote: >> > >> >> > >>> It might help for me to describe what I have in mind. I'm still >> > >>> proposing that we build multimap, just not a globally-sorted >> multimap. >> > >>> >> > >>> My previous proposal was that we provide a Multimap<Key, Value> >> state >> > >>> type, sorted by key. this would have two additional operations - >> > >>> multimap.getRange(startKey, endKey) and >> multimap.deleteRange(startKey, >> > >>> endKey). The primary use case was timestamp sorting, but I felt >> that a >> > >>> sorted multimap provided a nice generalization - after all, you can >> simply >> > >>> key the multimap by timestamp to get timestamp sorting. >> > >>> >> > >>> This approach had some issues immediately that would take some work >> to >> > >>> solve. 
>> > >>> Since a multimap key can have any type and a runner will only be
>> > >>> able to sort by encoded type, we would need to introduce a concept of
>> > >>> order-preserving coders into Beam and plumb that through. Robert pointed
>> > >>> out that even our existing standard coders for simple integral types don't
>> > >>> preserve order, so there will likely be surprises here.
>> > >>>
>> > >>> My current proposal is for a multimap that is not sorted by key, but
>> > >>> that can support ordered values for a single key. Remember that a multimap
>> > >>> maps K -> Iterable<V>, so this means that each individual Iterable<V> is
>> > >>> ordered, but the keys have no specific order relative to each other. This
>> > >>> is not too different from many multimap implementations where the keys are
>> > >>> unordered, but the list of values for a single key at least has a stable
>> > >>> order.
>> > >>>
>> > >>> The interface would look like this:
>> > >>>
>> > >>> public interface MultimapState<K, V> extends State {
>> > >>>   // Add a value with a default timestamp.
>> > >>>   void put(K key, V value);
>> > >>>
>> > >>>   // Add a timestamped value.
>> > >>>   void put(K key, TimestampedValue<V> value);
>> > >>>
>> > >>>   // Remove all values for a key.
>> > >>>   void remove(K key);
>> > >>>
>> > >>>   // Remove all values for a key with timestamps within the specified
>> > >>>   // range.
>> > >>>   void removeRange(K key, Instant startTs, Instant endTs);
>> > >>>
>> > >>>   // Get an Iterable of values for a key. The Iterable will be returned
>> > >>>   // sorted by timestamp.
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>> > >>>
>> > >>>   // Get an Iterable of values for a key in the specified range. The
>> > >>>   // Iterable will be returned sorted by timestamp.
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant
>> > >>>       startTs, Instant endTs);
>> > >>>
>> > >>>   ReadableState<Iterable<K>> keys();
>> > >>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>> > >>>   ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
>> > >>> }
>> > >>>
>> > >>> We can of course provide helper functions that allow using MultimapState
>> > >>> without dealing with TimestampedValue for users who only want a multimap and
>> > >>> don't want sorting.
>> > >>>
>> > >>> I think many users will only need a single sorted list - not a full
>> > >>> multimap. It's worth offering this as well, and we can simply build it on
>> > >>> top of MultimapState. It will look like an extension of BagState:
>> > >>>
>> > >>> public interface TimestampSortedListState<T> extends State {
>> > >>>   void add(TimestampedValue<T> value);
>> > >>>   Iterable<TimestampedValue<T>> read();
>> > >>>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
>> > >>>       endTs);
>> > >>>   void clearRange(Instant startTs, Instant endTs);
>> > >>> }
>> > >>>
>> > >>>
>> > >>> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com> wrote:
>> > >>>
>> > >>>> The portability layer is meant to live across multiple versions of Beam
>> > >>>> and I don't think it should be treated by doing the simple and useful thing
>> > >>>> now since I believe it will lead to a proliferation of the API.
>> > >>>>
>> > >>>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles <k...@apache.org>
>> > >>>> wrote:
>> > >>>>
>> > >>>>> I have thoughts on the subject of whether to have APIs just for the
>> > >>>>> lowest-level building blocks versus having APIs for higher-level
>> > >>>>> constructs. Specifically this applies to providing only unsorted multimap
>> > >>>>> vs what I will call "time-ordered buffer".
TL;DR: I'd vote to >> focus on >> > >>>>> time-ordered buffer; if it turns out to be easy to go all the way >> to sorted >> > >>>>> multimap that's nice-to-have; if it turns out to be easy to >> implement on >> > >>>>> top of unsorted map state that should probably be under the hood >> > >>>>> >> > >>>>> Reasons to build low-level multimap in the runner & fn api and >> layer >> > >>>>> higher-level things in the SDK: >> > >>>>> >> > >>>>> - It is less implementation for runners if they have to only >> provide >> > >>>>> fewer lower-level building blocks like multimap state. >> > >>>>> - There are many more runners than SDKs (and will be even more >> and >> > >>>>> more) so this saves overall. >> > >>>>> >> > >>>>> Reasons to build higher-level constructs directly in the runner >> and fn >> > >>>>> api: >> > >>>>> >> > >>>>> - Having multiple higher-level state types may actually be less >> > >>>>> implementation than one complex state type, especially if they >> map to >> > >>>>> runner primitives. >> > >>>>> - The runner may have better specialized implementations, >> especially >> > >>>>> for something like a time-ordered buffer. >> > >>>>> - The particular access patterns in an SDK-based implementation >> may >> > >>>>> not be ideal for each runner's underlying implementation of the >> low-level >> > >>>>> building block. >> > >>>>> - There may be excessive gRPC overhead even for optimal access >> > >>>>> patterns. >> > >>>>> >> > >>>>> There are ways to have best of both worlds, like: >> > >>>>> >> > >>>>> 1. Define multiple state types according to fundamental access >> > >>>>> patterns, like we did this before portability. >> > >>>>> 2. If it is easy to layer one on top of the other, do that inside >> the >> > >>>>> runner. Provide shared code so for runners providing the >> lowest-level >> > >>>>> primitive they get all the types for free. >> > >>>>> >> > >>>>> I understand that this is an oversimplification. It still creates >> some >> > >>>>> more work. 
And APIs are a burden so it is good to introduce as >> few as >> > >>>>> possible for maintenance. But it has performance benefits and >> also unblocks >> > >>>>> "just doing the simple and useful thing now" which I always like >> to do as >> > >>>>> long as it is compatible with future changes. If the APIs are >> fundamental, >> > >>>>> like sets, maps, timestamp ordering, then it is safe to guess >> that they >> > >>>>> will change rarely and be useful forever. >> > >>>>> >> > >>>>> Kenn >> > >>>>> >> > >>>>> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik <lc...@google.com> >> wrote: >> > >>>>> >> > >>>>>> I would be glad to take a stab at how to provide sorting on top >> of >> > >>>>>> unsorted multimap state. >> > >>>>>> Based upon your description, you want integer keys representing >> > >>>>>> timestamps and arbitrary user value for the values, is that >> correct? >> > >>>>>> What kinds of operations do you need on the sorted map state in >> order >> > >>>>>> of efficiency requirements? >> > >>>>>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), >> ClearAll(Range[x, y)) >> > >>>>>> What kinds of operations do we expect the underlying unsorted map >> > >>>>>> state to be able to provide? >> > >>>>>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g. >> > >>>>>> enumerate(K)?) >> > >>>>>> >> > >>>>>> I went through a similar exercise of how to provide a list like >> side >> > >>>>>> input view over a multimap[1] side input which efficiently >> allowed >> > >>>>>> computation of size and provided random access while only having >> access to >> > >>>>>> get(K) and enumerate K's. 
>> > >>>>>> >> > >>>>>> 1: >> > >>>>>> >> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568 >> > >>>>>> >> > >>>>>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax <re...@google.com> >> wrote: >> > >>>>>> >> > >>>>>>> Bringing this subject up again, >> > >>>>>>> >> > >>>>>>> I've spent some time looking into implementing this for the >> Dataflow >> > >>>>>>> runner. I'm unable to find a way to implement the arbitrary >> sorted multimap >> > >>>>>>> efficiently for the case where there are large numbers of >> unique keys. >> > >>>>>>> Since the primary driving use case is timestamp ordering (i.e. >> key is event >> > >>>>>>> timestamp), you would expect to have nearly a new key per >> element. I >> > >>>>>>> considered Luke's suggestion above, but unfortunately it >> doesn't really >> > >>>>>>> solve this issue. >> > >>>>>>> >> > >>>>>>> The primary use case for sorting always seems to be sorting by >> > >>>>>>> timestamp. I want to propose that instead of building the >> fully-general >> > >>>>>>> sorted multimap, we instead focus on a state type where the >> sort key is an >> > >>>>>>> integral type (like a timestamp or an integer). There is still >> a valid use >> > >>>>>>> case for multimap, but we can provide that as an unordered >> state. At least >> > >>>>>>> for Dataflow, it will be much easier >> > >>>>>>> >> > >>>>>>> While my difficulties here may be specific to the Dataflow >> runner, >> > >>>>>>> any such support would have to be built into other runners as >> well, and >> > >>>>>>> limiting to integral sorting likely makes it easier for other >> runners to >> > >>>>>>> implement this. 
Also, if you look at this >> > >>>>>>> < >> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95> >> Flink >> > >>>>>>> comment pointed out by Aljoscha, for Flink the main use case >> identified was >> > >>>>>>> also timestamp sorting. This will also simplify the API design >> for this >> > >>>>>>> feature: Sorted multimap with arbitrary keys would require us >> to introduce >> > >>>>>>> a way of mapping natural ordering to encoded ordering (i.e. a >> new >> > >>>>>>> OrderPreservingCoder), but if we limit sort keys to integral >> types, the API >> > >>>>>>> design is simpler as integral types can be represented directly. >> > >>>>>>> >> > >>>>>>> Reuven >> > >>>>>>> >> > >>>>>>> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax <re...@google.com> >> wrote: >> > >>>>>>> >> > >>>>>>>> This sounds to me like a potential runner strategy. However if >> a >> > >>>>>>>> runner can natively support sorted maps (e.g. we expect the >> Dataflow runner >> > >>>>>>>> to be able to do so, and I think it would be useful for other >> runners as >> > >>>>>>>> well), then it's probably preferable to allow the runner to >> use its native >> > >>>>>>>> capabilities. >> > >>>>>>>> >> > >>>>>>>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik <lc...@google.com >> > >> > >>>>>>>> wrote: >> > >>>>>>>> >> > >>>>>>>>> For the API that you proposed, the map key is always "void" >> and >> > >>>>>>>>> the sort key == user key. 
>> > >>>>>>>>> So in my example of
>> > >>>>>>>>> key: dummy value
>> > >>>>>>>>> key.000: token, (0001, value4)
>> > >>>>>>>>> key.001: token, (0010, value1), (0011, value2)
>> > >>>>>>>>> key.01: token
>> > >>>>>>>>> key.1: token, (1011, value3)
>> > >>>>>>>>> you would have:
>> > >>>>>>>>> "void": dummy value
>> > >>>>>>>>> "void".000: token, (0001, value4)
>> > >>>>>>>>> "void".001: token, (0010, value1), (0011, value2)
>> > >>>>>>>>> "void".01: token
>> > >>>>>>>>> "void".1: token, (1011, value3)
>> > >>>>>>>>>
>> > >>>>>>>>> Iterable<KV<K, V>> entriesUntil(K limit) translates into walking
>> > >>>>>>>>> the prefixes until you find a common prefix for K and then filtering for
>> > >>>>>>>>> values that have a sort key <= K. Using the example above, to find
>> > >>>>>>>>> entriesUntil(0010) you would:
>> > >>>>>>>>> look for key."", miss
>> > >>>>>>>>> look for key.0, miss
>> > >>>>>>>>> look for key.00, miss
>> > >>>>>>>>> look for key.000, hit, sort all contained values using secondary
>> > >>>>>>>>> key, provide value4 to user
>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
>> > >>>>>>>>> sort all contained values using secondary key, filter out value2 and
>> > >>>>>>>>> provide value1
>> > >>>>>>>>>
>> > >>>>>>>>> void removeUntil(K limit) also translates into walking the
>> > >>>>>>>>> prefixes but instead we will clear them when we have a "hit" with some
>> > >>>>>>>>> special logic for when the sort key is a prefix of the key. Using the
>> > >>>>>>>>> example, to removeUntil(0010) you would:
>> > >>>>>>>>> look for key."", miss
>> > >>>>>>>>> look for key.0, miss
>> > >>>>>>>>> look for key.00, miss
>> > >>>>>>>>> look for key.000, hit, clear
>> > >>>>>>>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we
>> > >>>>>>>>> sort all contained values using secondary key, store in memory all values
>> > >>>>>>>>> that are > 0010, clear and append values stored in memory.
>> > >>>>>>>>>
>> > >>>>>>>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax <re...@google.com>
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Can you explain how fetching and deleting ranges of keys would
>> > >>>>>>>>>> work with this data structure?
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik <lc...@google.com>
>> > >>>>>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Reuven, for the example, I assume that we never want to store
>> > >>>>>>>>>>> more than 2 values at a given sort key prefix, and if we do then we will
>> > >>>>>>>>>>> create a new longer prefix splitting up the values based upon the sort key.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Tuple representation in examples below is (key, sort key, value)
>> > >>>>>>>>>>> and . is a character outside of the alphabet which can be represented by
>> > >>>>>>>>>>> using an escaping encoding that wraps the key + sort key encoding.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> To insert (key, 0010, value1), we lookup "key" + all the
>> > >>>>>>>>>>> prefixes of 0010 finding one that is not empty. In this case its 0, so we
>> > >>>>>>>>>>> append value to the map at key.0 ending up with (we also set the key to any
>> > >>>>>>>>>>> dummy value to know that it contains values):
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key."": token, (0010, value1)
>> > >>>>>>>>>>> Now we insert (key, 0011, value2), we again lookup "key" + all
>> > >>>>>>>>>>> the prefixes of 0011, finding "", so we append value2 to key."" ending up
>> > >>>>>>>>>>> with:
>> > >>>>>>>>>>> key: dummy value
>> > >>>>>>>>>>> key."": token, (0010, value1), (0011, value2)
>> > >>>>>>>>>>> Now we insert (key, 1011, value3), we again lookup "key" + all
>> > >>>>>>>>>>> the prefixes of 1011 finding "" but notice that it is full, so we partition
>> > >>>>>>>>>>> all the values into two prefixes 0 and 1.
We also clear the >> "" prefix >> > >>>>>>>>>>> ending up with: >> > >>>>>>>>>>> key: dummy value >> > >>>>>>>>>>> key.0: token, (0010, value1), (0011, value2) >> > >>>>>>>>>>> key.1: token, (1011, value3) >> > >>>>>>>>>>> Now we insert (key, 0001, value4), we again lookup "key" + >> all >> > >>>>>>>>>>> the prefixes of the value finding 0 but notice that it is >> full, so we >> > >>>>>>>>>>> partition all the values into two prefixes 00 and 01 but >> notice this >> > >>>>>>>>>>> doesn't help us since 00 will be too full so we split 00 >> again to 000, 001. >> > >>>>>>>>>>> We also clear the 0 prefix ending up with: >> > >>>>>>>>>>> key: dummy value >> > >>>>>>>>>>> key.000: token, (0001, value4) >> > >>>>>>>>>>> key.001: token, (0010, value1), (0011, value2) >> > >>>>>>>>>>> key.01: token >> > >>>>>>>>>>> key.1: token, (1011, value3) >> > >>>>>>>>>>> >> > >>>>>>>>>>> We are effectively building a trie[1] where we only have >> values >> > >>>>>>>>>>> at the leaves and control how full each leaf can be. There >> are other trie >> > >>>>>>>>>>> representations like a radix tree that may be better. 
>> > >>>>>>>>>>> >> > >>>>>>>>>>> Looking up the values in sorted order for "key" would go >> like >> > >>>>>>>>>>> this: >> > >>>>>>>>>>> Is key set, yes >> > >>>>>>>>>>> look for key."", miss >> > >>>>>>>>>>> look for key.0, miss >> > >>>>>>>>>>> look for key.00, miss >> > >>>>>>>>>>> look for key.000, hit, sort all contained values using >> secondary >> > >>>>>>>>>>> key, provide value4 to user >> > >>>>>>>>>>> look for key.001, hit, sort all contained values using >> secondary >> > >>>>>>>>>>> key, provide value1 followed by value2 to user >> > >>>>>>>>>>> look for key.01, hit, empty, return no values to user >> > >>>>>>>>>>> look for key.1, hit, sort all contained values using >> secondary >> > >>>>>>>>>>> key, provide value3 to user >> > >>>>>>>>>>> we have walked the entire prefix space, signal end of >> iterable >> > >>>>>>>>>>> >> > >>>>>>>>>>> Some notes for the above: >> > >>>>>>>>>>> * The dummy value is used to know that the key contains >> values >> > >>>>>>>>>>> and the token is to know whether there are any values >> deeper in the trie so >> > >>>>>>>>>>> when we know when to stop searching. >> > >>>>>>>>>>> * If we can recalculate the sort key from the combination >> of the >> > >>>>>>>>>>> key and value, then we don't need to store it. >> > >>>>>>>>>>> * Keys with lots of values will perform worse then keys with >> > >>>>>>>>>>> less values since we have to look up more keys but they >> will be empty >> > >>>>>>>>>>> reads. The number of misses can be controlled by how many >> elements we are >> > >>>>>>>>>>> willing to store at a given node before we subdivide. >> > >>>>>>>>>>> >> > >>>>>>>>>>> In reality you could build a lot of structures (e.g. red >> black >> > >>>>>>>>>>> tree, binary tree) using the sort key, the issue is the >> cost of >> > >>>>>>>>>>> rebalancing/re-organizing the structure in map form and >> whether it has a >> > >>>>>>>>>>> convenient pre-order traversal for lookups. 
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax <re...@google.com>
>> > >>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Some great comments!
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Aljoscha*: absolutely this would have to be implemented by
>> > >>>>>>>>>>>> runners to be efficient. We can of course provide a default (inefficient)
>> > >>>>>>>>>>>> implementation, but ideally runners would provide better ones.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Jan*: Exactly. I think MapState can be dropped or backed by
>> > >>>>>>>>>>>> this. E.g.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> *Robert*: Great point about standard coders not satisfying
>> > >>>>>>>>>>>> this. That's why I suggested that we provide a way to tag the coders that
>> > >>>>>>>>>>>> do preserve order, and only accept those as key coders. Alternatively we
>> > >>>>>>>>>>>> could present a more limited API - e.g. only allowing a hard-coded set of
>> > >>>>>>>>>>>> types to be used as keys - but that seems counter to the direction Beam
>> > >>>>>>>>>>>> usually goes. So users will have two ways of creating multimap state specs:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>> > >>>>>>>>>>>>     StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> or
>> > >>>>>>>>>>>> private final StateSpec<MultimapState<Long, String>> state =
>> > >>>>>>>>>>>>     StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> The second one will validate that the key coder preserves
>> > >>>>>>>>>>>> order, and fails otherwise (similar to coder determinism checking in
>> > >>>>>>>>>>>> GroupByKey).
(BTW we would also have versions of these >> functions that use >> > >>>>>>>>>>>> coder inference to "guess" the coder, but those will do >> the same checking) >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> Also the API I proposed did support random access! We could >> > >>>>>>>>>>>> separate out OrderedBagState again if we think the use >> cases are >> > >>>>>>>>>>>> fundamentally different. I merged the proposal into that >> of MultimapState >> > >>>>>>>>>>>> because there seemed be 99% overlap. >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> Reuven >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw < >> > >>>>>>>>>>>> rober...@google.com> wrote: >> > >>>>>>>>>>>> >> > >>>>>>>>>>>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax < >> re...@google.com> >> > >>>>>>>>>>>>> wrote: >> > >>>>>>>>>>>>> > >> > >>>>>>>>>>>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay < >> > >>>>>>>>>>>>> al...@google.com> wrote: >> > >>>>>>>>>>>>> >> >> > >>>>>>>>>>>>> >> >> > >>>>>>>>>>>>> >> >> > >>>>>>>>>>>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik < >> > >>>>>>>>>>>>> lc...@google.com> wrote: >> > >>>>>>>>>>>>> >>> >> > >>>>>>>>>>>>> >>> >> > >>>>>>>>>>>>> >>> >> > >>>>>>>>>>>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang < >> > >>>>>>>>>>>>> ruw...@google.com> wrote: >> > >>>>>>>>>>>>> >>>>> >> > >>>>>>>>>>>>> >>>>> A few obvious problems with this code: >> > >>>>>>>>>>>>> >>>>> 1. Removing the elements already processed from >> the >> > >>>>>>>>>>>>> bag requires clearing and rewriting the entire bag. This >> is O(n^2) in the >> > >>>>>>>>>>>>> number of input trades. >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>> why it's not O(2 * n) to clearing and rewriting trade >> > >>>>>>>>>>>>> state? >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>>> >> > >>>>>>>>>>>>> >>>>> public interface SortedMultimapState<K, V> extends >> State >> > >>>>>>>>>>>>> { >> > >>>>>>>>>>>>> >>>>> // Add a value to the map. 
>> > >>>>>>>>>>>>> >>>>> void put(K key, V value); >> > >>>>>>>>>>>>> >>>>> // Get all values for a given key. >> > >>>>>>>>>>>>> >>>>> ReadableState<Iterable<V>> get(K key); >> > >>>>>>>>>>>>> >>>>> // Return all entries in the map. >> > >>>>>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> allEntries(); >> > >>>>>>>>>>>>> >>>>> // Return all entries in the map with keys <= >> limit. >> > >>>>>>>>>>>>> returned elements are sorted by the key. >> > >>>>>>>>>>>>> >>>>> ReadableState<Iterable<KV<K, V>>> entriesUntil(K >> > >>>>>>>>>>>>> limit); >> > >>>>>>>>>>>>> >>>>> >> > >>>>>>>>>>>>> >>>>> // Remove all values with the given key; >> > >>>>>>>>>>>>> >>>>> void remove(K key); >> > >>>>>>>>>>>>> >>>>> // Remove all entries in the map with keys <= >> limit. >> > >>>>>>>>>>>>> >>>>> void removeUntil(K limit); >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>> Will removeUntilExcl(K limit) also useful? It will >> remove >> > >>>>>>>>>>>>> all entries in the map with keys < limit. >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>>> >> > >>>>>>>>>>>>> >>>>> Runners will sort based on the encoded value of the >> key. >> > >>>>>>>>>>>>> In order to make this easier for users, I propose that we >> introduce a new >> > >>>>>>>>>>>>> tag on Coders PreservesOrder. A Coder that contains this >> tag guarantees >> > >>>>>>>>>>>>> that the encoded value preserves the same ordering as the >> base Java type. >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>> >> > >>>>>>>>>>>>> >>>> Could you clarify what is "encoded value preserves >> the >> > >>>>>>>>>>>>> same ordering as the base Java type"? 
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>>
>> > >>>>>>>>>>>>> >>> Let's say A and B represent two different instances of the
>> > >>>>>>>>>>>>> same Java type like a double, then A < B (using the language's comparison
>> > >>>>>>>>>>>>> operator) iff encode(A) < encode(B) (note the encoded versions are compared
>> > >>>>>>>>>>>>> lexicographically)
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >>
>> > >>>>>>>>>>>>> >> Since coders are shared across SDKs, do we expect the A < B iff
>> > >>>>>>>>>>>>> e(A) < e(B) property to hold for all languages we support? What happens if
>> > >>>>>>>>>>>>> A and B sort differently in different languages?
>> > >>>>>>>>>>>>> >
>> > >>>>>>>>>>>>> >
>> > >>>>>>>>>>>>> > That would have to be the property of the coder (which means
>> > >>>>>>>>>>>>> that this property probably needs to be represented in the portability
>> > >>>>>>>>>>>>> representation of the coder). I imagine the common use cases will be for
>> > >>>>>>>>>>>>> simple coders like int, long, string, etc., which are likely to sort the
>> > >>>>>>>>>>>>> same in most languages.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> The standard coders for both double and integral types do not
>> > >>>>>>>>>>>>> respect
>> > >>>>>>>>>>>>> the natural ordering (consider negative values). KV coders
>> > >>>>>>>>>>>>> violate the
>> > >>>>>>>>>>>>> "natural" lexicographic ordering on components as well. I think
>> > >>>>>>>>>>>>> implicitly sorting on encoded value would yield many
>> > >>>>>>>>>>>>> surprises. (The
>> > >>>>>>>>>>>>> state could, of course, take an order-preserving, bytes
>> > >>>>>>>>>>>>> (string?)-producing callable as a parameter.) (As for
>> > >>>>>>>>>>>>> naming, I'd probably call this OrderedBagState or something
>> > >>>>>>>>>>>>> like
>> > >>>>>>>>>>>>> that...rather than Map which tends to imply random access.)
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
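The "consider negative values" remark can be checked with a small sketch. It uses a plain fixed-width big-endian two's-complement encoding as an assumption for illustration, not the actual byte format of any Beam coder, and the class and method names are hypothetical. Comparing the encoded bytes lexicographically (as unsigned bytes) puts -1 after 1; flipping the sign bit before encoding is one common way to make the byte order agree with the numeric order.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative only: shows why byte-wise ordering of encoded longs can disagree with numeric order. */
class OrderPreservingLongDemo {
  /** Plain big-endian two's-complement encoding (an assumption, not a Beam coder). */
  static byte[] encodeNaive(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /** Flips the sign bit first so unsigned byte order matches signed numeric order. */
  static byte[] encodeOrderPreserving(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  /** Lexicographic comparison that treats each byte as unsigned (requires Java 9+). */
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    // -1 < 1 numerically, but the naive encoding of -1 is all 0xFF bytes and sorts last.
    System.out.println(compareEncoded(encodeNaive(-1L), encodeNaive(1L)) < 0); // prints false
    // With the sign bit flipped, the encoded order agrees with the numeric order.
    System.out.println(
        compareEncoded(encodeOrderPreserving(-1L), encodeOrderPreserving(1L)) < 0); // prints true
  }
}
```

A variable-length encoding raises further ordering issues beyond the sign bit, which is part of why restricting sort keys to timestamps, as proposed later in the thread, sidesteps the problem.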