Hi, absolutely +1 to add this to the model, but does this imply that
MapState can be dropped (or backed by this)? It can have different insert or
delete time complexity (O(1)) instead of O(logn).

Jan

---------- Původní e-mail ----------
Od: Aljoscha Krettek <aljos...@apache.org>
Komu: dev@beam.apache.org
Datum: 24. 5. 2019 10:56:45
Předmět: Re: DISCUSS: Sorted MapState API
"
This is quite interesting! The Flink Table API (relational and SQL) has an
implementation for the type of join you mention in the example. We call it
Temporal Table Join, and it works on something we call Temporal Tables: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/
temporal_tables.html
(https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html)



The implementation would have benefited from something like a Sorted
MapState and we actually discussed adding such a state type during
implementation. You can still see that in the TODOs here, actually: https://
github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-
table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/
join/TemporalRowtimeJoin.scala#L95
(https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95)




So I’m +1 for this, but someone would have to implement that for the
different Runners as well. 😅




Aljoscha



"
On 24. May 2019, at 05:32, Reuven Lax <re...@google.com
(mailto:re...@google.com)> wrote:






On Thu, May 23, 2019 at 1:53 PM Ahmet Altay <al...@google.com
(mailto:al...@google.com)> wrote:

"






On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik <lc...@google.com
(mailto:lc...@google.com)> wrote:

"






On Thu, May 23, 2019 at 11:37 AM Rui Wang <ruw...@google.com
(mailto:ruw...@google.com)> wrote:

"

"

A few obvious problems with this code:


  1. Removing the elements already processed from the bag requires clearing
and rewriting the entire bag. This is O(n^2) in the number of input trades.

"
why it's not O(2 * n) to clearing and rewriting trade state?



"




public interface SortedMultimapState<K, V> extends State {

  // Add a value to the map.

  void put(K key, V value);

  // Get all values for a given key. 

  ReadableState<Iterable<V>> get(K key);

 // Return all entries in the map.

  ReadableState<Iterable<KV<K, V>>> allEntries();

  // Return all entries in the map with keys <= limit. returned elements are
sorted by the key.


  ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit); 


""


 // Remove all values with the given key;

  void remove(K key);

 // Remove all entries in the map with keys <= limit.

  void removeUntil(K limit);


"
Will removeUntilExcl(K limit) also useful? It will remove all entries in the
map with keys < limit.

 
"

Runners will sort based on the encoded value of the key. In order to make 
this easier for users, I propose that we introduce a new tag on Coders 
PreservesOrder. A Coder that contains this tag guarantees that the encoded
value preserves the same ordering as the base Java type.

"



Could you clarify what is  "encoded value preserves the same ordering as the
base Java type"?






"



Lets say A and B represent two different instances of the same Java type 
like a double, then A < B (using the languages comparison operator) iff 
encode(A) < encode(B) (note the encoded versions are compared
lexicographically)


"



Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P) 
property to hold for all languages we support? What happens A, B sort
differently in different languages? 


"



That would have to be the property of the coder (which means that this
property probably needs to be represented in the portability representation
of the coder). I imagine the common use cases will be for simple coders like
int, long, string, etc., which are likely to sort the same in most
languages.

 
"

"


 

 
"





-Rui


"

"

"

"



"

Reply via email to