[ https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-29928:
--------------------------------------
    Description: 
h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for 
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
 - share these objects among RocksDB instances of the same slot
 - bound the total memory usage by all RocksDB instances of a TM

The memory is divided equally between the slots (unless fine-grained resource 
management is used).
This is sub-optimal if some slots contain more memory-intensive tasks than 
others.

The proposal is to widen the scope of memory sharing to the TM, so that the 
memory can be shared across all of its RocksDB instances.
That would reduce the overall memory consumption at the expense of resource 
isolation.
h1. Proposed changes
h2. Configuration
 - introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no default)
 -- cluster-level (yaml only)
 -- used by a job only if it uses neither managed memory ('state.backend.rocksdb.memory.managed': false) nor 'state.backend.rocksdb.memory.fixed-per-slot'
 -- use the cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"
 -- doesn't affect Flink memory calculations; the user needs to take it into account when planning capacity (similar to fixed-per-slot)

h2. Example
{code:java}
# cluster-level configuration
taskmanager.memory.managed.size: 1gb
state.backend.rocksdb.memory.fixed-per-tm: 1gb
taskmanager.numberOfTaskSlots: 10
cluster.fine-grained-resource-management.enabled: false

# job 1:
state.backend.rocksdb.memory.managed: false # uses shared TM memory

# job 2:
state.backend.rocksdb.memory.managed: false # uses shared TM memory

# job 3:
state.backend.rocksdb.memory.managed: true # uses exclusive managed memory

# job 4:
state.backend.rocksdb.memory.managed: true # gets overridden below
state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged memory

{code}
Jobs 1 and 2 will use the same 1 GB of shared unmanaged memory and will compete 
with each other.
Their Python code (or other consumers) will be able to use up to ~100 MB per 
slot.

Jobs 3 and 4 are not affected, as they use managed memory (3) or 
fixed-per-slot memory (4).
Their Python code (or other consumers) will also be able to use up to ~100 MB 
per slot, but in job 3 it will compete with RocksDB for that memory.
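For reference, the arithmetic behind the ~100 MB figure, assuming the example configuration above: taskmanager.memory.managed.size is 1 GB and taskmanager.numberOfTaskSlots is 10, so each slot gets roughly 1 GB / 10 ≈ 100 MB of managed memory. Since jobs 1 and 2 keep RocksDB out of managed memory, that per-slot budget stays fully available to Python or other managed-memory consumers; in job 3, RocksDB itself draws from it.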

h2. Creating and sharing RocksDB objects

Introduce sharedResources to the TaskManager.
Then, similarly to the current slot-wise sharing via MemoryManager:
 - put/get an OpaqueMemoryResource
 - the Cache object is created from the backend code on the first call
 - it is released when the last backend that uses it is destroyed

This way, flink-runtime doesn't have to depend on the state backend (a sketch follows below).
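A minimal sketch of how such a TM-level registry could look is given below. All names in it (SharedResources, Lease, getOrAllocate, release) are hypothetical and only illustrate the get-or-create plus reference-counting scheme described above, not the actual classes added by this change:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

/** Hypothetical TM-wide registry of shared resources, keyed by type. */
final class SharedResources {

    private static final class Lease<T> {
        final T handle; // e.g. an OpaqueMemoryResource wrapping the shared RocksDB Cache
        int refCount;   // number of backends currently using the resource

        Lease(T handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Lease<?>> resources = new HashMap<>();

    /** Gets the shared resource, creating it via the backend-supplied initializer on first use. */
    @SuppressWarnings("unchecked")
    synchronized <T> T getOrAllocate(String type, long size, LongFunction<T> initializer) {
        Lease<T> lease = (Lease<T>) resources.get(type);
        if (lease == null) {
            // The object is built by a function passed in from state backend code,
            // so flink-runtime never depends on RocksDB classes directly.
            lease = new Lease<>(initializer.apply(size));
            resources.put(type, lease);
        }
        lease.refCount++;
        return lease.handle;
    }

    /** Releases one lease; disposes the resource when the last backend using it is destroyed. */
    synchronized void release(String type, Runnable disposer) {
        Lease<?> lease = resources.get(type);
        if (lease != null && --lease.refCount == 0) {
            resources.remove(type);
            disposer.run();
        }
    }
}
{code}
The RocksDB backend would call getOrAllocate with an initializer that builds the shared cache (sized by state.backend.rocksdb.memory.fixed-per-tm) and call release from its cleanup path.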

h2. Class loading and resolution

The RocksDB state backend is already part of the distribution.
However, if a job also includes it, then classloader.resolve-order should be set 
to parent-first to prevent conflicts.
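For illustration, the corresponding cluster-level setting (classloader.resolve-order is an existing Flink option; its default is child-first):
{code:java}
# flink-conf.yaml
classloader.resolve-order: parent-first
{code}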
h2. Lifecycle

The cache object should be destroyed on TM termination; job or task completion 
should NOT close it.
h1. Testing
 * One way to verify that the same RocksDB cache is used is via RocksDB metrics (see the snippet below this list).
 * ITCase parameterization
 * Manual and unit tests
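For example, the existing RocksDB native metric options below could be enabled so that a test (or a manual check) can verify that all subtasks report the capacity and usage of one and the same shared block cache; they are listed here only as one possible way to observe the sharing:
{code:java}
# flink-conf.yaml
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true
{code}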

h1. Limitations
 - classloader.resolve-order=child-first is not supported
 - fine-grained-resource-management is not supported
 - only RocksDB will be able to use TM-wise shared memory; other consumers may 
be adjusted later

h1. Rejected alternatives
 - set the total "fixed-per-slot" to a larger value, essentially overcommitting 
unmanaged memory - doesn't work well in containerized environments (OOMErrors)
 - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - 
requires more invasive changes in the scheduler and TM
 - make part of managed memory shared; it is believed that managed memory must 
preserve the isolation property, among other concerns

cc: [~yunta], [~ym], [~liyu]

> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, 
> Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>


