[ 
https://issues.apache.org/jira/browse/FLINK-16267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111360#comment-17111360
 ] 

Trystan edited comment on FLINK-16267 at 5/19/20, 5:06 PM:
-----------------------------------------------------------

[~azagrebin] [~xintongsong], thanks for the clear replies! It makes sense that 
user code allocating off-heap memory outside the JVM's control would also be 
outside Flink's ability to control. Since we create ~500 threads in userland 
(each with its own stack), it's plausible that up to 500 MB, or even 1 GB (if 
our Dynamo library does some allocation of its own, way deep down), is 
allocated outside the memory manager's control. That would still leave ~2 GB 
unaccounted for.
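
For what it's worth, if that userland off-heap usage is expected, I believe the 
1.10 memory model lets it be budgeted explicitly so the other components shrink 
and the total stays inside the container limit. Something along these lines 
(just a sketch based on my reading of the docs, sizes are illustrative and not 
tested on our job):
{noformat}
# process size stays at the Kubernetes limit
taskmanager.memory.process.size: 4096m
# reserve room for user code's own off-heap/native allocations
taskmanager.memory.task.off-heap.size: 1g
# headroom for thread stacks and other JVM overhead
taskmanager.memory.jvm-overhead.min: 512m
taskmanager.memory.jvm-overhead.max: 1g
{noformat}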

 

I think [~yunta] might have hit the nail on the head, though:
{noformat}
Iterate over map state would cause iterator on RocksDB, in which the memory 
usage cannot be controlled under current RocksDB implementation.{noformat}
We do this a lot, in fact! We are using a custom (keyed) ProcessFunction to do 
sliding window aggregation, where the map key is the hour and the value is a 
count. We do some prebuffering (1h) with a normal tumbling window, and its 
output is connected to our ProcessFunction, which computes the sliding window 
results. We had to implement a custom windower because we have 1h slides 
tracking values for 1d, 2d, ..., 7d - so 672 data points. Flink's built-in 
windowing would have meant 672 writes per element (one for each slide) vs our 
single write; obviously the read side of the window computation is where the 
built-in windows would have performed better, since we iterate over every key 
in the map state to compute the 7 total aggregate values (a rough sketch of 
this pattern follows below). The total size of our state when savepointing is 
~1TB, so it's a decent amount of data, and the ProcessFunction is called ~20k 
times/sec across the entire job.
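
To make the access pattern concrete, the read side looks roughly like the 
sketch below. This is illustrative only - the class, field, and type names 
(SlidingAggregator, HourlyCount, Aggregates, hourlyCounts) are made up for this 
comment and are not our actual code; only the MapState iteration is the point:
{noformat}
import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Illustrative input/output types: one pre-buffered hourly count in,
// the 1d..7d aggregates out.
case class HourlyCount(key: String, hourEpoch: Long, count: Long)
case class Aggregates(key: String, totalsByDays: Map[Int, Long])

class SlidingAggregator
    extends KeyedProcessFunction[String, HourlyCount, Aggregates] {

  @transient private var hourlyCounts: MapState[JLong, JLong] = _

  override def open(parameters: Configuration): Unit = {
    hourlyCounts = getRuntimeContext.getMapState(
      new MapStateDescriptor[JLong, JLong](
        "hourlyCounts", classOf[JLong], classOf[JLong]))
  }

  override def processElement(
      value: HourlyCount,
      ctx: KeyedProcessFunction[String, HourlyCount, Aggregates]#Context,
      out: Collector[Aggregates]): Unit = {

    // Write side: a single put per element, instead of one write per slide.
    hourlyCounts.put(value.hourEpoch, value.count)

    // Read side: iterate the whole per-key map state for every element. With
    // the RocksDB backend this walks a native iterator, and that memory is
    // not bounded by Flink's managed memory.
    val totals = (1 to 7).map { days =>
      val cutoff = value.hourEpoch - days * 24L
      val sum = hourlyCounts.entries().asScala
        .filter(_.getKey >= cutoff)
        .map(_.getValue.longValue())
        .sum
      days -> sum
    }.toMap

    out.collect(Aggregates(value.key, totals))
  }
}
{noformat}
Even though each put is cheap, every element triggers a full scan of the 
per-key map, which is where all those RocksDB iterators come from.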

But the fact that the memory used while iterating over RocksDB state isn't 
controlled by Flink/RocksDB makes a lot of sense, and would certainly explain 
why our memory usage is so much higher than expected! Between our thread pool 
and this, the large discrepancy could be accounted for.



> Flink uses more memory than taskmanager.memory.process.size in Kubernetes
> -------------------------------------------------------------------------
>
>                 Key: FLINK-16267
>                 URL: https://issues.apache.org/jira/browse/FLINK-16267
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: ChangZhuo Chen (陳昌倬)
>            Priority: Major
>         Attachments: flink-conf_1.10.0.yaml, flink-conf_1.9.1.yaml, 
> oomkilled_taskmanager.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue is from 
> [https://stackoverflow.com/questions/60336764/flink-uses-more-memory-than-taskmanager-memory-process-size-in-kubernetes]
> h1. Description
>  * In Flink 1.10.0, we try to use `taskmanager.memory.process.size` to limit 
> the resources used by the taskmanager so that it is not killed by Kubernetes. 
> However, we still get lots of taskmanager pods `OOMKilled`. The setup is 
> described in the following sections.
>  * The taskmanager log is attached as [^oomkilled_taskmanager.log].
> h2. Kubernetes
>  * The Kubernetes setup is the same as described in 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html].
>  * The following is resource configuration for taskmanager deployment in 
> Kubernetes:
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
>  {{  limits:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
> h2. Flink Docker
>  * The Flink Docker image is built with the following Dockerfile.
> {{FROM flink:1.10-scala_2.11}}
>  {{RUN mkdir -p /opt/flink/plugins/s3 && \}}
>  {{    ln -s /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3/}}
>  {{RUN ln -s /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib/}}
> h2. Flink Configuration
>  * The following are all memory-related configurations in `flink-conf.yaml` 
> in 1.10.0:
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.memory.jvm-metaspace.size: 128m}}
>  {{taskmanager.memory.process.size: 4096m}}
>  * We use RocksDB, and we don't set `state.backend.rocksdb.memory.managed` in 
> `flink-conf.yaml`.
>  ** We use S3 as checkpoint storage.
>  * The code uses the DataStream API.
>  ** Input and output are both Kafka.
> h2. Project Dependencies
>  * The following are our dependencies.
> {{val flinkVersion = "1.10.0"}}
>  {{libraryDependencies += "com.squareup.okhttp3" % "okhttp" % "4.2.2"}}
>  {{libraryDependencies += "com.typesafe" % "config" % "1.4.0"}}
>  {{libraryDependencies += "joda-time" % "joda-time" % "2.10.5"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % 
> flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" % "flink-metrics-dropwizard" % 
> flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion 
> % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-statebackend-rocksdb" 
> % flinkVersion % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % 
> flinkVersion % "provided"}}
>  {{libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.7"}}
>  {{libraryDependencies += "org.log4s" %% "log4s" % "1.8.2"}}
>  {{libraryDependencies += "org.rogach" %% "scallop" % "3.3.1"}}
> h2. Previous Flink 1.9.1 Configuration
>  * The configuration we used in Flink 1.9.1 is the following. It did not 
> result in `OOMKilled`.
> h3. Kubernetes
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1200m}}
>  {{    memory: 2G}}
>  {{  limits:}}
>  {{    cpu: 1500m}}
>  {{    memory: 2G}}
> h3. Flink 1.9.1
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.heap.size: 1024m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
