[ 
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29928:
--------------------------------------
    Release Note: 
Added new config parameters: 
- state.backend.rocksdb.memory.fixed-per-tm

 (TODO: confirm before release)

  was:
Added new config parameters: 
- taskmanager.memory.managed.shared-fraction

 (TODO: confirm before release)


> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> h1. Background and motivation
> RocksDB is one of the main consumers of off-heap memory, which it uses for 
> BlockCache, MemTables, Indices and Bloom Filters.
> Since 1.10 (FLINK-7289), it is possible to:
>  - share these objects among RocksDB instances of the same slot
>  - bound the total memory usage by all RocksDB instances of a TM
> The memory is divided equally among the slots (unless fine-grained resource 
> control is used).
> This is sub-optimal if some slots contain more memory-intensive tasks than 
> others.
> The proposal is to widen the scope of sharing to the whole TM, so that memory 
> can be shared across all of its RocksDB instances.
> That would reduce the overall memory consumption at the cost of resource 
> isolation.
> h1. Proposed changes
> h2. Configuration
>  - introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no 
> default)
>  -- cluster-level (yaml only)
>  -- used by a job only if neither 
> 'state.backend.rocksdb.memory.fixed-per-slot' nor 
> 'state.backend.rocksdb.memory.managed' is used for the job
>  -- cluster-level or default configuration is used when creating the TM-wise 
> shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", 
> "state.backend.rocksdb.memory.write-buffer-ratio"
>  -- doesn't affect Flink memory calculations; the user needs to take it into 
> account when planning capacity (similar to fixed-per-slot)
> h2. Example
> {code:yaml}
> # cluster-level configuration
> taskmanager.memory.managed.size: 1gb
> state.backend.rocksdb.memory.fixed-per-tm: 1gb
> taskmanager.numberOfTaskSlots: 10
> cluster.fine-grained-resource-management.enabled: false
> # job 1:
> state.backend.rocksdb.memory.managed: false # uses shared TM memory
> # job 2:
> state.backend.rocksdb.memory.managed: false # uses shared TM memory
> # job 3:
> state.backend.rocksdb.memory.managed: true # uses exclusive managed memory
> # job 4:
> state.backend.rocksdb.memory.managed: true # gets overridden below
> state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged memory
> {code}
> Jobs 1 and 2 will use the same 1Gb of shared unmanaged memory and will compete 
> with each other.
> Their Python code (or other consumers) will be able to use up to ~100Mb per 
> slot (1gb of managed memory divided equally among 10 slots), since RocksDB 
> doesn't consume managed memory in these jobs.
> Jobs 3 and 4 are not affected as they specify using managed (3) or 
> fixed-per-slot (4) memory.
> Python code (or other consumers) will be able to use up to ~100Mb per slot 
> but will compete with RocksDB in job 3.
> h2. Creating and sharing RocksDB objects
> Introduce sharedResources to TaskManager.
> Then, similarly to the current slot-wise sharing via MemoryManager:
>  - put/get OpaqueMemoryResource
>  - the Cache object is created from the backend code on the first call
>  - it is released when the last backend that uses it is destroyed
> This way, flink-runtime doesn't have to depend on the state backend (see the 
> sketch below).
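> Below is a minimal, hypothetical sketch of such a registry (class and method 
> names are illustrative, not the actual Flink API; the existing slot-wise 
> sharing goes through MemoryManager and OpaqueMemoryResource):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.function.LongFunction;
> 
> /** Hypothetical TM-level registry of shared, reference-counted resources. */
> public final class SharedResources {
> 
>     /** A resource together with its lease count. */
>     static final class LeasedResource<T> {
>         final T resource;
>         long leaseCount;
> 
>         LeasedResource(T resource) {
>             this.resource = resource;
>         }
>     }
> 
>     private final Map<String, LeasedResource<?>> resources = new HashMap<>();
> 
>     /**
>      * Returns the resource registered under the given key, creating it on the
>      * first call. The initializer (e.g. building the RocksDB Cache) is supplied
>      * by the backend code, so flink-runtime never references RocksDB classes;
>      * the resource stays opaque to the runtime.
>      */
>     @SuppressWarnings("unchecked")
>     public synchronized <T> T getOrAllocate(String key, long size, LongFunction<T> initializer) {
>         LeasedResource<T> leased = (LeasedResource<T>) resources.get(key);
>         if (leased == null) {
>             leased = new LeasedResource<>(initializer.apply(size));
>             resources.put(key, leased);
>         }
>         leased.leaseCount++;
>         return leased.resource;
>     }
> 
>     /** Drops one lease; called when a backend that used the resource is destroyed. */
>     public synchronized void release(String key) {
>         LeasedResource<?> leased = resources.get(key);
>         if (leased != null) {
>             leased.leaseCount--;
>         }
>     }
> }
> {code}
> A backend would call getOrAllocate() when it starts and release() when it is 
> destroyed; disposal of the underlying cache is a separate concern (see 
> Lifecycle below).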
> h2. Class loading and resolution
> RocksDB state backend is already a part of the distribution.
> However, if a job also bundles it, then classloader.resolve-order should be 
> set to parent-first to prevent class conflicts.
> h2. Lifecycle
> The cache object should be destroyed on TM termination; job or task completion 
> should NOT close it.
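> In terms of the hypothetical sketch above, only the TM (TaskExecutor) shutdown 
> path would call a method like the following; job or task cleanup merely drops 
> leases via release():
> {code:java}
>     /**
>      * Disposes all shared resources. Intended to be called only from the TM
>      * shutdown path, never from job or task cleanup.
>      */
>     public synchronized void closeAll() {
>         for (LeasedResource<?> leased : resources.values()) {
>             if (leased.resource instanceof AutoCloseable) {
>                 try {
>                     ((AutoCloseable) leased.resource).close();
>                 } catch (Exception e) {
>                     // Log and continue; TM shutdown should not be aborted here.
>                 }
>             }
>         }
>         resources.clear();
>     }
> {code}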
> h1. Testing
>  * One way to test that the same RocksDB cache is used is via RocksDB metrics.
>  * -ITCases parameterization-
>  * manual and unit tests
> h1. Limitations
>  - classloader.resolve-order=child-first is not supported
>  - fine-grained-resource-management is not supported
>  - only RocksDB will be able to use TM-wise shared memory; other consumers 
> may be adjusted later
> h1. Rejected alternatives
>  - set total "fixed-per-slot" to a larger value, essentially overcommitting 
> unmanaged memory - doesn't work well in containerized environments (OOMErrors)
>  - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks 
> - requires more invasive changes in the scheduler and TM
>  - make part of managed memory shared - it is believed that managed memory 
> must preserve the isolation property, among other concerns
> cc: [~yunta], [~ym], [~liyu]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
