[
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794591#comment-17794591
]
ChangZhuo Chen (陳昌倬) commented on FLINK-26585:
----------------------------------------------
Is there any chance of merging this into the 1.18 series?
Since CaseClassSerializer has been deprecated in 1.18, the only way to migrate
away from CaseClassSerializer is the State Processor API. Without this fix, the
State Processor API cannot run when the number of keys is large.
> State Processor API: Loading a state set buffers the whole state set in
> memory before starting to process
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Affects Versions: 1.13.0, 1.14.0, 1.15.0
> Reporter: Matthias Schwalbe
> Assignee: Matthias Schwalbe
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole
> state in memory before it even processes a single data point
> ** This is absolutely no problem for small state (hence the unit tests work
> fine)
> ** The MultiStateKeyIterator constructor sets up a Java Stream that iterates
> over all state descriptors and flattens all data points contained within them
> ** The java.util.stream.Stream#flatMap function causes the whole data set to
> be buffered when the stream is enumerated later on
> ** See call stack [1]
> *** In our case this is 150e6 data points (> 1 GiB just for the pointers to
> the data, let alone the data itself, ~30 GiB)
> ** I'm not aware of any way to instrument Stream in order to avoid the
> problem, hence
> ** I coded an alternative implementation of MultiStateKeyIterator that
> avoids using Java Streams
> ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)
> # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream)
> # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)
> # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
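A minimal, self-contained sketch of the buffering described in the issue, using plain Java lists as a stand-in for the per-descriptor RocksDB key iterators. All names here (FlatMapBufferingDemo, FlatteningIterator, counting, pulledBy...) are hypothetical illustrations; this is not the attached MultiStateKeyIteratorNoStreams. It counts how many keys have been pulled from the inner sources by the time the first key is returned. The Stream version goes through the same flatMap / WrappingSpliterator.fillBuffer path as the call stack above, so on the JDK 8 runtime shown there it should pull the whole first source; behaviour on other JDK versions may differ. The hand-written iterator only advances one inner iterator at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical demo: eager Stream#flatMap iteration vs. a lazy hand-written flattening iterator. */
public class FlatMapBufferingDemo {

    /** Lazily concatenates the inner iterators produced for each outer element, without java.util.stream. */
    static final class FlatteningIterator<T, R> implements Iterator<R> {
        private final Iterator<T> outer;
        private final Function<T, Iterator<R>> expand;
        // Only the current inner iterator is held; nothing is buffered.
        private Iterator<R> current = Collections.emptyIterator();

        FlatteningIterator(Iterator<T> outer, Function<T, Iterator<R>> expand) {
            this.outer = outer;
            this.expand = expand;
        }

        @Override
        public boolean hasNext() {
            // Move to the next outer element only once the current inner iterator is exhausted.
            while (!current.hasNext()) {
                if (!outer.hasNext()) {
                    return false;
                }
                current = expand.apply(outer.next());
            }
            return true;
        }

        @Override
        public R next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    /** Wraps a list iterator so that every element handed out increments the counter. */
    static <E> Iterator<E> counting(List<E> list, AtomicInteger pulled) {
        Iterator<E> it = list.iterator();
        return new Iterator<E>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public E next() { pulled.incrementAndGet(); return it.next(); }
        };
    }

    /** Inner elements pulled when the Stream#flatMap-based iterator yields its first element. */
    static int pulledByStreamFlatMap(List<List<Integer>> perDescriptorKeys) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = perDescriptorKeys.stream()
                .flatMap(keys -> keys.stream().peek(k -> pulled.incrementAndGet()))
                .iterator();
        it.next();
        return pulled.get();
    }

    /** Inner elements pulled when the hand-written iterator yields its first element. */
    static int pulledByFlatteningIterator(List<List<Integer>> perDescriptorKeys) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = new FlatteningIterator<List<Integer>, Integer>(
                perDescriptorKeys.iterator(), keys -> counting(keys, pulled));
        it.next();
        return pulled.get();
    }

    public static void main(String[] args) {
        List<Integer> manyKeys = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            manyKeys.add(i);
        }
        List<List<Integer>> perDescriptorKeys = Arrays.asList(manyKeys, Arrays.asList(-1, -2));
        System.out.println("Stream#flatMap pulled:     " + pulledByStreamFlatMap(perDescriptorKeys));
        System.out.println("FlatteningIterator pulled: " + pulledByFlatteningIterator(perDescriptorKeys));
    }
}
```

In this sketch the iterator's memory use stays constant regardless of how many keys each state descriptor holds, because it keeps only a reference to the current inner iterator; that is the general idea behind avoiding Streams in the proposed fix, not a description of its exact code.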
--
This message was sent by Atlassian Jira
(v8.20.10#820010)