Matthias Schwalbe created FLINK-26585:
-----------------------------------------
Summary: State Processor API: Loading a state set buffers the
whole state set in memory before starting to process
Key: FLINK-26585
URL: https://issues.apache.org/jira/browse/FLINK-26585
Project: Flink
Issue Type: Improvement
Components: API / State Processor
Affects Versions: 1.14.0, 1.13.0
Reporter: Matthias Schwalbe
* When loading a state, MultiStateKeyIterator loads and buffers the whole state
in memory before it even processes a single data point
** This is absolutely no problem for small state (hence the unit tests work
fine)
** The MultiStateKeyIterator constructor sets up a Java Stream that iterates over
all state descriptors and flattens all data points contained within
** The java.util.stream.Stream#flatMap function causes the buffering of the
whole data set when enumerated later on
** See call stack [1]
*** In our case this is 150e6 data points (> 1GiB just for the pointers to the
data, let alone the data itself, ~30GiB)
** I’m not aware of any way to configure Stream so that it avoids the
problem, hence
** I coded an alternative implementation of MultiStateKeyIterator that avoids
using Java Streams
** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
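
To illustrate the stream-free idea in the list above, here is a minimal sketch. It is not the actual MultiStateKeyIteratorNoStreams code and uses no Flink classes: the class name LazyFlatteningIterator, the type parameters and the "open" function are hypothetical stand-ins for "iterate all state descriptors and open a key iterator for each". The point is only that a hand-written nested iterator pulls one element at a time from the current inner iterator and never buffers it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical illustration only (not Flink code): flattens several
 * per-source iterators lazily, without java.util.stream.
 */
public class LazyFlatteningIterator<S, K> implements Iterator<K> {

    // Outer iteration, e.g. over state descriptors (assumption for this sketch).
    private final Iterator<S> sources;

    // Opens the inner iterator for one source, e.g. the keys of one state.
    private final Function<S, Iterator<K>> open;

    // Inner iterator currently being drained; starts out empty.
    private Iterator<K> current = Collections.emptyIterator();

    public LazyFlatteningIterator(List<S> sources, Function<S, Iterator<K>> open) {
        this.sources = sources.iterator();
        this.open = open;
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty inner iterator, skipping empty ones.
        // No element of the inner iterator is consumed here.
        while (!current.hasNext() && sources.hasNext()) {
            current = open.apply(sources.next());
        }
        return current.hasNext();
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Exactly one element is pulled from the underlying source per call.
        return current.next();
    }

    public static void main(String[] args) {
        List<List<Integer>> descriptors =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        Iterator<Integer> keys =
                new LazyFlatteningIterator<List<Integer>, Integer>(descriptors, List::iterator);
        while (keys.hasNext()) {
            System.out.println(keys.next());
        }
    }
}
```

With this shape, memory use is bounded by whatever the current inner iterator holds, independent of the total number of data points.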
[1]
Streams call stack:
hasNext:77, RocksStateKeysIterator
(org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator
(org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream)
# <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends
Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream)
# Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream)
# <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator
(java.util.stream)
getAsBoolean:-1, 1528195520
(java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator
(java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator
(org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)