[ https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17532201#comment-17532201 ]
Matthias Schwalbe commented on FLINK-26585:
-------------------------------------------
I waited for the release of Flink 1.15.0 in order to see whether the problem
remains (it does).
I found the root cause in this JDK bug ticket [1], which apparently won't be
fixed.
The proposed solution (a replacement MultiStateKeyIterator implementation)
would help.
As mentioned, the bug only becomes a problem when loading larger state
primitives, where it can even cause OutOfMemoryErrors in the underlying
java.util.stream.SpinedBuffer.
[1] https://bugs.openjdk.java.net/browse/JDK-8267359
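To observe the JDK behaviour behind [1] without Flink, here is a minimal sketch that mimics the shape of the pipeline in the quoted stack trace: a list of "descriptors" mapped to per-descriptor key streams, flattened with flatMap and consumed through iterator(). The class and method names are hypothetical, and an IntStream stands in for the RocksDB key iterator. The sketch counts how many inner keys are produced to answer a single hasNext() call. On the runtimes we are aware of, this is the whole first inner stream rather than one key, but it is worth checking the printed number on your own JDK.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapBufferingDemo {

    /**
     * Mimics the pipeline shape from the stack trace: a list of "descriptors",
     * each mapped to a stream of keys, flattened with flatMap and consumed
     * through iterator(). Returns how many inner keys were produced in order
     * to answer a single hasNext() call.
     */
    static int elementsPulledByFirstHasNext(int descriptors, int keysPerDescriptor) {
        AtomicInteger produced = new AtomicInteger();
        List<Integer> descriptorIds = IntStream.range(0, descriptors)
                .boxed()
                .collect(Collectors.toList());

        Iterator<Integer> keys = descriptorIds.stream()
                // Hypothetical stand-in for the per-state key stream of a backend.
                .map(id -> keyStream(keysPerDescriptor, produced))
                .flatMap(s -> s)
                .iterator();

        keys.hasNext(); // we only ask whether a first key exists
        return produced.get();
    }

    /** An inner stream that counts every key it hands downstream. */
    static Stream<Integer> keyStream(int size, AtomicInteger produced) {
        return IntStream.range(0, size)
                .boxed()
                .peek(k -> produced.incrementAndGet());
    }

    public static void main(String[] args) {
        // With 2 descriptors of 1,000 keys each, compare the result with 1.
        System.out.println(elementsPulledByFirstHasNext(2, 1_000));
    }
}
```

The count grows with the size of the inner stream, not with how many keys the caller actually asked for, which is the per-state buffering that fills the SpinedBuffer.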
> State Processor API: Loading a state set buffers the whole state set in
> memory before starting to process
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.13.0, 1.14.0, 1.15.0
> Reporter: Matthias Schwalbe
> Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole
> state in memory before it even processes a single data point
> ** This is no problem at all for small state (hence the unit tests pass)
> ** The MultiStateKeyIterator constructor sets up a Java Stream that iterates
> over all state descriptors and flattens all data points they contain
> ** The java.util.stream.Stream#flatMap operation causes the whole data set
> to be buffered when the stream is enumerated later on
> ** See call stack [1]
> *** In our case this is 150e6 data points (> 1 GiB just for the pointers
> to the data, let alone the data itself, ~30 GiB)
> ** I'm not aware of any way to configure Stream to avoid the problem,
> hence
> ** I coded an alternative implementation of MultiStateKeyIterator that
> avoids Java Streams,
> ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)
> # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream)
> # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)
> # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
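The description mentions an alternative MultiStateKeyIterator that avoids Java Streams (the attachment MultiStateKeyIteratorNoStreams.java is not reproduced here). What follows is only a sketch of the general idea under our own assumptions, not the attached code: a plain Iterator that walks the descriptor list and opens the next per-descriptor key source only once the previous one is exhausted, so nothing is buffered ahead of the caller. LazyMultiSourceIterator and the keysFor function are hypothetical stand-ins. In Flink the function would presumably wrap the backend's per-state key source, and a real version would also have to close those sources and support remove(), which this sketch omits.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical sketch (not the attached MultiStateKeyIteratorNoStreams):
 * iterates the keys of several sources one after another without Streams.
 * The next source is opened only when the current one is exhausted, so no
 * keys are buffered ahead of the caller.
 */
public class LazyMultiSourceIterator<D, K> implements Iterator<K> {

    private final Iterator<D> descriptors;
    // Assumed stand-in for "give me the keys of this state descriptor".
    private final Function<D, Iterator<K>> keysFor;
    private Iterator<K> current = Collections.emptyIterator();

    public LazyMultiSourceIterator(List<D> descriptors, Function<D, Iterator<K>> keysFor) {
        this.descriptors = descriptors.iterator();
        this.keysFor = keysFor;
    }

    @Override
    public boolean hasNext() {
        // Open sources one at a time until one has a key, skipping empty ones.
        while (!current.hasNext()) {
            if (!descriptors.hasNext()) {
                return false;
            }
            current = keysFor.apply(descriptors.next());
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("state-a", "state-b");
        Iterator<String> it = new LazyMultiSourceIterator<>(
                states,
                s -> Arrays.asList(s + "/k1", s + "/k2").iterator());
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Compared with flatMap, the iterator itself keeps only the current source between calls, so its own memory use does not grow with the number of keys in a state.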
--
This message was sent by Atlassian Jira
(v8.20.7#820007)