[
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan updated FLINK-29928:
--------------------------------------
Description:
h1. Background and motivation
RocksDB is one of the main consumers of off-heap memory, which it uses for
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM
The memory is divided between the slots equally (unless using fine-grained
resource control).
This is sub-optimal if some slots contain more memory-intensive tasks than the
others.
The proposal is to widen the scope of sharing memory to TM, so that it can be
shared across all of its RocksDB instances.
That would reduce the overall memory consumption at the expense of resource
isolation.
h1. Proposed changes
h2. Configuration
- introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
-- cluster-level (yaml only)
-- the non-shared memory will be used as it is now (exclusively per-slot)
- introduce "state.backend.memory.share-scope"
-- job-level (yaml and StateBackend; see the sketch after this list)
-- possible values: NONE, SLOT, TASK_MANAGER
-- default: not set
-- override "state.backend.rocksdb.memory.fixed-per-slot" if both are set (but
don't deprecate it, because it specifies the size)
- rely on the existing "state.backend.rocksdb.memory.managed" to decide whether
the shared memory is managed or unmanaged
- when calculating TM-wise shared memory, ignore
"taskmanager.memory.managed.consumer-weights" because RocksDB is the only
consumer so far
- similarly, exclude StateBackend from weights calculations, so other consumers
(e.g. PYTHON) can better utilize exclusive slot memory
- use cluster-level or default configuration when creating TM-wise shared
RocksDB objects, e.g. "state.backend.rocksdb.memory.managed",
"state.backend.rocksdb.memory.write-buffer-ratio"
h2. Example
{code}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75  # all slots share 750 MB of shared managed memory
taskmanager.numberOfTaskSlots: 10                # each task slot gets 25 MB of exclusive managed memory
cluster.fine-grained-resource-management.enabled: false
job 1:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true
job 2:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true
job 3:
state.backend.memory.share-scope: SLOT
state.backend.rocksdb.memory.managed: true
job 4:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
job 5:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
{code}
Jobs 1 and 2 will use the same 750 MB of managed memory and will compete with
each other.
Job 3 will only use exclusive slot memory (25 MB per slot).
Jobs 4 and 5 will use the same 750 MB of unmanaged memory and will compete with
each other.
Python code (or other consumers) will be able to use up to 25 MB per slot in
jobs 1, 2, 4, and 5.
h2. Creating and sharing RocksDB objects
Introduce a sharedMemoryManager to the TaskManager.
Then, similarly to the current slot-wise sharing:
- the memory manager manages an OpaqueMemoryResource
- the Cache object is created from the backend code on the first call, so that
flink-runtime doesn't have to depend on the state backend (a sketch follows below)
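A minimal sketch of that pattern, using hypothetical names (TaskManagerSharedMemoryManager, getOrCreateSharedResource); the real implementation would mirror the existing slot-level OpaqueMemoryResource mechanism rather than this exact code:
{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.function.FunctionWithException;

// Hypothetical sketch: class and method names are illustrative only.
// flink-runtime only sees an opaque handle; the RocksDB backend supplies the
// initializer, so flink-runtime never references RocksDB classes directly.
public final class TaskManagerSharedMemoryManager implements AutoCloseable {

    private final Map<String, OpaqueMemoryResource<?>> resources = new HashMap<>();

    // Called from the state backend; the shared object (e.g. the RocksDB Cache)
    // is created lazily on the first call for a given type.
    public synchronized <T extends AutoCloseable> OpaqueMemoryResource<T> getOrCreateSharedResource(
            String type, long size, FunctionWithException<Long, T, Exception> initializer) throws Exception {
        @SuppressWarnings("unchecked")
        OpaqueMemoryResource<T> existing = (OpaqueMemoryResource<T>) resources.get(type);
        if (existing == null) {
            T handle = initializer.apply(size);
            existing = new OpaqueMemoryResource<>(handle, size, handle::close);
            resources.put(type, existing);
        }
        return existing;
    }

    // Invoked only on TaskManager shutdown (see "Lifecycle" below);
    // job or task completion must NOT trigger this.
    @Override
    public synchronized void close() throws Exception {
        for (OpaqueMemoryResource<?> resource : resources.values()) {
            resource.close();
        }
        resources.clear();
    }
}
{code}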
h2. Class loading and resolution
The RocksDB state backend is already part of the Flink distribution.
However, if a job also bundles it, classloader.resolve-order should be set to
parent-first to prevent class conflicts.
h2. Lifecycle
The cache object should be destroyed on TM termination; job or task completion
should NOT close it.
h1. Testing
One way to verify that the same RocksDB cache is used by all instances is via RocksDB native metrics.
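For instance, the existing RocksDB native metric options could be enabled in the test configuration; subtasks in different slots that share one TM-wide cache should then report the same block-cache capacity and closely correlated usage (a rough sketch; the assertion strategy is left open):
{code}
import org.apache.flink.configuration.Configuration;

// Existing options (RocksDBNativeMetricOptions): expose the block cache
// capacity and usage per subtask, so sharing can be observed in the metrics.
Configuration config = new Configuration();
config.setString("state.backend.rocksdb.metrics.block-cache-capacity", "true");
config.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
{code}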
h1. Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may
be adjusted later
h1. Rejected alternatives
- set total "fixed-per-slot" to a larger value, essentially overcommitting
unmanaged memory - doesn't work well in containerized environments (OOMErrors)
- set numberOfTaskSlots=1 and allow sharing the same slot between any tasks -
requires more invasive changes in scheduler and TM
cc: [~yunta], [~ym], [~liyu]
> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
> Key: FLINK-29928
> URL: https://issues.apache.org/jira/browse/FLINK-29928
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Configuration, Runtime / State Backends,
> Runtime / Task
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)