Zooming in from generic philosophy, to be clear: adding a time-ordered
buffer to the Fn state API is *not* a shortcut. It has
benefits that will not be achieved by SDK-side implementation on
top of either ordered or unordered multimap. Are those benefits
worth expanding the API? I don't know.
A change to allow a runner to have a specialized implementation
for time-buffered state would be one or more StateKey types,
right? Reuven, maybe put this and your Java API in a doc? A BIP?
Seems like there's at least the following to explore:
- how that Java API would map to an SDK-side implementation on
top of multimap state key
- how that Java API would map to a new StateKey
- whether there's actually more than one relevant implementation
of that StateKey
- whether SDK-side implementation on some other state key would
be performant enough in all SDK languages (present and future)
Zooming back out to generic philosophy: Proliferation of StateKey
types tuned by runners (which can very easily still share
implementation) is probably better than proliferation of complex
SDK-side implementations with varying completeness and performance.
Kenn
On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax wrote:
It might help for me to describe what I have in mind. I'm
still proposing that we build multimap, just not a
globally-sorted multimap.
My previous proposal was that we provide a Multimap<Key,
Value> state type, sorted by key. This would have two
additional operations - multimap.getRange(startKey, endKey)
and multimap.deleteRange(startKey, endKey). The primary use
case was timestamp sorting, but I felt that a sorted multimap
provided a nice generalization - after all, you can simply
key the multimap by timestamp to get timestamp sorting.
This approach immediately ran into issues that would take
some work to solve. Since a multimap key can have any type
and a runner will only be able to sort by the encoded form, we
would need to introduce a concept of order-preserving coders
into Beam and plumb that through. Robert pointed out that
even our existing standard coders for simple integral types
don't preserve order, so there will likely be surprises here.
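To make the order-preservation problem concrete, here is a small Java sketch. It uses a fixed-width, big-endian two's-complement encoding chosen for illustration (not any of Beam's standard coders) and compares encodings as unsigned bytes, which is assumed to be how a runner would order raw keys. The helper class is hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper (not a Beam coder): shows why sorting on encoded bytes
// needs an order-preserving encoding for signed integers.
final class OrderPreservingLongs {

    // Plain fixed-width, big-endian two's-complement encoding.
    static byte[] naiveEncode(long x) {
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) x;  // lowest remaining byte goes last
            x >>>= 8;
        }
        return out;
    }

    // Flip the sign bit first so negatives sort before non-negatives.
    static byte[] orderPreservingEncode(long x) {
        return naiveEncode(x ^ Long.MIN_VALUE);
    }

    // Unsigned lexicographic comparison, i.e. how raw keys would be ordered.
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // Naive: -1 encodes as 0xFF...FF, so it sorts after 1.
        System.out.println(compareBytes(naiveEncode(-1L), naiveEncode(1L)) > 0);
        // Sign-flipped: byte order matches numeric order.
        System.out.println(compareBytes(orderPreservingEncode(-1L), orderPreservingEncode(1L)) < 0);
    }
}
```

Running `main` should print `true` twice: the naive encoding puts -1 after 1, and flipping the sign bit restores numeric order.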
My current proposal is for a multimap that is not sorted by
key, but that supports ordered values for a single key.
Remember that a multimap maps K -> Iterable<V>, so this means
that each individual Iterable<V> is ordered, but the keys
have no specific order relative to each other. This is not
too different from many multimap implementations where the
keys are unordered, but the list of values for a single key
at least has a stable order.
The interface would look like this:
import java.util.Map;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public interface MultimapState<K, V> extends State {
  // Add a value with a default timestamp.
  void put(K key, V value);
  // Add a timestamped value.
  void put(K key, TimestampedValue<V> value);
  // Remove all values for a key.
  void remove(K key);
  // Remove all values for a key with timestamps within the specified range.
  void removeRange(K key, Instant startTs, Instant endTs);
  // Get an Iterable of values for a key, sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> get(K key);
  // Get an Iterable of values for a key in the specified range, sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> getRange(K key, Instant startTs, Instant endTs);
  ReadableState<Iterable<K>> keys();
  ReadableState<Iterable<TimestampedValue<V>>> values();
  ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
}
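The semantics of this interface can be approximated with an in-memory Java sketch, useful as a reference when reasoning about what a runner must provide. This is a hypothetical stand-in, not Beam code: it uses epoch-millis `long` timestamps in place of `Instant`, returns plain `Map.Entry` pairs instead of `TimestampedValue`, and assumes ranges are half-open, `[startTs, endTs)`, which the proposal does not specify.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory sketch of the proposed semantics, not a runner implementation.
// Assumptions: epoch-millis timestamps instead of Instant; ranges are [startTs, endTs).
final class TimestampOrderedMultimap<K, V> {
    // Keys are unordered; each key's values are ordered by timestamp.
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    void put(K key, long ts, V value) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(ts, t -> new ArrayList<>())
            .add(value);
    }

    void remove(K key) {
        data.remove(key);
    }

    void removeRange(K key, long startTs, long endTs) {
        TreeMap<Long, List<V>> values = data.get(key);
        if (values == null || startTs >= endTs) return;
        // subMap is a live view: clearing it deletes from the backing map.
        values.subMap(startTs, true, endTs, false).clear();
        if (values.isEmpty()) data.remove(key);
    }

    // All values for a key, sorted by timestamp (ties keep insertion order).
    List<Map.Entry<Long, V>> get(K key) {
        TreeMap<Long, List<V>> values = data.get(key);
        return values == null ? new ArrayList<>() : flatten(values);
    }

    List<Map.Entry<Long, V>> getRange(K key, long startTs, long endTs) {
        TreeMap<Long, List<V>> values = data.get(key);
        if (values == null || startTs >= endTs) return new ArrayList<>();
        return flatten(values.subMap(startTs, true, endTs, false));
    }

    Iterable<K> keys() {
        return new ArrayList<>(data.keySet());
    }

    private static <T> List<Map.Entry<Long, T>> flatten(NavigableMap<Long, List<T>> byTs) {
        List<Map.Entry<Long, T>> out = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e : byTs.entrySet()) {
            for (T v : e.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
            }
        }
        return out;
    }
}
```

Keeping a timestamp-ordered index per key, with no order across keys, is what lets `getRange` and `removeRange` touch only the affected values in this sketch.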
We can of course provide helper functions that allow using
MultimapState without dealing with TimestampedValue for users
who only want a multimap and don't want sorting.
I think many users will only need a single sorted list - not
a full multimap. It's worth offering this as well, and we can
simply build it on top of MultimapState. It will look like an
extension of BagState:
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public interface TimestampSortedListState<T> extends State {
  void add(TimestampedValue<T> value);
  Iterable<TimestampedValue<T>> read();
  Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
  void clearRange(Instant startTs, Instant endTs);
}
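Because the sorted list is just the multimap with a single implicit key, a sketch can keep that one key's timestamp index directly. This is a hypothetical in-memory stand-in, not Beam code: `long` timestamps replace `Instant`, reads return bare values rather than `TimestampedValue`, and ranges are assumed to be half-open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for TimestampSortedListState.
// Assumptions: epoch-millis timestamps, half-open ranges [startTs, endTs).
final class TimestampSortedList<T> {
    // One implicit key, so a single timestamp index is enough.
    private final TreeMap<Long, List<T>> byTs = new TreeMap<>();

    void add(long ts, T value) {
        byTs.computeIfAbsent(ts, t -> new ArrayList<>()).add(value);
    }

    // All values in timestamp order; equal timestamps keep insertion order.
    List<T> read() {
        List<T> out = new ArrayList<>();
        for (List<T> vs : byTs.values()) out.addAll(vs);
        return out;
    }

    List<T> readRange(long startTs, long endTs) {
        List<T> out = new ArrayList<>();
        if (startTs >= endTs) return out;
        for (List<T> vs : byTs.subMap(startTs, true, endTs, false).values()) out.addAll(vs);
        return out;
    }

    void clearRange(long startTs, long endTs) {
        if (startTs >= endTs) return;
        // subMap is a view, so clearing it removes those timestamps from byTs.
        byTs.subMap(startTs, true, endTs, false).clear();
    }
}
```

A typical caller might buffer out-of-order events with `add`, then, once the watermark passes some time `w`, emit `readRange(Long.MIN_VALUE, w)` and drop the same span with `clearRange(Long.MIN_VALUE, w)`.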
On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik wrote:
The portability layer is meant to live across multiple
versions of Beam, so I don't think it should be treated
as a place for doing the simple and useful thing now; I believe
that will lead to a proliferation of the API.
On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles wrote:
I have thoughts on the subject of whether to have
APIs just for the lowest-level building blocks versus
having APIs for higher-level constructs. Specifically
this applies to providing only unsorted multimap vs
what I will call "time-ordered buffer". TL;DR: I'd
vote to focus on time-ordered buffer; if it turns out
to be easy to go all the way to sorted multimap,
that's a nice-to-have; if it turns out to be easy to
implement on top of unsorted map state, that should
probably happen under the hood.
Reasons to build low-level multimap in the runner &
fn api and layer higher-level things in the SDK:
- It is less implementation for runners if they
only have to provide a few lower-level building blocks
like multimap state.
- There are many more runners than SDKs (and there will be
more and more), so this saves work overall.
Reasons to build higher-level constructs directly in
the runner and fn api:
- Having multiple higher-level state types may
actually be less implementation than one complex
state type, especially if they map to runner primitives.
- The runner may have better specialized
implementations, especially for something like a
time-ordered buffer.
- The particular access patterns in an SDK-based
implementation may not be ideal for each runner's
underlying implementation of the low-level building
block.
- There may be excessive gRPC overhead even for
optimal access patterns.
There are ways to have best of both worlds, like:
1. Define multiple state types according to
fundamental access patterns, like we did before
portability.
2. If it is easy to layer one on top of the other, do
that inside the runner. Provide shared code so that
runners providing the lowest-level primitive get
all the types for free.
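As a rough illustration of point 2, the shared code could be an adapter that builds a time-ordered buffer on the smallest primitive. The sketch below assumes a hypothetical `UnsortedMultimap` exposing only get, append, and clear (the minimum listed later in this thread) and stores the whole buffer under one key. It is deliberately naive: each read sorts everything and each `clearRange` rewrites the survivors, which is the cost a runner with native sorted storage could avoid.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lowest-level primitive: only get, append, and clear per key.
interface UnsortedMultimap<K, V> {
    Iterable<V> get(K key);
    void append(K key, V value);
    void clear(K key);
}

// Trivial in-memory primitive, standing in for a runner's storage.
final class InMemoryUnsortedMultimap<K, V> implements UnsortedMultimap<K, V> {
    private final Map<K, List<V>> data = new HashMap<>();

    public Iterable<V> get(K key) {
        return new ArrayList<>(data.getOrDefault(key, new ArrayList<>()));
    }

    public void append(K key, V value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public void clear(K key) {
        data.remove(key);
    }
}

// A timestamped element, standing in for TimestampedValue.
final class Stamped<T> {
    final long ts;
    final T value;

    Stamped(long ts, T value) {
        this.ts = ts;
        this.value = value;
    }
}

// Shared adapter: a time-ordered buffer layered on the unsorted primitive.
final class LayeredTimeOrderedBuffer<T> {
    private static final String KEY = "buffer";  // one fixed key (assumption)
    private final UnsortedMultimap<String, Stamped<T>> store;

    LayeredTimeOrderedBuffer(UnsortedMultimap<String, Stamped<T>> store) {
        this.store = store;
    }

    void add(long ts, T value) {
        store.append(KEY, new Stamped<>(ts, value));
    }

    // Values with ts in [startTs, endTs), sorted by timestamp; ties keep append order.
    List<T> readRange(long startTs, long endTs) {
        List<Stamped<T>> hits = new ArrayList<>();
        for (Stamped<T> s : store.get(KEY)) {
            if (s.ts >= startTs && s.ts < endTs) hits.add(s);
        }
        hits.sort(Comparator.comparingLong(s -> s.ts));  // List.sort is stable
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : hits) out.add(s.value);
        return out;
    }

    // Rewrites every surviving value: the cost a native implementation could avoid.
    void clearRange(long startTs, long endTs) {
        List<Stamped<T>> keep = new ArrayList<>();
        for (Stamped<T> s : store.get(KEY)) {
            if (s.ts < startTs || s.ts >= endTs) keep.add(s);
        }
        store.clear(KEY);
        for (Stamped<T> s : keep) store.append(KEY, s);
    }
}
```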
I understand that this is an oversimplification. It
still creates some more work. And APIs are a burden
so it is good to introduce as few as possible for
maintenance. But it has performance benefits and also
unblocks "just doing the simple and useful thing now"
which I always like to do as long as it is compatible
with future changes. If the APIs are fundamental,
like sets, maps, timestamp ordering, then it is safe
to guess that they will change rarely and be useful
forever.
Kenn
On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik wrote:
I would be glad to take a stab at how to provide
sorting on top of unsorted multimap state.
Based upon your description, you want integer
keys representing timestamps and arbitrary user
value for the values, is that correct?
What kinds of operations do you need on the
sorted map state in order of efficiency requirements?
(e.g. Next(x), Previous(x), GetAll(Range[x, y)),
ClearAll(Range[x, y))
What kinds of operations do we expect the
underlying unsorted map state to be able to provide?
(at a minimum Get(K), Append(K), Clear(K) but
what else e.g. enumerate(K)?)
I went through a similar exercise of how to
provide a list-like side input view over a
multimap[1] side input which efficiently allowed
computation of size and provided random access
while only having access to get(K) and enumerate K's.
1:
https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax wrote:
Bringing this subject up again,
I've spent some time looking into
implementing this for the Dataflow runner.
I'm unable to find a way to implement the
arbitrary sorted multimap efficiently for the
case where there are large numbers of unique
keys. Since the primary driving use case is
timestamp ordering (i.e. key is event
timestamp), you would expect to have nearly a
new key per element. I considered Luke's
suggestion above, but unfortunately it
doesn't really solve this issue.
The primary use case for sorting always seems
to be sorting by timestamp. I want to propose
that instead of building the fully-general
sorted multimap, we instead focus on a state
type where the sort key is an integral type
(like a timestamp or an integer). There is
still a valid use case for multimap, but we
can provide that as an unordered state. At
least for Dataflow, it will be much easier.
While my difficulties here may be specific to
the Dataflow runner, any such support would
have to be built into other runners as well,
and limiting to integral sorting likely makes
it easier for other runners to implement
this. Also, if you look at this Flink comment
pointed out by Aljoscha
<https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>,
the main use case identified for Flink was also
timestamp sorting. This will also simplify
the API design for this feature: Sorted
multimap with arbitrary keys would require us
to introduce a way of mapping natural
ordering to encoded ordering (i.e. a new
OrderPreservingCoder), but if we limit sort
keys to integral types, the API design is
simpler as integral types can be represented
directly.
Reuven
On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax wrote:
This sounds to me like a potential runner
strategy. However if a runner can
natively support sorted maps (e.g. we
expect the Dataflow runner to be able to
do so, and I think it would be useful for
other runners as well), then it's
probably preferable to allow the runner
to use its native capabilities.
On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik wrote:
For the API that you proposed, the
map key is always "void" and the sort
key == user key. So in my example of
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010, value1),
(0011, value2)
key.01: token
key.1: token, (1011, value3)
you would have:
"void": dummy value
"void".000: token, (0001, value4)
"void".001: token, (0010, value1),
(0011, value2)
"void".01: token
"void".1: token, (1011, value3)
Iterable<KV<K, V>> entriesUntil(K
limit) translates into walking
the prefixes until you find a common
prefix for K and then filter for
values where they have a sort key <=
K. Using the example above, to find
entriesUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort all
contained values using secondary key,
provide value4 to user
look for key.001, hit, notice that
001 is a prefix of 0010 so we sort
all contained values using secondary
key, filter out value2 and provide value1
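The walk can be sketched in Java over a plain `HashMap` standing in for the unsorted map state. Assumptions: sort keys are fixed-width 4-bit strings as in the example, map keys are written `key.<prefix>` (so key."" becomes `"key."`), and the presence of a map entry plays the role of the token. Besides probing prefixes in order, the sketch skips any subtree whose smallest possible sort key already exceeds the limit, which is one way to know when to stop. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of entriesUntil over the trie layout described in this thread.
// Assumptions: 4-bit '0'/'1' sort keys, map keys "key.<prefix>", entry presence = token.
final class TrieEntriesUntil {
    static final int WIDTH = 4;

    static final class Entry {
        final String sortKey;
        final String value;

        Entry(String sortKey, String value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Layout after the four example inserts.
    static Map<String, List<Entry>> exampleLayout() {
        Map<String, List<Entry>> m = new HashMap<>();
        m.put("key", new ArrayList<>());  // dummy value: the key has data
        m.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
        m.put("key.001", new ArrayList<>(List.of(new Entry("0011", "value2"), new Entry("0010", "value1"))));
        m.put("key.01", new ArrayList<>());
        m.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
        return m;
    }

    static List<String> entriesUntil(Map<String, List<Entry>> state, String key, String limit) {
        List<String> out = new ArrayList<>();
        if (state.containsKey(key)) walk(state, key, "", limit, out);
        return out;
    }

    private static void walk(Map<String, List<Entry>> state, String key, String prefix,
                             String limit, List<String> out) {
        // Smallest sort key under this prefix; if it is already > limit, skip the subtree.
        if (padWith(prefix, '0').compareTo(limit) > 0) return;
        List<Entry> leaf = state.get(key + "." + prefix);
        if (leaf != null) {  // hit: sort the leaf and keep values <= limit
            List<Entry> sorted = new ArrayList<>(leaf);
            sorted.sort(Comparator.comparing(e -> e.sortKey));
            for (Entry e : sorted) {
                if (e.sortKey.compareTo(limit) <= 0) out.add(e.value);
            }
            return;
        }
        if (prefix.length() == WIDTH) return;  // safety bound: nothing deeper exists
        walk(state, key, prefix + "0", limit, out);  // miss: descend left, then right
        walk(state, key, prefix + "1", limit, out);
    }

    static String padWith(String prefix, char fill) {
        StringBuilder sb = new StringBuilder(prefix);
        while (sb.length() < WIDTH) sb.append(fill);
        return sb.toString();
    }
}
```

With the example layout, `entriesUntil(state, "key", "0010")` yields value4 then value1.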
void removeUntil(K limit) also
translates into walking the prefixes
but instead we will clear them when
we have a "hit" with some special
logic for when the sort key is a
prefix of the key. Using the example,
to removeUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, clear
look for key.001, hit, notice that
001 is a prefix of 0010 so we sort
all contained values using secondary
key, store in memory all values that
are > 0010, clear, and append the values
stored in memory.
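A matching sketch of removeUntil, under the same assumptions (4-bit sort keys, `key.<prefix>` map keys, hypothetical names). It also assumes that "clear" empties a leaf's values but keeps its entry, like key.01 in the example, so every prefix path still ends at a leaf for later reads and inserts. Sorting is not needed just to decide which values survive, so the sketch filters directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of removeUntil over the trie layout (hypothetical names).
// Assumption: "clear" empties a leaf but keeps its entry (the token).
final class TrieRemoveUntil {
    static final int WIDTH = 4;

    static final class Entry {
        final String sortKey;
        final String value;

        Entry(String sortKey, String value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Layout after the four example inserts.
    static Map<String, List<Entry>> exampleLayout() {
        Map<String, List<Entry>> m = new HashMap<>();
        m.put("key", new ArrayList<>());  // dummy value
        m.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
        m.put("key.001", new ArrayList<>(List.of(new Entry("0010", "value1"), new Entry("0011", "value2"))));
        m.put("key.01", new ArrayList<>());
        m.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
        return m;
    }

    static void removeUntil(Map<String, List<Entry>> state, String key, String limit) {
        if (state.containsKey(key)) walk(state, key, "", limit);
    }

    private static void walk(Map<String, List<Entry>> state, String key, String prefix, String limit) {
        if (padWith(prefix, '0').compareTo(limit) > 0) return;  // whole subtree is > limit
        String mapKey = key + "." + prefix;
        List<Entry> leaf = state.get(mapKey);
        if (leaf != null) {
            if (padWith(prefix, '1').compareTo(limit) <= 0) {
                state.put(mapKey, new ArrayList<>());  // whole leaf is <= limit: clear it
            } else {
                // The limit falls inside this leaf: keep values > limit, clear, re-append.
                List<Entry> keep = new ArrayList<>();
                for (Entry e : leaf) {
                    if (e.sortKey.compareTo(limit) > 0) keep.add(e);
                }
                state.put(mapKey, keep);
            }
            return;
        }
        if (prefix.length() == WIDTH) return;
        walk(state, key, prefix + "0", limit);
        walk(state, key, prefix + "1", limit);
    }

    static String padWith(String prefix, char fill) {
        StringBuilder sb = new StringBuilder(prefix);
        while (sb.length() < WIDTH) sb.append(fill);
        return sb.toString();
    }
}
```

After `removeUntil(state, "key", "0010")` on the example, key.000 is empty, key.001 holds only value2, and key.1 still holds value3.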
On Fri, May 24, 2019 at 10:36 AM Reuven Lax wrote:
Can you explain how fetching and
deleting ranges of keys would
work with this data structure?
On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik wrote:
Reuven, for the example, I
assume that we never want to
store more than 2 values at a
given sort key prefix, and if
we do then we will create a
new longer prefix splitting
up the values based upon the
sort key.
Tuple representation in
examples below is (key, sort
key, value) and . is a
character outside of the
alphabet which can be
represented by using an
escaping encoding that wraps
the key + sort key encoding.
To insert (key, 0010,
value1), we look up "key" +
all the prefixes of 0010,
finding one that is not
empty. In this case it's "", so
we append value1 to the map at
key."" ending up with (we also
set the key to a dummy
value to know that it
contains values):
key: dummy value
key."": token, (0010, value1)
Now we insert (key, 0011,
value2), we again look up
"key" + all the prefixes of
0011, finding "", so we
append value2 to key.""
ending up with:
key: dummy value
key."": token, (0010,
value1), (0011, value2)
Now we insert (key, 1011,
value3), we again look up
"key" + all the prefixes of
1011 finding "" but notice
that it is full, so we
partition all the values into
two prefixes 0 and 1. We also
clear the "" prefix ending up
with:
key: dummy value
key.0: token, (0010, value1),
(0011, value2)
key.1: token, (1011, value3)
Now we insert (key, 0001,
value4), we again look up
"key" + all the prefixes of
0001, finding 0 but
notice that it is full, so we
partition all the values into
two prefixes 00 and 01 but
notice this doesn't help us
since 00 will be too full so
we split 00 again to 000,
001. We also clear the 0
prefix ending up with:
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010,
value1), (0011, value2)
key.01: token
key.1: token, (1011, value3)
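The insert-and-split steps above can be sketched as follows, again over a `HashMap` standing in for unsorted map state, with 4-bit sort keys, at most two values per leaf, and hypothetical names. The sketch appends first and then splits any leaf holding more than two values, which yields the same layout as checking for fullness before appending. It stops splitting at the full key width, so many values with an identical sort key simply share one leaf.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the insert-and-split scheme over an unsorted map (hypothetical names).
final class TrieInsert {
    static final int WIDTH = 4;         // sort keys are 4-bit strings in the example
    static final int MAX_PER_LEAF = 2;  // at most two values per leaf

    static final class Entry {
        final String sortKey;
        final String value;

        Entry(String sortKey, String value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    static void insert(Map<String, List<Entry>> state, String key, String sortKey, String value) {
        if (!state.containsKey(key)) {
            state.put(key, new ArrayList<>());        // dummy value: the key has data
            state.put(key + ".", new ArrayList<>());  // empty root leaf, key.""
        }
        // Probe key + each prefix of the sort key; the first one present is the leaf.
        String prefix = "";
        while (!state.containsKey(key + "." + prefix)) {
            prefix = sortKey.substring(0, prefix.length() + 1);
        }
        state.get(key + "." + prefix).add(new Entry(sortKey, value));
        splitIfOverFull(state, key, prefix);
    }

    // Replace an over-full leaf with children prefix+"0" and prefix+"1", recursively.
    private static void splitIfOverFull(Map<String, List<Entry>> state, String key, String prefix) {
        List<Entry> leaf = state.get(key + "." + prefix);
        if (leaf.size() <= MAX_PER_LEAF || prefix.length() == WIDTH) return;
        state.remove(key + "." + prefix);  // clear the old prefix
        List<Entry> zeros = new ArrayList<>();
        List<Entry> ones = new ArrayList<>();
        for (Entry e : leaf) {
            if (e.sortKey.charAt(prefix.length()) == '0') zeros.add(e); else ones.add(e);
        }
        state.put(key + "." + prefix + "0", zeros);
        state.put(key + "." + prefix + "1", ones);
        splitIfOverFull(state, key, prefix + "0");
        splitIfOverFull(state, key, prefix + "1");
    }
}
```

Replaying the four inserts in order leaves exactly the entries key, key.000, key.001, key.01, and key.1.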
We are effectively building a
trie[1] where we only have
values at the leaves and
control how full each leaf
can be. There are other trie
representations like a radix
tree that may be better.
Looking up the values in
sorted order for "key" would
go like this:
Is key set, yes
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort
all contained values using
secondary key, provide value4
to user
look for key.001, hit, sort
all contained values using
secondary key, provide value1
followed by value2 to user
look for key.01, hit, empty,
return no values to user
look for key.1, hit, sort all
contained values using
secondary key, provide value3
to user
we have walked the entire
prefix space, signal end of
iterable
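The sorted read can be sketched as a depth-first walk that descends on a miss and stops at a hit, which visits prefixes in the same order as the steps above. The layout is the example's final state, hard-coded, with key.001's values stored out of order to show the per-leaf sort; names and representation are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the sorted read over the trie layout (hypothetical names).
// Map keys are "key.<prefix>"; a present entry is a leaf (its presence is the token).
final class TrieSortedRead {
    static final int WIDTH = 4;

    static final class Entry {
        final String sortKey;
        final String value;

        Entry(String sortKey, String value) {
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Layout after the four example inserts.
    static Map<String, List<Entry>> exampleLayout() {
        Map<String, List<Entry>> m = new HashMap<>();
        m.put("key", new ArrayList<>());  // dummy value
        m.put("key.000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
        m.put("key.001", new ArrayList<>(List.of(new Entry("0011", "value2"), new Entry("0010", "value1"))));
        m.put("key.01", new ArrayList<>());
        m.put("key.1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
        return m;
    }

    // Values in sort-key order; each probed prefix is recorded in lookups.
    static List<String> readSorted(Map<String, List<Entry>> state, String key, List<String> lookups) {
        List<String> out = new ArrayList<>();
        if (!state.containsKey(key)) return out;  // "Is key set?"
        walk(state, key, "", out, lookups);
        return out;
    }

    private static void walk(Map<String, List<Entry>> state, String key, String prefix,
                             List<String> out, List<String> lookups) {
        lookups.add(prefix);
        List<Entry> leaf = state.get(key + "." + prefix);
        if (leaf != null) {  // hit: sort the leaf's values by sort key
            List<Entry> sorted = new ArrayList<>(leaf);
            sorted.sort(Comparator.comparing(e -> e.sortKey));
            for (Entry e : sorted) out.add(e.value);
            return;
        }
        if (prefix.length() == WIDTH) return;  // safety bound: nothing deeper exists
        walk(state, key, prefix + "0", out, lookups);  // miss: descend left, then right
        walk(state, key, prefix + "1", out, lookups);
    }
}
```

For the example, the recorded lookups are "", 0, 00, 000, 001, 01, 1, and the values come back as value4, value1, value2, value3.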
Some notes for the above:
* The dummy value is used to
know that the key contains
values and the token is to
know whether there are any
values deeper in the trie, so
we know when to stop
searching.
* If we can recalculate the
sort key from the combination
of the key and value, then we
don't need to store it.
* Keys with lots of values
will perform worse than keys
with fewer values since we
have to look up more keys but
they will be empty reads. The
number of misses can be
controlled by how many
elements we are willing to
store at a given node before
we subdivide.
In reality you could build a
lot of structures (e.g. red-black
tree, binary tree)
using the sort key, the issue
is the cost of
rebalancing/re-organizing the
structure in map form and
whether it has a convenient
pre-order traversal for lookups.
On Fri, May 24, 2019 at 8:14 AM Reuven Lax wrote:
Some great comments!
*Aljoscha*: Absolutely,
this would have to be
implemented by runners to
be efficient. We can of
course provide a default
(inefficient)
implementation, but
ideally runners would
provide better ones.
*Jan*: Exactly. I think
MapState can be dropped
or backed by this. E.g.
*Robert*: Great point
about standard coders not
satisfying this. That's
why I suggested that we
provide a way to tag the
coders that do preserve
order, and only accept
those as key coders.
Alternatively we could
present a more limited
API - e.g. only allowing
a hard-coded set of types
to be used as keys - but
that seems counter to the
direction Beam usually
goes. So users will have
two ways of creating
multimap state specs:
private final StateSpec<MultimapState<Long, String>> state =
    StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
or
private final StateSpec<MultimapState<Long, String>> state =
    StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
The second one will
validate that the key
coder preserves order,
and fails otherwise
(similar to coder
determinism checking in
GroupByKey). (BTW we
would also have versions
of these functions that
use coder inference to
"guess" the coder, but
those will do the same
checking)
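The validation described here would rely on coders being tagged as order-preserving. A sampling test, sketched below as a hypothetical helper (not a Beam API), could complement that: it finds counterexamples but can never prove a coder order-preserving. It also illustrates the cross-language concern raised later in the thread: Java's `String.compareTo` orders by UTF-16 code units, while UTF-8 bytes follow code point order, so the two disagree on, for example, U+FFFF versus U+1F600.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sampling check. Like a determinism check, it can find a
// counterexample to order preservation but can never prove it.
final class OrderPreservationCheck {

    // Returns the first violating pair, or null if every sampled pair agrees.
    static <T> List<T> findViolation(List<T> samples, Comparator<? super T> natural,
                                     Function<? super T, byte[]> encode) {
        for (T a : samples) {
            for (T b : samples) {
                int byValue = Integer.signum(natural.compare(a, b));
                int byBytes = Integer.signum(Arrays.compareUnsigned(encode.apply(a), encode.apply(b)));
                if (byValue != byBytes) return List.of(a, b);
            }
        }
        return null;
    }

    // Raw UTF-8 bytes of a string, to be compared as unsigned bytes.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

With samples `"a"`, `"\uFFFF"`, and `"\uD83D\uDE00"`, `findViolation(samples, Comparator.naturalOrder(), OrderPreservationCheck::utf8)` returns a non-null pair; with plain ASCII words it returns `null`.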
Also the API I proposed
did support
random access! We could
separate out
OrderedBagState again if
we think the use cases
are fundamentally
different. I merged the
proposal into that of
MultimapState because
there seemed to be 99% overlap.
Reuven
On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw wrote:
On Fri, May 24, 2019 at 5:32 AM Reuven Lax wrote:
>
> On Thu, May 23, 2019 at 1:53 PM Ahmet Altay wrote:
>>
>>
>>
>> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik wrote:
>>>
>>>
>>>
>>> On Thu, May 23, 2019 at 11:37 AM Rui Wang wrote:
>>>>>
>>>>> A few obvious problems with this code:
>>>>> 1. Removing the elements already processed from the bag requires
>>>>> clearing and rewriting the entire bag. This is O(n^2) in the number
>>>>> of input trades.
>>>>
>>>> Why isn't it O(2 * n) to clear and rewrite the trade state?
>>>>
>>>>>
>>>>> public interface SortedMultimapState<K, V> extends State {
>>>>>   // Add a value to the map.
>>>>>   void put(K key, V value);
>>>>>   // Get all values for a given key.
>>>>>   ReadableState<Iterable<V>> get(K key);
>>>>>   // Return all entries in the map.
>>>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>>>   // Return all entries in the map with keys <= limit. Returned
>>>>>   // elements are sorted by the key.
>>>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>>>
>>>>>   // Remove all values with the given key.
>>>>>   void remove(K key);
>>>>>   // Remove all entries in the map with keys <= limit.
>>>>>   void removeUntil(K limit);
>>>>
>>>> Would removeUntilExcl(K limit) also be useful? It would remove all
>>>> entries in the map with keys < limit.
>>>>
>>>>>
>>>>> Runners will sort based on the encoded value of the key. In order to
>>>>> make this easier for users, I propose that we introduce a new tag on
>>>>> Coders: PreservesOrder. A Coder that contains this tag guarantees that
>>>>> the encoded value preserves the same ordering as the base Java type.
>>>>
>>>>
>>>> Could you clarify what "encoded value preserves the same ordering as
>>>> the base Java type" means?
>>>
>>>
>>> Let's say A and B represent two different instances of the same Java
>>> type, like a double. Then A < B (using the language's comparison
>>> operator) iff encode(A) < encode(B) (note the encoded versions are
>>> compared lexicographically).
>>
>>
>> Since coders are shared across SDKs, do we expect the A < B iff
>> e(A) < e(B) property to hold for all languages we support? What happens
>> if A and B sort differently in different languages?
>
>
> That would have to be the property of the coder (which means that this
> property probably needs to be represented in the portability
> representation of the coder). I imagine the common use cases will be for
> simple coders like int, long, string, etc., which are likely to sort the
> same in most languages.
The standard coders for both double and integral types do not respect
the natural ordering (consider negative values). KV coders violate the
"natural" lexicographic ordering on components as well. I think
implicitly sorting on the encoded value would yield many surprises. (The
state could, of course, take an order-preserving, bytes
(string?)-producing callable as a parameter.)
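As an example of such an order-preserving encoding for doubles, a common technique works on the IEEE 754 bit pattern: flip every bit of a negative value and only the sign bit of a non-negative one, then write the result big-endian. The sketch below is a hypothetical helper, not one of Beam's coders, and contrasts it with the raw big-endian bits.

```java
// Hypothetical helper (not a Beam coder): order-preserving encoding for doubles.
final class OrderPreservingDoubles {

    // Write 64 bits big-endian.
    static byte[] bigEndian(long bits) {
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) bits;
            bits >>>= 8;
        }
        return out;
    }

    // Raw IEEE 754 bits: negatives sort after positives, and in reverse among themselves.
    static byte[] naiveEncode(double d) {
        return bigEndian(Double.doubleToLongBits(d));
    }

    // Negative: flip all bits. Non-negative: flip only the sign bit.
    static byte[] orderPreservingEncode(double d) {
        long bits = Double.doubleToLongBits(d);
        bits = (bits < 0) ? ~bits : (bits ^ Long.MIN_VALUE);
        return bigEndian(bits);
    }
}
```

With the raw bits, -2.5 sorts after 1.0 and after -1.0; with the transformed bits, unsigned byte order matches `Double.compare`, including -0.0 before 0.0 and NaN last.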
(As for naming, I'd probably call this OrderedBagState or something like
that, rather than Map, which tends to imply random access.)