[ 
https://issues.apache.org/jira/browse/FLINK-16267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049548#comment-17049548
 ] 

Yun Tang edited comment on FLINK-16267 at 3/3/20 2:01 AM:
----------------------------------------------------------

Hi [~czchen], I suspect the memory consumed by iterating over map state is the 
cause of the OOM, as that memory is not controllable. But before I analyze your 
problem, I hope you could provide more information.
 * How many slots does one task manager have? Since you only have one 
KeyedProcessFunction with 4 states, I believe there would be only one RocksDB 
instance per slot. I need to know the number of slots per task manager: if one 
TM has 4 slots and each is used by a sub-task of the KeyedProcessFunction, then 
that TM would have 4 RocksDB instances.
 * Have you configured any RocksDB pre-defined options or a customized options 
factory? (See the sketches after this list for what such settings look like.)
 * More RocksDB-related metrics would also help. You could refer to 
[PR-10930|https://github.com/apache/flink/pull/10930] to see the metrics shown 
for memory usage in RocksDB: [Memory control via shared 
cache|https://user-images.githubusercontent.com/1709104/72965622-ce904800-3df7-11ea-8a04-b818f67929c4.png]
 vs. [No memory control for 
RocksDB|https://user-images.githubusercontent.com/1709104/72965622-ce904800-3df7-11ea-8a04-b818f67929c4.png]
 
 ** When you manage memory for RocksDB, you only need to view the block cache 
usage: turn on {{state.backend.rocksdb.metrics.block-cache-usage: true}}, and 
the metric name would be 
{{taskmanager_job_task_operator_\{state-name}_rocksdb.block-cache-usage}}. 
Because different slots share the same cache, the block-cache-usage metric is 
the same for the different states of that specific sub-task, as you can see 
from the picture in the PR.
 ** When you do not manage memory for RocksDB, you need to turn on both
 *** {{state.backend.rocksdb.metrics.block-cache-usage: true}}
 *** {{state.backend.rocksdb.metrics.size-all-mem-tables: true}}, whose metric 
name would be 
{{taskmanager_job_task_operator_\{state-name}_rocksdb.size-all-mem-tables}}
 To know how much memory is used, you need to add up {{block-cache-usage}} and 
{{size-all-mem-tables}} of all states (all different {{state-name}} values) at 
the same sub-task index, as you can see from the picture in the PR. The first 
sketch after this list shows these entries in {{flink-conf.yaml}}.
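
To make the configuration questions above concrete, here is a minimal sketch of 
how the relevant entries could look in {{flink-conf.yaml}}. The metric switches 
are the exact options named above; the pre-defined options profile and the 
factory class name are hypothetical placeholders for illustration, not 
recommendations:

{{state.backend: rocksdb}}
{{# expose the RocksDB memory metrics discussed above}}
{{state.backend.rocksdb.metrics.block-cache-usage: true}}
{{state.backend.rocksdb.metrics.size-all-mem-tables: true}}
{{# only if you tune RocksDB yourself; the values below are placeholders}}
{{state.backend.rocksdb.predefined-options: SPILLING_DISK_OPTIMIZED_HIGH_MEM}}
{{state.backend.rocksdb.options-factory: com.example.MyRocksDBOptionsFactory}}

If you do set a pre-defined options profile or a custom options factory, please 
share the exact values, since they affect how much memory RocksDB may use.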

 
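In case it helps you answer the options-factory question, this is a rough Scala 
sketch of what a customized factory typically looks like against the 1.10 
{{RocksDBOptionsFactory}} interface; the class name and the tuned values are 
hypothetical and only meant to show the shape of such a class:

{code:scala}
import java.util.{Collection => JCollection}

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Hypothetical example class; the tuned values are illustrative only.
class MyRocksDBOptionsFactory extends RocksDBOptionsFactory {

  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: JCollection[AutoCloseable]): DBOptions = {
    // Example tweak: raise RocksDB background parallelism.
    currentOptions.setIncreaseParallelism(4)
  }

  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: JCollection[AutoCloseable]): ColumnFamilyOptions = {
    // Example tweak: cap each memtable at 64 MB.
    currentOptions.setWriteBufferSize(64 * 1024 * 1024)
  }
}
{code}

Such a factory would be registered via {{state.backend.rocksdb.options-factory}} 
as in the sketch above. If you have anything like this in your job, please 
include it in your reply.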



> Flink uses more memory than taskmanager.memory.process.size in Kubernetes
> -------------------------------------------------------------------------
>
>                 Key: FLINK-16267
>                 URL: https://issues.apache.org/jira/browse/FLINK-16267
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: ChangZhuo Chen (陳昌倬)
>            Priority: Major
>         Attachments: flink-conf_1.10.0.yaml, flink-conf_1.9.1.yaml, 
> oomkilled_taskmanager.log
>
>
> This issue is from 
> [https://stackoverflow.com/questions/60336764/flink-uses-more-memory-than-taskmanager-memory-process-size-in-kubernetes]
> h1. Description
>  * In Flink 1.10.0, we try to use `taskmanager.memory.process.size` to limit 
> the resources used by taskmanagers to ensure they are not killed by 
> Kubernetes. However, we still get lots of taskmanager `OOMKilled` events. The 
> setup is in the following sections.
>  * The taskmanager log is in attachment [^oomkilled_taskmanager.log].
> h2. Kubernetes
>  * The Kubernetes setup is the same as described in 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html].
>  * The following is resource configuration for taskmanager deployment in 
> Kubernetes:
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
>  {{  limits:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
> h2. Flink Docker
>  * The Flink Docker image is built with the following Dockerfile:
> {{FROM flink:1.10-scala_2.11}}
>  {{RUN mkdir -p /opt/flink/plugins/s3 && \}}
>  {{    ln -s /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3/}}
>  {{RUN ln -s /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib/}}
> h2. Flink Configuration
>  * The following are all memory related configurations in `flink-conf.yaml` 
> in 1.10.0:
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.memory.jvm-metaspace.size: 128m}}
>  {{taskmanager.memory.process.size: 4096m}}
>  * We use RocksDB and we don't set `state.backend.rocksdb.memory.managed` in 
> `flink-conf.yaml`.
>  ** Use S3 as checkpoint storage.
>  * The code uses the DataStream API
>  ** input/output are both Kafka.
> h2. Project Dependencies
>  * The following are our dependencies.
> {{val flinkVersion = "1.10.0"}}
>  {{libraryDependencies += "com.squareup.okhttp3" % "okhttp" % "4.2.2"}}
>  {{libraryDependencies += "com.typesafe" % "config" % "1.4.0"}}
>  {{libraryDependencies += "joda-time" % "joda-time" % "2.10.5"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % 
> flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" % "flink-metrics-dropwizard" % 
> flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion 
> % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-statebackend-rocksdb" 
> % flinkVersion % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % 
> flinkVersion % "provided"}}
>  {{libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.7"}}
>  {{libraryDependencies += "org.log4s" %% "log4s" % "1.8.2"}}
>  {{libraryDependencies += "org.rogach" %% "scallop" % "3.3.1"}}
> h2. Previous Flink 1.9.1 Configuration
>  * The configuration we used in Flink 1.9.1 is the following. It did not 
> result in `OOMKilled`.
> h3. Kubernetes
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1200m}}
>  {{    memory: 2G}}
>  {{  limits:}}
>  {{    cpu: 1500m}}
>  {{    memory: 2G}}
> h3. Flink 1.9.1
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.heap.size: 1024m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
