Hi Jan,

You could associate a key to each element of your Key's list (e.g., hashing the 
value), keep only the keys in heap (e.g., in a list) and the associated state 
key-value/s in an external store like RocksDB/Redis, but you will notice large 
overheads due to de/serializing - a huge penatly for more than 1000s of 
elements (see https://hal.inria.fr/hal-01530744/document 
<https://hal.inria.fr/hal-01530744/document> for some experimental settings) 
for relatively small rate of new events per Key, if needed to process all 
values of a Key for each new event. Best case you can do some incremental 
processing unless your non-combining means non-associative operations per Key.

Best,
Ovidiu
> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Fabian,
> 
> thanks for quick reply, what you suggest seems to work at first sight, I will 
> try it. Is there any reason not to implement a RocksDBListState this way in 
> general? Is there any increased overhead of this approach?
> 
> Thanks,
> 
>  Jan
> 
> 
> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>> Hi Jan,
>> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
>> 
>> Best, Fabian
>> 
>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>> 
>>> Hi all,
>>> 
>>> I have a question that appears as a user@ question, but brought me into
>>> the dev@ mailing list while I was browsing through the Flink's source
>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>> group-by-key operation with a limited number of distinct keys (which I
>>> cannot control), but a non trivial count of values. The operation in the
>>> GBK is non-combining, so that all values per key (many) have to be stored
>>> in a state. Running this on testing data led to a surprise (for me), that
>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>> into single binary blob and then deserialized into List, and therefore has
>>> to fit in memory (multiple times, in fact).
>>> 
>>> I tried to create an alternative RocksDBStateBackend, that would store
>>> each element of list in ListState to a separate key in RocksDB, so that the
>>> whole blob would not have to be loaded by a single get, but a scan over
>>> multiple keys could be made. Digging into the source code I found there was
>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>> classes however have different hierarchy than the public API classes that
>>> they mirror, most notably InternalKvState is superinterface of all others.
>>> This fact seems to be used on multiple places throughout the source code.
>>> 
>>> My question is - is this intentional? Would it be possible to store each
>>> element of a ListState in a separate key in RocksDB (probably by adding
>>> some suffix to the actual key of the state for each element)? What are the
>>> pitfalls? And is it necessary for the InternalListState to be actually
>>> subinterface of InternalKvState? I find this to be a related problem.
>>> 
>>> Many thanks for any comments or thoughts,
>>> 
>>>  Jan
>>> 
>>> 
> 

Reply via email to