[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Schwalbe updated FLINK-26585:
--------------------------------------
    Description: 
* When loading a state, MultiStateKeyIterator loads and buffers the whole state
in memory before it even processes a single data point
 ** This is absolutely no problem for small state (hence the unit tests work
fine)
 ** The MultiStateKeyIterator ctor sets up a Java Stream that iterates over all
state descriptors and flattens all data points contained within
 ** The java.util.stream.Stream#flatMap function causes the buffering of the
whole data set when enumerated later on
 ** See call stack [1]
 *** In our case this is 150e6 data points (> 1GiB just for the pointers to the
data, let alone the data itself, ~30GiB)
 ** I’m not aware of any way to instrument Stream in order to avoid the
problem, hence
 ** I coded an alternative implementation of MultiStateKeyIterator that avoids
using Java Stream
 ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
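
To illustrate the stream-free approach described above, here is a minimal sketch. It is not the actual MultiStateKeyIteratorNoStreams code and it does not use any Flink API: LazyConcatIterator and countingRange are hypothetical names, and the counting iterator merely stands in for a per-descriptor key iterator. The sketch chains the underlying iterators by hand, so a source is opened only when the previous one is exhausted and each element is pulled only when the caller asks for it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper (not Flink code): concatenates iterators lazily, without java.util.stream. */
final class LazyConcatIterator<T> implements Iterator<T> {
    private final Iterator<Supplier<Iterator<T>>> sources;
    private Iterator<T> current;

    LazyConcatIterator(List<Supplier<Iterator<T>>> sources) {
        this.sources = sources.iterator();
    }

    @Override
    public boolean hasNext() {
        // Open the next source only once the current one is exhausted.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = sources.next().get();
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Stand-in for a state key iterator; counts how many elements were actually pulled. */
    static Iterator<Integer> countingRange(int from, int to, AtomicInteger pulled) {
        return new Iterator<Integer>() {
            private int cursor = from;

            @Override
            public boolean hasNext() {
                return cursor < to;
            }

            @Override
            public Integer next() {
                if (cursor >= to) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet();
                return cursor++;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();
        List<Supplier<Iterator<Integer>>> sources = new ArrayList<>();
        sources.add(() -> countingRange(0, 1_000_000, pulled));
        sources.add(() -> countingRange(1_000_000, 2_000_000, pulled));

        Iterator<Integer> it = new LazyConcatIterator<>(sources);
        Integer first = it.next();
        // Only one element has been read from the first source at this point.
        System.out.println("first=" + first + ", pulled=" + pulled.get()); // prints "first=0, pulled=1"
    }
}
```

With a Stream#flatMap pipeline consumed through Stream#iterator(), the same counter can jump to the size of a whole inner stream after the first next() call. That matches the fillBuffer and forEachRemaining frames in the call stack [1], although the exact behavior depends on the JDK version in use.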

[1]

Streams call stack:

hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)  # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)  # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

  was:
* When loading a state, MultiStateKeyIterator loads and buffers the whole state
in memory before it even processes a single data point
** This is absolutely no problem for small state (hence the unit tests work
fine)
** The MultiStateKeyIterator ctor sets up a Java Stream that iterates over all
state descriptors and flattens all data points contained within
** The java.util.stream.Stream#flatMap function causes the buffering of the
whole data set when enumerated later on
** See call stack [1]
*** In our case this is 150e6 data points (> 1GiB just for the pointers to the
data, let alone the data itself, ~30GiB)
** I’m not aware of some instrumentation if Stream in order to avoid the 
problem, hence
** I coded an alternative implementation of MultiStateKeyIterator that avoids 
using java Stream,
** I can contribute our implementation (MultiStateKeyIteratorNoStreams)

[1]

Streams call stack:

hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)  # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)  # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)


> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26585
>                 URL: https://issues.apache.org/jira/browse/FLINK-26585
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Matthias Schwalbe
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
