[
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250920#comment-17250920
]
Xintong Song edited comment on FLINK-20646 at 12/17/20, 9:39 AM:
-----------------------------------------------------------------
I think this bug reveals two problems.
* Absence of a common abstraction for "stateful transformations". Relying on
various concrete transformation implementations to maintain the key selectors
and declare the managed memory use case is fragile. New transformation
implementations can easily overlook them, which is what's happening now.
* Lack of testing. I'm wondering how this problem escaped our release
testing. Reduce operation + RocksDB state backend should not be that rare a
case. Obviously we don't have test coverage for such scenarios.
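To illustrate the first point, here is a minimal sketch of what a common abstraction could look like. It uses hypothetical, heavily simplified classes: {{StatefulTransformation}}, {{ManagedMemoryUseCase}} and {{ReduceLikeTransformation}} are made up here and are not Flink's actual types. The key selector setter is final and always updates the state backend use case, so a new subclass cannot store a key selector without also declaring the use case.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model; these are NOT Flink's actual classes.
public class StatefulTransformationSketch {

    // Made-up stand-in for the managed memory use cases a transformation can declare.
    enum ManagedMemoryUseCase { STATE_BACKEND }

    static abstract class Transformation {
        private final Set<ManagedMemoryUseCase> useCases =
                EnumSet.noneOf(ManagedMemoryUseCase.class);

        // Adds or removes the state backend use case (simplified semantics).
        protected final void updateManagedMemoryStateBackendUseCase(boolean hasStateBackend) {
            if (hasStateBackend) {
                useCases.add(ManagedMemoryUseCase.STATE_BACKEND);
            } else {
                useCases.remove(ManagedMemoryUseCase.STATE_BACKEND);
            }
        }

        Set<ManagedMemoryUseCase> getManagedMemoryUseCases() {
            return Collections.unmodifiableSet(useCases);
        }
    }

    // Hypothetical common abstraction: key selector and memory declaration live together.
    static abstract class StatefulTransformation<IN, KEY> extends Transformation {
        private Function<IN, KEY> stateKeySelector;

        // final, so subclasses cannot store a key selector without declaring the use case
        protected final void setStateKeySelector(Function<IN, KEY> keySelector) {
            this.stateKeySelector = keySelector;
            updateManagedMemoryStateBackendUseCase(keySelector != null);
        }

        final Function<IN, KEY> getStateKeySelector() {
            return stateKeySelector;
        }
    }

    // A new transformation gets the declaration for free by extending the abstraction.
    static final class ReduceLikeTransformation<T, K> extends StatefulTransformation<T, K> {
        ReduceLikeTransformation(Function<T, K> keySelector) {
            setStateKeySelector(keySelector);
        }
    }

    public static void main(String[] args) {
        ReduceLikeTransformation<String, Integer> keyed =
                new ReduceLikeTransformation<>(String::length);
        ReduceLikeTransformation<String, Integer> nonKeyed =
                new ReduceLikeTransformation<>(null);
        System.out.println(keyed.getManagedMemoryUseCases());    // [STATE_BACKEND]
        System.out.println(nonKeyed.getManagedMemoryUseCases()); // []
    }
}
```

The design choice here is to make forgetting impossible rather than merely discouraged. Concrete transformations never touch the memory declaration directly.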
As for a quick fix, we can simply call
{{updateManagedMemoryStateBackendUseCase}} in {{ReduceTransformation}}. I
reviewed the type hierarchy of {{Transformation}} and did not find any other
subclasses with this problem.
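Below is a simplified model of the quick fix. It assumes a made-up {{Transformation}} base class whose {{updateManagedMemoryStateBackendUseCase(boolean)}} just adds or removes a flag; only the method name comes from this issue, and the real Flink class and signature may differ. The "buggy" variant keeps the key selector but never declares the use case. The "fixed" variant declares it whenever the key selector is non-null.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model of the quick fix; NOT Flink's actual ReduceTransformation.
public class ReduceTransformationFixSketch {

    enum ManagedMemoryUseCase { STATE_BACKEND }

    static class Transformation {
        final Set<ManagedMemoryUseCase> useCases = EnumSet.noneOf(ManagedMemoryUseCase.class);

        // Simplified semantics: toggles the STATE_BACKEND declaration.
        void updateManagedMemoryStateBackendUseCase(boolean hasStateBackend) {
            if (hasStateBackend) {
                useCases.add(ManagedMemoryUseCase.STATE_BACKEND);
            } else {
                useCases.remove(ManagedMemoryUseCase.STATE_BACKEND);
            }
        }
    }

    // Behaviour before the fix: the key selector is kept, but no use case is declared.
    static class BuggyReduceTransformation<T, K> extends Transformation {
        final Function<T, K> keySelector;

        BuggyReduceTransformation(Function<T, K> keySelector) {
            this.keySelector = keySelector;
        }
    }

    // The quick fix: declare the state backend use case whenever the stream is keyed.
    static class FixedReduceTransformation<T, K> extends Transformation {
        final Function<T, K> keySelector;

        FixedReduceTransformation(Function<T, K> keySelector) {
            this.keySelector = keySelector;
            updateManagedMemoryStateBackendUseCase(keySelector != null);
        }
    }

    // The invariant from the issue: a keyed transformation must declare STATE_BACKEND.
    static boolean declaresStateBackend(Transformation t) {
        return t.useCases.contains(ManagedMemoryUseCase.STATE_BACKEND);
    }

    public static void main(String[] args) {
        Function<String, Integer> byLength = String::length;
        System.out.println(declaresStateBackend(new BuggyReduceTransformation<>(byLength))); // false
        System.out.println(declaresStateBackend(new FixedReduceTransformation<>(byLength))); // true
    }
}
```

A check like {{declaresStateBackend}} is also the kind of assertion a regression test could make for every keyed transformation.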
> ReduceTransformation does not work with RocksDBStateBackend
> -----------------------------------------------------------
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Assignee: Xintong Song
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.1
>
>
> The intra-slot managed memory sharing (FLIP-141) requires transformations to
> properly declare their managed memory use cases.
> The RocksDB state backend requires all {{Transformation}}-s on a keyed
> stream (with a non-null {{KeySelector}}) to call
> {{Transformation#updateManagedMemoryStateBackendUseCase}}, which the newly
> introduced {{ReduceTransformation}} did not.
> As a result, Flink will not reserve managed memory for operators converted
> from {{ReduceTransformation}} (FLINK-19931), leading to the following failure
> when the RocksDB state backend is used.
> {code}
> 16:58:49,373 WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Exception while restoring keyed state backend for
> StreamGroupedReduceOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from
> alternative (1/1), will retry while more alternatives are available.
> java.io.IOException: Failed to acquire shared cache resource for RocksDB
> at
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:264)
> ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
> ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
> ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-runtime_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-runtime_2.11-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:832) [?:?]
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to
> allocate should not be 0. Please make sure that all types of managed memory
> consumers contained in the job are configured with a non-negative weight via
> `taskmanager.memory.managed.consumer-weights`.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:164)
> ~[flink-core-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:631)
> ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:612)
> ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:499)
> ~[flink-runtime_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:260)
> ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
> ... 16 more
> {code}
> The problem was reported on the user-zh mailing list (in Chinese).
> http://apache-flink.147419.n8.nabble.com/flink-1-12-RocksDBStateBackend-td9504.html
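Why does a missing declaration surface as "The fraction of memory to allocate should not be 0"? Below is a toy model that assumes one operator per slot and made-up weights. Flink's real fraction computation, across multiple operators and slot sharing groups, is more involved. An operator that never declares the state backend use case gets a fraction of 0 for it. A precondition similar in spirit to the one hit in {{MemoryManager#validateFraction}} then rejects the allocation.

```java
import java.util.Map;
import java.util.Set;

// Toy model of managed memory fractions; weights and names are made up, and Flink's
// real computation (multiple operators, slot sharing groups) is more involved.
public class ManagedMemoryFractionSketch {

    // Fraction of the slot's managed memory an operator gets for one use case,
    // assuming it is the only operator in the slot. 0 if the use case was never declared.
    static double fractionFor(String useCase, Set<String> declared, Map<String, Integer> weights) {
        if (!declared.contains(useCase)) {
            return 0.0;
        }
        int total = 0;
        for (String declaredUseCase : declared) {
            total += weights.getOrDefault(declaredUseCase, 0);
        }
        return total == 0 ? 0.0 : (double) weights.getOrDefault(useCase, 0) / total;
    }

    // Mimics the kind of precondition seen in the stack trace: a zero fraction is rejected.
    static long computeMemorySize(long totalManagedBytes, double fraction) {
        if (!(fraction > 0 && fraction <= 1)) {
            throw new IllegalArgumentException(
                    "The fraction of memory to allocate should not be 0.");
        }
        return (long) Math.floor(totalManagedBytes * fraction);
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = Map.of("STATE_BACKEND", 70, "PYTHON", 30);
        long slotManagedBytes = 128L << 20; // 128 MiB, made-up slot size

        // Fixed ReduceTransformation: the use case is declared, so memory is reserved.
        double declared = fractionFor("STATE_BACKEND", Set.of("STATE_BACKEND"), weights);
        System.out.println(computeMemorySize(slotManagedBytes, declared)); // 134217728

        // Buggy ReduceTransformation: nothing declared, fraction is 0, allocation fails.
        double undeclared = fractionFor("STATE_BACKEND", Set.of(), weights);
        try {
            computeMemorySize(slotManagedBytes, undeclared);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the failure is not a lack of memory. The operator simply never asked for any, which matches the symptom in the trace.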
--
This message was sent by Atlassian Jira
(v8.3.4#803005)