[ https://issues.apache.org/jira/browse/FLINK-25801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-25801:
-----------------------------------
Labels: pull-request-available (was: )
> add cpu processor metric of taskmanager
> ---------------------------------------
>
> Key: FLINK-25801
> URL: https://issues.apache.org/jira/browse/FLINK-25801
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Reporter: 王俊博
> Priority: Minor
> Labels: pull-request-available
>
> Flink's process metrics include a CPU load metric. If users know the CPU
> processor environment, they can determine whether their job is I/O-bound or
> CPU-bound. However, Flink does not expose a metric for the number of CPU
> processors available to the container. If the CPU environment of
> TaskManagers differs (different numbers of cores), it is hard to calculate
> how much CPU Flink is actually using.
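To make the problem concrete: the existing "Time" gauge reports only the process CPU time in nanoseconds, so turning it into a utilization figure requires the core count. The plain-Java sketch below assumes two samples of process CPU time and of wall-clock time. The class and method names are hypothetical and are not part of Flink. The same 6 s of CPU time over a 2 s window is 75% of a 4-core TaskManager but under 19% of a 16-core one.

```java
/**
 * Sketch (hypothetical names): converting two samples of process CPU time
 * (nanoseconds, as reported by getProcessCpuTime) into the average number of
 * busy cores, then normalizing by the number of available processors.
 */
final class CpuUsage {

    /** Average number of cores busy between two samples. */
    static double coresUsed(long cpuTimeStartNanos, long cpuTimeEndNanos,
                            long wallStartNanos, long wallEndNanos) {
        long wallDelta = wallEndNanos - wallStartNanos;
        if (wallDelta <= 0) {
            throw new IllegalArgumentException("wall-clock interval must be positive");
        }
        return (double) (cpuTimeEndNanos - cpuTimeStartNanos) / wallDelta;
    }

    /** Fraction of the available cores in use; needs the core count this issue asks for. */
    static double utilization(double coresUsed, int availableProcessors) {
        if (availableProcessors <= 0) {
            throw new IllegalArgumentException("availableProcessors must be positive");
        }
        return coresUsed / availableProcessors;
    }

    public static void main(String[] args) {
        // 6 s of CPU time consumed over a 2 s wall-clock window: 3 cores busy on average.
        double cores = coresUsed(0L, 6_000_000_000L, 0L, 2_000_000_000L);
        System.out.println(cores);                  // 3.0
        System.out.println(utilization(cores, 4));  // 0.75 on a 4-core TaskManager
        System.out.println(utilization(cores, 16)); // 0.1875 on a 16-core TaskManager
    }
}
```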
>
> {code:java}
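> // Existing process CPU metrics in Flink; mxBean is a
> // com.sun.management.OperatingSystemMXBean.
> // "Load": recent CPU usage of the JVM process, in [0.0, 1.0]
> metrics.<Double, Gauge<Double>>gauge("Load", mxBean::getProcessCpuLoad);
> // "Time": CPU time used by the JVM process, in nanoseconds
> metrics.<Long, Gauge<Long>>gauge("Time", mxBean::getProcessCpuTime); {code}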
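A minimal sketch of the kind of metric requested here, registered next to "Load" and "Time". `Gauge` and `SimpleMetricGroup` are stand-ins for Flink's `org.apache.flink.metrics.Gauge` and `MetricGroup`, so the example runs without Flink on the classpath. The metric name "Processors" and the use of `Runtime.availableProcessors()` as its source are assumptions and do not reflect an existing Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch only. Gauge and SimpleMetricGroup are minimal stand-ins for Flink's
 * Gauge and MetricGroup; the metric name "Processors" is hypothetical.
 */
final class ProcessorGaugeSketch {

    /** Stand-in for Flink's single-method Gauge interface. */
    interface Gauge<T> {
        T getValue();
    }

    /** Stand-in for the part of MetricGroup used here: register a gauge by name. */
    static final class SimpleMetricGroup {
        final Map<String, Gauge<?>> gauges = new LinkedHashMap<>();

        <T, G extends Gauge<T>> G gauge(String name, G gauge) {
            gauges.put(name, gauge);
            return gauge;
        }
    }

    static void instantiateProcessorMetric(SimpleMetricGroup metrics) {
        // Number of processors available to the JVM. On container-aware JVMs
        // this can reflect the container's CPU limit rather than the host's cores.
        metrics.<Integer, Gauge<Integer>>gauge(
                "Processors", Runtime.getRuntime()::availableProcessors);
    }

    public static void main(String[] args) {
        SimpleMetricGroup metrics = new SimpleMetricGroup();
        instantiateProcessorMetric(metrics);
        System.out.println("Processors = " + metrics.gauges.get("Processors").getValue());
    }
}
```

Reading the value on every report, rather than caching it once at startup, keeps the gauge correct if the JVM's view of available processors changes at runtime.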
> Spark exposes totalCores, the number of cores available in the executor, in
> ExecutorSummary:
> [https://spark.apache.org/docs/3.1.1/monitoring.html#:~:text=totalCores,in%20this%20executor.]
> {code:scala}
> // Spark: per-executor metrics in Prometheus text format, including totalCores
> val sb = new StringBuilder
> sb.append(s"""spark_info{version="$SPARK_VERSION_SHORT", revision="$SPARK_REVISION"} 1.0\n""")
> val store = uiRoot.asInstanceOf[SparkUI].store
> store.executorList(true).foreach { executor =>
>   val prefix = "metrics_executor_"
>   val labels = Seq(
>     "application_id" -> store.applicationInfo.id,
>     "application_name" -> store.applicationInfo.name,
>     "executor_id" -> executor.id
>   ).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
>   sb.append(s"${prefix}rddBlocks$labels ${executor.rddBlocks}\n")
>   sb.append(s"${prefix}memoryUsed_bytes$labels ${executor.memoryUsed}\n")
>   sb.append(s"${prefix}diskUsed_bytes$labels ${executor.diskUsed}\n")
>   sb.append(s"${prefix}totalCores$labels ${executor.totalCores}\n")
> }{code}
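For reference, each `sb.append` above emits one line in the Prometheus text exposition format. The Java sketch below rebuilds the totalCores line so its shape is visible. The class and method names are hypothetical, and the label values are made up for illustration. Like the Spark code, it does not escape quotes or backslashes in label values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch (hypothetical names): render one Prometheus text-format line with labels. */
final class PrometheusLineSketch {

    static String line(String name, Map<String, String> labels, long value) {
        // Same shape as Spark's mkString("{", ", ", "}") over k="v" pairs; no escaping.
        String renderedLabels = labels.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
        return name + renderedLabels + " " + value + "\n";
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>(); // keeps insertion order
        labels.put("application_id", "app-0001");  // hypothetical value
        labels.put("application_name", "demo");     // hypothetical value
        labels.put("executor_id", "1");             // hypothetical value
        System.out.print(line("metrics_executor_totalCores", labels, 4));
        // metrics_executor_totalCores{application_id="app-0001", application_name="demo", executor_id="1"} 4
    }
}
```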
> Spark registers its jvmCpuTime metric like this:
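> {code:scala}
> import java.lang.management.ManagementFactory
> import javax.management.{MBeanServer, ObjectName}
> import scala.util.control.NonFatal
> import com.codahale.metrics.{Gauge, MetricRegistry}
>
> // Spark executor gauge: JVM process CPU time in nanoseconds, or -1 if unavailable
> metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
>   val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
>   val name = new ObjectName("java.lang", "type", "OperatingSystem")
>   override def getValue: Long = {
>     try {
>       // return JVM process CPU time if the ProcessCpuTime method is available
>       mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
>     } catch {
>       case NonFatal(_) => -1L
>     }
>   }
> }) {code}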
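The same lookup can be written in plain Java. This is a sketch, not Flink or Spark code, and the class name is hypothetical. It reads the `ProcessCpuTime` attribute of the `java.lang:type=OperatingSystem` MBean through the platform `MBeanServer`, which avoids a compile-time dependency on `com.sun.management`. When the attribute cannot be read it returns -1, mirroring Spark's fallback.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Sketch (hypothetical name): Spark's jvmCpuTime lookup in plain Java. */
final class JvmCpuTimeSketch {

    static long jvmCpuTimeNanos() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("java.lang", "type", "OperatingSystem");
            Object value = server.getAttribute(name, "ProcessCpuTime");
            return value instanceof Long ? (Long) value : -1L;
        } catch (Exception e) {
            // Attribute not exposed by this JVM, or a JMX error: report -1 like Spark.
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println("jvmCpuTime (ns) = " + jvmCpuTimeNanos());
    }
}
```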
--
This message was sent by Atlassian Jira
(v8.20.1#820001)