[
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan updated FLINK-29928:
--------------------------------------
Release Note:
Added new config parameters:
- taskmanager.memory.managed.shared-fraction
(TODO: confirm before release)
was:
Added new config parameters:
- state.backend.memory.share-scope
- taskmanager.memory.managed.shared-fraction
(TODO: confirm before release)
> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
> Key: FLINK-29928
> URL: https://issues.apache.org/jira/browse/FLINK-29928
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Configuration, Runtime / State Backends,
> Runtime / Task
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> h1. Background and motivation
> RocksDB is one of the main consumers of off-heap memory, which it uses for
> BlockCache, MemTables, Indices and Bloom Filters.
> Since 1.10 (FLINK-7289), it is possible to:
> - share these objects among RocksDB instances of the same slot
> - bound the total memory usage by all RocksDB instances of a TM
> The memory is divided equally between the slots (unless fine-grained
> resource control is used).
> This is sub-optimal if some slots contain more memory-intensive tasks than
> others.
> The proposal is to widen the scope of memory sharing to the whole TM, so
> that the memory can be shared across all of its RocksDB instances.
> That would reduce the overall memory consumption at the cost of resource
> isolation.
> h1. Proposed changes
> h2. Configuration
> - introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
> -- cluster-level (yaml only)
> -- the non-shared memory will be used as it is now (exclusively per-slot)
> - -introduce "state.backend.memory.share-scope"-
> -- -job-level (yaml and StateBackend)-
> -- -possible values: NONE, SLOT, TASK_MANAGER-
> -- -default: not set-
> -- override "state.backend.rocksdb.memory.fixed-per-slot" if shared-fraction
> > 0 (but don't deprecate it, because it specifies the size)
> - rely on the existing "state.backend.rocksdb.memory.managed" to decide
> whether the shared memory is managed or unmanaged
> - when calculating TM-wise shared memory, ignore
> "taskmanager.memory.managed.consumer-weights" because RocksDB is the only
> consumer so far
> - similarly, exclude StateBackend from weights calculations, so other
> consumers (e.g. PYTHON) can better utilize exclusive slot memory
> - use cluster-level or default configuration when creating TM-wise shared
> RocksDB objects, e.g. "state.backend.rocksdb.memory.managed",
> "state.backend.rocksdb.memory.write-buffer-ratio"
> h2. Example
> {code:java}
> taskmanager.memory.managed.size: 1gb
> taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of shared managed memory
> taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of exclusive managed memory
> cluster.fine-grained-resource-management.enabled: false
> job 1:
> state.backend.rocksdb.memory.managed: true
> job 2:
> state.backend.rocksdb.memory.managed: true
> job 3:
> state.backend.rocksdb.memory.fixed-per-slot: 50M # ignored
> state.backend.rocksdb.memory.managed: true
> job 4:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
> job 5:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
> {code}
> Jobs 1, 2, 3 will use the same 750Mb of managed memory and will compete with
> each other.
> Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete
> with each other. The 750Mb here is derived from the same setting
> (managed.shared-fraction) to avoid introducing an extra parameter
> (unmanaged.shared-fraction).
> Python code (or other consumers) will be able to use up to 25Mb per slot.
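> The split above can be derived as follows (a quick sketch; decimal megabytes
> are used to match the round numbers in the comments, while Flink actually
> parses "1gb" in binary units):
> {code:java}
> public class ManagedMemorySplit {
>     public static void main(String[] args) {
>         // Mirrors the example configuration above.
>         long managedMb = 1000;        // taskmanager.memory.managed.size: 1gb
>         double sharedFraction = .75;  // taskmanager.memory.managed.shared-fraction
>         int numSlots = 10;            // taskmanager.numberOfTaskSlots
>
>         long sharedMb = (long) (managedMb * sharedFraction);         // 750Mb, shared by all slots
>         long exclusivePerSlotMb = (managedMb - sharedMb) / numSlots; // 25Mb per slot
>
>         System.out.println("shared: " + sharedMb + "Mb, exclusive per slot: "
>                 + exclusivePerSlotMb + "Mb");
>     }
> }
> {code}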
> h2. Creating and sharing RocksDB objects
> Introduce a shared memory manager at the TaskManager level.
> Then, similarly to the current slot-wise sharing:
> - the memory manager manages an OpaqueMemoryResource
> - creation of the Cache object is done from the backend code on the first
> call, so flink-runtime doesn't have to depend on the state backend (see the
> sketch below).
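> A minimal sketch of how this could look (SharedMemoryManager, LeasedResource
> and getOrCreateSharedResource are hypothetical names, not the actual API; the
> real code would follow the existing slot-wise OpaqueMemoryResource pattern in
> MemoryManager):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.function.LongFunction;
>
> // Hypothetical TM-wise shared resource manager, mirroring the slot-wise one.
> final class SharedMemoryManager {
>     // One shared resource per type (e.g. "rocksdb-shared-cache"), created lazily.
>     private final Map<String, LeasedResource<?>> resources = new HashMap<>();
>
>     // The initializer is supplied by the state backend on the first call,
>     // so flink-runtime does not need to depend on RocksDB classes.
>     @SuppressWarnings("unchecked")
>     synchronized <T extends AutoCloseable> T getOrCreateSharedResource(
>             String type, long sizeBytes, LongFunction<T> initializer) {
>         LeasedResource<?> existing = resources.computeIfAbsent(
>                 type, t -> new LeasedResource<>(initializer.apply(sizeBytes)));
>         return (T) existing.resource;
>     }
>
>     // Called on TaskManager termination only; job/task completion must NOT close it.
>     synchronized void close() throws Exception {
>         for (LeasedResource<?> r : resources.values()) {
>             r.resource.close();
>         }
>         resources.clear();
>     }
>
>     private static final class LeasedResource<T extends AutoCloseable> {
>         final T resource;
>         LeasedResource(T resource) { this.resource = resource; }
>     }
> }
> {code}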
> h2. Class loading and resolution
> RocksDB state backend is already a part of the distribution.
> However, if a job also bundles it, then classloader.resolve-order should be
> set to parent-first to prevent class conflicts, as shown below.
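> For example, in flink-conf.yaml (classloader.resolve-order is an existing
> option; parent-first is one of its documented values):
> {code:java}
> # ensure the RocksDB classes bundled with the distribution are loaded first
> classloader.resolve-order: parent-first
> {code}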
> h2. Lifecycle
> The cache object should be destroyed on TM termination; job or task completion
> should NOT close it.
> h1. Testing
> * One way to test that the same RocksDB cache is used is via RocksDB metrics
> (see the sketch after this list).
> * ITCases parameterization
> * manual and unit tests
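> For instance, enabling the existing native block-cache metric for two jobs on
> the same TM and checking that both report the same usage would indicate a
> shared cache (a sketch; the actual assertion is left to the test):
> {code:java}
> # expose RocksDB's block cache usage as a Flink metric
> state.backend.rocksdb.metrics.block-cache-usage: true
> {code}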
> h1. Limitations
> - classloader.resolve-order=child-first is not supported
> - fine-grained-resource-management is not supported
> - only RocksDB will be able to use TM-wise shared memory; other consumers
> may be adjusted later
> h1. Rejected alternatives
> - set total "fixed-per-slot" to a larger value, essentially overcommitting
> unmanaged memory - doesn't work well in containerized environments (OOMErrors)
> - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks
> - requires more invasive changes in scheduler and TM
> cc: [~yunta], [~ym], [~liyu]