[ https://issues.apache.org/jira/browse/FLINK-16267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110934#comment-17110934 ]
Yun Tang commented on FLINK-16267:
----------------------------------
[~czchen] I think you might not be running the job with RocksDB native metrics enabled. Let me explain the two main reasons why memory goes over the limit and the container ends up OOMKilled:
* Once Flink manages memory for RocksDB (the default in Flink 1.10 when {{state.backend.rocksdb.memory.managed}} is not set), RocksDB is given more memory than its default configuration would use in your situation.
* Iterating over map state creates iterators on RocksDB whose memory usage cannot be controlled under the current RocksDB implementation.
1) From your description, you have 1 broadcast state, which is on-heap and not
in RocksDB. The other 3 states in your {{KeyedProcessFunction}} will live in one
RocksDB instance as 3 different column families. Since you only have one
{{KeyedProcessFunction}}, I believe there would be only one RocksDB instance
per slot.
By default, RocksDB uses 64 MB per MemTable and keeps at most 2 MemTables per
column family, i.e. up to 128 MB of write buffers per column family. In
addition, RocksDB allocates an 8 MB block cache per column family to serve
reads. Thus, the 3 states would take at most:
3 * (64 MB * 2 + 8 MB) = 408 MB
The total memory for RocksDB in Flink 1.9 would therefore be:
408 MB + memory used by index & filter blocks + memory used while iterating
In general, the overall usage should still be below 600 MB. On the other hand,
once Flink manages memory for RocksDB, the memory budget for RocksDB would be
1423 MB, as Xintong analyzed, and Flink would let RocksDB use the whole 1423 MB
for MemTables, block cache, and index & filter blocks. If your writes do not
actually consume that much, the remaining part is kept as read cache. That is
why I said managed memory actually provides more memory for RocksDB to use in
your case; if your job can benefit from the read cache, you could also achieve
better performance.
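For reference, here is a back-of-the-envelope sketch of both figures. It assumes the Flink 1.10 defaults (JVM overhead fraction 0.1 clamped to [192 MB, 1 GB], managed fraction 0.4) plus the 4096m process size and 128m metaspace from your attached flink-conf.yaml; the object name is made up and this is only a rough check, not the exact accounting Xintong did:
{code:scala}
// Rough check of the two memory figures discussed above; assumes Flink 1.10
// default fractions (jvm-overhead 0.1 in [192 MB, 1 GB], managed fraction 0.4).
object RocksDbMemoryEstimate extends App {
  val mb = 1024L * 1024L

  // Flink 1.9 style: RocksDB defaults per column family.
  val writeBufferSize = 64 * mb   // one MemTable
  val maxWriteBuffers = 2         // MemTables per column family
  val blockCachePerCf = 8 * mb
  val columnFamilies  = 3         // the 3 keyed states
  val unmanaged = columnFamilies * (maxWriteBuffers * writeBufferSize + blockCachePerCf)
  println(s"unmanaged estimate: ${unmanaged / mb} MB (+ index/filter + iterators)")

  // Flink 1.10 managed memory derived from the attached flink-conf.yaml.
  val processSize = 4096 * mb      // taskmanager.memory.process.size
  val metaspace   = 128 * mb       // taskmanager.memory.jvm-metaspace.size
  val jvmOverhead = math.min(math.max((processSize * 0.1).toLong, 192 * mb), 1024 * mb)
  val totalFlinkMemory = processSize - metaspace - jvmOverhead
  val managedForRocksDb = (totalFlinkMemory * 0.4).toLong
  println(s"managed memory for RocksDB: ${managedForRocksDb / mb} MB")  // ~1423 MB
}
{code}
The printed values come out at 408 MB and roughly 1423 MB, matching the numbers above.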
2) As I analyzed, the temporary memory used while iterating over RocksDB map
state cannot be controlled by RocksDB itself. This part may not contribute much,
but it cannot be ignored if you get entries from RocksDB map state.
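To make 2) concrete, below is a minimal sketch (class and state names are hypothetical, not taken from your job) of the kind of {{MapState}} scan that opens a native RocksDB iterator. That iterator's memory sits outside Flink's managed budget, and since taskmanager.memory.process.size already equals your 4096Mi container limit, any such extra native usage can push the pod over the limit:
{code:scala}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical KeyedProcessFunction, only to illustrate where iterator memory comes from.
class SumPerKey extends KeyedProcessFunction[String, (String, Long), Long] {

  @transient private var events: MapState[Long, Long] = _

  override def open(parameters: Configuration): Unit = {
    events = getRuntimeContext.getMapState(
      new MapStateDescriptor[Long, Long]("events", classOf[Long], classOf[Long]))
  }

  override def processElement(
      value: (String, Long),
      ctx: KeyedProcessFunction[String, (String, Long), Long]#Context,
      out: Collector[Long]): Unit = {
    events.put(value._2, value._2)

    // With the RocksDB backend, entries()/keys()/values() are backed by a native
    // RocksDB iterator; its temporary memory is not bounded by managed memory.
    var sum = 0L
    val it = events.entries().iterator()
    while (it.hasNext) {
      sum += it.next().getValue
    }
    out.collect(sum)
  }
}
{code}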
> Flink uses more memory than taskmanager.memory.process.size in Kubernetes
> -------------------------------------------------------------------------
>
> Key: FLINK-16267
> URL: https://issues.apache.org/jira/browse/FLINK-16267
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.0
> Reporter: ChangZhuo Chen (陳昌倬)
> Priority: Major
> Attachments: flink-conf_1.10.0.yaml, flink-conf_1.9.1.yaml,
> oomkilled_taskmanager.log
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This issue is from
> [https://stackoverflow.com/questions/60336764/flink-uses-more-memory-than-taskmanager-memory-process-size-in-kubernetes]
> h1. Description
> * In Flink 1.10.0, we try to use `taskmanager.memory.process.size` to limit
> the resources used by the taskmanagers so that they are not killed by
> Kubernetes. However, we still get a lot of taskmanager `OOMKilled` events.
> The setup is described in the following sections.
> * The taskmanager log is in attachment [^oomkilled_taskmanager.log].
> h2. Kubernetes
> * The Kubernetes setup is the same as described in
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html].
> * The following is resource configuration for taskmanager deployment in
> Kubernetes:
> {{resources:}}
> {{ requests:}}
> {{ cpu: 1000m}}
> {{ memory: 4096Mi}}
> {{ limits:}}
> {{ cpu: 1000m}}
> {{ memory: 4096Mi}}
> h2. Flink Docker
> * The Flink docker image is built by the following Dockerfile.
> {{FROM flink:1.10-scala_2.11}}
> {{RUN mkdir -p /opt/flink/plugins/s3 && \}}
> {{    ln -s /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3/}}
> {{RUN ln -s /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib/}}
> h2. Flink Configuration
> * The following are all memory related configurations in `flink-conf.yaml`
> in 1.10.0:
> {{jobmanager.heap.size: 820m}}
> {{taskmanager.memory.jvm-metaspace.size: 128m}}
> {{taskmanager.memory.process.size: 4096m}}
> * We use RocksDB and we don't set `state.backend.rocksdb.memory.managed` in
> `flink-conf.yaml`.
> ** Use S3 as checkpoint storage.
> * The code uses the DataStream API.
> ** Input and output are both Kafka.
> h2. Project Dependencies
> * The following are our dependencies.
> {{val flinkVersion = "1.10.0"}}
> {{libraryDependencies += "com.squareup.okhttp3" % "okhttp" % "4.2.2"}}
> {{libraryDependencies += "com.typesafe" % "config" % "1.4.0"}}
> {{libraryDependencies += "joda-time" % "joda-time" % "2.10.5"}}
> {{libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" %
> flinkVersion}}
> {{libraryDependencies += "org.apache.flink" % "flink-metrics-dropwizard" %
> flinkVersion}}
> {{libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion
> % "provided"}}
> {{libraryDependencies += "org.apache.flink" %% "flink-statebackend-rocksdb"
> % flinkVersion % "provided"}}
> {{libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
> flinkVersion % "provided"}}
> {{libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.7"}}
> {{libraryDependencies += "org.log4s" %% "log4s" % "1.8.2"}}
> {{libraryDependencies += "org.rogach" %% "scallop" % "3.3.1"}}
> h2. Previous Flink 1.9.1 Configuration
> * The configuration we used in Flink 1.9.1 is the following. It did not
> suffer from `OOMKilled`.
> h3. Kubernetes
> {{resources:}}
> {{ requests:}}
> {{ cpu: 1200m}}
> {{ memory: 2G}}
> {{ limits:}}
> {{ cpu: 1500m}}
> {{ memory: 2G}}
> h3. Flink 1.9.1
> {{jobmanager.heap.size: 820m}}
> {{taskmanager.heap.size: 1024m}}