[
https://issues.apache.org/jira/browse/FLINK-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195303#comment-17195303
]
Chesnay Schepler commented on FLINK-14406:
------------------------------------------
[~mapohl] Is the concern about thread-safety about the correctness of the
exposed value, or handling concurrent accesses to data-structures (and the
resulting potential for things like ConcurrentModificationExceptions)?
> Add metric for managed memory
> -----------------------------
>
> Key: FLINK-14406
> URL: https://issues.apache.org/jira/browse/FLINK-14406
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Metrics, Runtime / Task
> Reporter: lining
> Assignee: Matthias
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If a user wants to get memory used in time, as there's no manage memory's
> metrics, it couldn't get it.
> *Propose*
> * add default memory type in MemoryManager
>
> {code:java}
> public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
> {code}
> * add getManagedMemoryTotal in TaskExecutor:
>
> {code:java}
> public long getManagedMemoryTotal() {
> return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
> slot ->
> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
> ).sum();
> }
> {code}
>
> * add getManagedMemoryUsed in TaskExecutor:
>
> {code:java}
> public long getManagedMemoryUsed() {
> return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
> slot ->
> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
> - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
> ).sum();
> }
> {code}
>
> * add instantiateMemoryManagerMetrics in MetricUtils
>
> {code:java}
> public static void instantiateMemoryManagerMetrics(MetricGroup
> statusMetricGroup, TaskExecutor taskExecutor) {
> checkNotNull(statusMetricGroup);
> MetricGroup memoryManagerGroup =
> statusMetricGroup.addGroup("Managed").addGroup("Memory");
> memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity",
> taskExecutor::getManagedMemoryTotal);
> memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed",
> taskExecutor::getManagedMemoryUsed);
> }
> {code}
> * register it in TaskManagerRunner#startTaskManager
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)