[
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan updated FLINK-29928:
--------------------------------------
Release Note:
Added new config parameters:
- state.backend.rocksdb.memory.fixed-per-tm
(TODO: confirm before release)
was:
Added new config parameters:
- taskmanager.memory.managed.shared-fraction
(TODO: confirm before release)
> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
> Key: FLINK-29928
> URL: https://issues.apache.org/jira/browse/FLINK-29928
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Configuration, Runtime / State Backends,
> Runtime / Task
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> h1. Background and motivation
> RocksDB is one of the main consumers of off-heap memory, which it uses for
> BlockCache, MemTables, Indices and Bloom Filters.
> Since 1.10 (FLINK-7289), it is possible to:
> - share these objects among RocksDB instances of the same slot
> - bound the total memory usage by all RocksDB instances of a TM
> The memory is divided between the slots equally (unless using fine-grained
> resource control).
> This is sub-optimal if some slots contain more memory intensive tasks than
> the others.
> The proposal is to widen the scope of sharing memory to TM, so that it can be
> shared across all of its RocksDB instances.
> That would reduce the overall memory consumption at the cost of resource
> isolation.
> h1. Proposed changes
> h2. Configuration
> - introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no
> default)
> -- cluster-level (yaml only)
> -- used by a job only if neither
> 'state.backend.rocksdb.memory.fixed-per-slot' nor
> 'state.backend.rocksdb.memory.managed' is set for the job
> -- use cluster-level or default configuration when creating TM-wise shared
> RocksDB objects, e.g. "state.backend.rocksdb.memory.managed",
> "state.backend.rocksdb.memory.write-buffer-ratio"
> -- doesn't affect Flink memory calculations; user needs to take it into
> account when planning capacity (similar to fixed-per-slot)
> h2. Example
> {code:java}
> # cluster-level configuration
> taskmanager.memory.managed.size: 1gb
> state.backend.rocksdb.memory.fixed-per-tm: 1gb
> taskmanager.numberOfTaskSlots: 10
> cluster.fine-grained-resource-management.enabled: false
> # job 1:
> state.backend.rocksdb.memory.managed: false # uses shared TM memory
> # job 2:
> state.backend.rocksdb.memory.managed: false # uses shared TM memory
> # job 3:
> state.backend.rocksdb.memory.managed: true # uses exclusive managed memory
> # job 4:
> state.backend.rocksdb.memory.managed: true # gets overridden below
> state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged
> memory
> {code}
> Jobs 1 and 2 will use the same 1 GB of shared unmanaged memory and will
> compete with each other.
> Their Python code (or other consumers) will be able to use up to ~100 MB per
> slot.
> Jobs 3 and 4 are not affected as they specify using managed (3) or
> fixed-per-slot memory (4).
> Python code (or other consumers) will be able to use up to ~100 MB per slot
> but will compete with RocksDB in job (3).
> h2. Creating and sharing RocksDB objects
> Introduce sharedResources to TaskManager.
> Then, similarly to the current slot-wise sharing using MemoryManager:
> - put/get OpaqueMemoryResource
> - Creation of Cache object is done from the backend code on the first call
> - Release it when the last backend that uses it is destroyed
> So flink-runtime doesn't have to depend on state backend.
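> A minimal sketch of the intended ref-counted sharing, in plain Java (all
> names here are hypothetical illustrations, not actual Flink APIs):
> {code:java}
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.function.Supplier;
>
> // Hypothetical holder: resource is created lazily on the first acquire
> // and closed when the last user releases it.
> final class SharedResource<T extends AutoCloseable> {
>     private final Supplier<T> factory;
>     private final AtomicInteger refCount = new AtomicInteger();
>     private T resource;
>
>     SharedResource(Supplier<T> factory) { this.factory = factory; }
>
>     synchronized T acquire() {
>         if (refCount.getAndIncrement() == 0) {
>             resource = factory.get(); // e.g. create the RocksDB Cache here
>         }
>         return resource;
>     }
>
>     synchronized void release() throws Exception {
>         if (refCount.decrementAndGet() == 0) {
>             resource.close(); // last backend destroyed: free the shared cache
>             resource = null;
>         }
>     }
> }
> {code}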
> h2. Class loading and resolution
> RocksDB state backend is already a part of the distribution.
> However, if a job also includes it then classloader.resolve-order should be
> set to parent-first to prevent conflicts.
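> For example, in the job's Flink configuration (an existing cluster option;
> shown here only for illustration):
> {code}
> classloader.resolve-order: parent-first
> {code}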
> h2. Lifecycle
> The cache object should be destroyed on TM termination; job or task completion
> should NOT close it.
> h1. Testing
> - One way to test that the same RocksDB cache is used is via RocksDB metrics.
> - -ITCases parameterization-
> - manual and unit tests
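> For instance, the existing RocksDB native metrics options could be enabled to
> observe the shared block cache from each job (the exact metric set to use is
> an assumption, to be confirmed):
> {code}
> state.backend.rocksdb.metrics.block-cache-usage: true
> state.backend.rocksdb.metrics.block-cache-capacity: true
> {code}
> If sharing works, all RocksDB instances across slots should report the same
> block cache capacity and a common usage figure.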
> h1. Limitations
> - classloader.resolve-order=child-first is not supported
> - fine-grained-resource-management is not supported
> - only RocksDB will be able to use TM-wise shared memory; other consumers
> may be adjusted later
> h1. Rejected alternatives
> - set total "fixed-per-slot" to a larger value, essentially overcommitting
> unmanaged memory - doesn't work well in containerized environments (OOMErrors)
> - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks
> - requires more invasive changes in scheduler and TM
> - make part of managed memory shared; it is believed that managed memory must
> preserve the isolation property, among other concerns
> cc: [~yunta], [~ym], [~liyu]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)