[ 
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29928:
--------------------------------------
    Release Note: 
Added a new config parameter: 
- taskmanager.memory.managed.shared-fraction

 (TODO: confirm before release)

  was:
Added new config parameters: 
- state.backend.memory.share-scope
- taskmanager.memory.managed.shared-fraction

 (TODO: confirm before release)


> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, 
> Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> h1. Background and motivation
> RocksDB is one of the main consumers of off-heap memory, which it uses for 
> BlockCache, MemTables, Indices and Bloom Filters.
> Since 1.10 (FLINK-7289), it is possible to:
>  - share these objects among the RocksDB instances of the same slot
>  - bound the total memory usage of all RocksDB instances in a TM
> The memory is divided equally between the slots (unless fine-grained resource 
> control is used).
> This is sub-optimal if some slots contain more memory-intensive tasks than 
> others.
> The proposal is to widen the sharing scope to the whole TM, so that memory can 
> be shared across all of its RocksDB instances.
> That would reduce the overall memory consumption at the cost of resource 
> isolation between slots.
> h1. Proposed changes
> h2. Configuration
>  - introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
>  -- cluster-level (yaml only)
>  -- the non-shared memory will be used as it is now (exclusively per-slot)
>  - -introduce "state.backend.memory.share-scope"-
>  -- -job-level (yaml and StateBackend)-
>  -- -possible values: NONE, SLOT, TASK_MANAGER-
>  -- -default: not set-
>  -- override "state.backend.rocksdb.memory.fixed-per-slot" if shared-fraction 
> > 0 (but don't deprecate it, because it specifies the size)
>  - rely on the existing "state.backend.rocksdb.memory.managed" to decide 
> whether the shared memory is managed or unmanaged
>  - when calculating TM-wise shared memory, ignore 
> "taskmanager.memory.managed.consumer-weights" because RocksDB is the only 
> consumer so far
>  - similarly, exclude StateBackend from the weights calculation, so that other 
> consumers (e.g. PYTHON) can better utilize the exclusive slot memory (see the 
> sketch after this list)
>  - use cluster-level or default configuration when creating TM-wise shared 
> RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", 
> "state.backend.rocksdb.memory.write-buffer-ratio"
> h2. Example
> {code:java}
> taskmanager.memory.managed.size: 1gb
> taskmanager.memory.managed.shared-fraction: .75 # all slots share 768MB of 
> shared managed memory
> taskmanager.numberOfTaskSlots: 10               # each task slot gets ~25.6MB of 
> exclusive managed memory
> cluster.fine-grained-resource-management.enabled: false
> job 1:
> state.backend.rocksdb.memory.managed: true
> job 2:
> state.backend.rocksdb.memory.managed: true
> job 3:
> state.backend.rocksdb.memory.fixed-per-slot: 50M # ignored
> state.backend.rocksdb.memory.managed: true
> job 4:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
> job 5:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
> {code}
> Jobs 1, 2 and 3 will use the same 768MB of shared managed memory and will 
> compete with each other.
> Jobs 4 and 5 will use the same 768MB of shared unmanaged memory and will 
> compete with each other. The 768MB here is calculated from the same setting 
> (managed.shared-fraction) to avoid adding an additional parameter 
> (unmanaged.shared-fraction).
> Python code (or other consumers) will be able to use up to ~25.6MB of exclusive 
> managed memory per slot.
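> As a sanity check of the arithmetic (assuming "1gb" is parsed as 1024 MiB, as 
> Flink's MemorySize does):
> {code:java}
> // Sketch of the split implied by the example configuration above.
> public class ManagedMemorySplit {
>     public static void main(String[] args) {
>         long managedBytes = 1024L << 20;  // taskmanager.memory.managed.size: 1gb
>         double sharedFraction = 0.75;     // taskmanager.memory.managed.shared-fraction
>         int numSlots = 10;                // taskmanager.numberOfTaskSlots
> 
>         long sharedBytes = (long) (managedBytes * sharedFraction);
>         double exclusivePerSlotMb =
>                 (managedBytes - sharedBytes) / (double) numSlots / (1 << 20);
> 
>         System.out.println(sharedBytes >> 20);   // 768 (MiB, shared by all slots)
>         System.out.println(exclusivePerSlotMb);  // 25.6 (MiB, exclusive per slot)
>     }
> }
> {code}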
> h2. Creating and sharing RocksDB objects
> Introduce a sharedMemoryManager to the TaskManager.
> Then, similarly to the current slot-wise sharing:
>  - the memory manager manages an OpaqueMemoryResource
>  - the Cache object is created from the backend code on the first call, so that 
> flink-runtime doesn't have to depend on the state backend
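> A minimal sketch of the lazy-initialization pattern (class and method names 
> here are illustrative, not the final API):
> {code:java}
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.function.LongFunction;
> 
> // Illustrative only: a TM-wide manager that hands out one shared resource per
> // type, created lazily by the first caller. The state backend supplies the
> // initializer, so flink-runtime never references RocksDB classes directly.
> public class SharedMemoryManager {
>     private final Map<String, Object> resources = new ConcurrentHashMap<>();
>     private final long sharedBytes;
> 
>     public SharedMemoryManager(long sharedBytes) {
>         this.sharedBytes = sharedBytes;
>     }
> 
>     @SuppressWarnings("unchecked")
>     public <T> T getSharedResource(String type, LongFunction<T> initializer) {
>         // computeIfAbsent runs the initializer at most once per type,
>         // even when several tasks request the resource concurrently.
>         return (T) resources.computeIfAbsent(type, t -> initializer.apply(sharedBytes));
>     }
> }
> {code}
> The RocksDB backend would then request something like 
> getSharedResource("rocksdb", bytes -> createCache(bytes)) on first use, 
> mirroring how an OpaqueMemoryResource is obtained for slot-wise sharing today.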
> h2. Class loading and resolution
> The RocksDB state backend is already a part of the distribution.
> However, if a job also bundles it, then classloader.resolve-order should be 
> set to parent-first to prevent class conflicts.
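> For example, in flink-conf.yaml:
> {code:java}
> classloader.resolve-order: parent-first  # default is child-first
> {code}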
> h2. Lifecycle
> The cache object should be destroyed on TM termination; job or task completion 
> should NOT close it.
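> One way to express this contract (illustrative classes, not the actual Flink 
> API):
> {code:java}
> import java.util.concurrent.atomic.AtomicBoolean;
> 
> // Illustrative only: the TM owns the shared cache for its whole lifetime;
> // leases handed to tasks are closeable but never dispose the underlying cache.
> public class TmScopedResource<T extends AutoCloseable> implements AutoCloseable {
>     private final T resource;
>     private final AtomicBoolean disposed = new AtomicBoolean(false);
> 
>     public TmScopedResource(T resource) {
>         this.resource = resource;
>     }
> 
>     /** Handed to tasks: closing the lease must NOT dispose the TM-wide cache. */
>     public AutoCloseable leaseForTask() {
>         return () -> { /* no-op on job/task completion */ };
>     }
> 
>     /** Called exactly once from the TaskManager's shutdown sequence. */
>     @Override
>     public void close() throws Exception {
>         if (disposed.compareAndSet(false, true)) {
>             resource.close();
>         }
>     }
> }
> {code}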
> h1. Testing
>  * One way to verify that the same RocksDB cache is used is via RocksDB metrics 
> (see the snippet after this list).
>  * ITCases parameterization
>  * manual and unit tests
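> For example, with the existing RocksDB native metrics enabled, all subtasks on 
> the same TM should report identical block cache capacity/usage values when 
> sharing is active:
> {code:java}
> state.backend.rocksdb.metrics.block-cache-capacity: true
> state.backend.rocksdb.metrics.block-cache-usage: true
> {code}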
> h1. Limitations
>  - classloader.resolve-order=child-first is not supported
>  - fine-grained-resource-management is not supported
>  - only RocksDB will be able to use TM-wise shared memory; other consumers 
> may be adjusted later
> h1. Rejected alternatives
>  - set total "fixed-per-slot" to a larger value, essentially overcommitting 
> unmanaged memory - doesn't work well in containerized environments (OOMErrors)
>  - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks 
> - requires more invasive changes in scheduler and TM
> cc: [~yunta], [~ym], [~liyu]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)