[ 
https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14431:
---------------------------
    Description: 
h3. Motivation

The TaskManager memory information currently (Flink 1.10) shown in the REST API has several shortcomings.
h4. (1) The information from HardwareDescription is difficult to match to the TaskManager memory composition in FLIP-49, as the picture below shows:

!image-2019-12-19-18-09-05-542.png|width=444,height=389!
 * It is unclear what HardwareDescription.sizeOfJvmHeap means.
 * Users cannot get the resource configuration of the TaskManager.

h4. (2) There is no information about managed memory.
 * There is no metric for managed memory.

h4. (3) There is no information about shuffle memory.
 * TaskManagerMetricsInfo only exposes memorySegmentsTotal (i.e. the total count of shuffle segments), so users cannot get the shuffle memory size.

h4. (4) The metrics on the TaskManager's metrics page do not correspond to the TaskManager's resource configuration.
 * It is difficult for users to adjust the TaskManager's resource configuration based on the metrics, because they cannot find the configuration items related to each metric.

h3. Proposed Changes
h4. Add TaskManagerResourceInfo, which matches the memory composition
 * Take the information from TaskExecutorResourceSpec (FLIP-49) and add it to TaskExecutorRegistration.

{code:java}
public class TaskManagerResourceInfo {
    private final double cpuCores;
    private final long frameworkHeap;
    private final long frameworkOffHeap;
    private final long taskHeap;
    private final long taskOffHeap;
    private final long shuffleMemory;
    private final long managedMemory;
    private final long jvmMetaSpace;
    private final long jvmOverhead;
    private final long totalProcessMemory;
}
{code}
 * url: /taskmanagers/:taskmanagerid
 * response: add

{code:json}
"resource": {
  "cpuCores": 4,
  "frameworkHeap": 134217728,
  "frameworkOffHeap": 134217728,
  "taskHeap": 181193928,
  "taskOffHeap": 0,
  "shuffleMemory": 33554432,
  "managedMemory": 322122552,
  "jvmMetaSpace": 134217728,
  "jvmOverhead": 134217728,
  "totalProcessMemory": 1073741824
}
{code}
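As a sanity check on the example response above, the individual components add up to totalProcessMemory. The following is a minimal sketch, not Flink code: the class ProcessMemoryBreakdown and its method are hypothetical names, and the values are copied from the example response.

{code:java}
/** Hypothetical helper (not part of Flink): sums the memory components of the example response. */
final class ProcessMemoryBreakdown {

    /** Adds up all memory components, in bytes. */
    static long sumOfComponents(
            long frameworkHeap, long frameworkOffHeap, long taskHeap, long taskOffHeap,
            long shuffleMemory, long managedMemory, long jvmMetaSpace, long jvmOverhead) {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
            + shuffleMemory + managedMemory + jvmMetaSpace + jvmOverhead;
    }

    public static void main(String[] args) {
        long sum = sumOfComponents(
            134217728L, 134217728L, 181193928L, 0L,
            33554432L, 322122552L, 134217728L, 134217728L);
        // Compare against totalProcessMemory (1 GiB) from the example response.
        System.out.println(sum == 1073741824L);
    }
}
{code}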
h4. Add shuffle memory metric
 * add getTotalMemorySize and getAvaliableMemorySize in NetworkBufferPool

{code:java}
public long getTotalMemorySize() {
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getAvaliableMemorySize() {
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}{code}
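The leading 1L in the two methods above widens the multiplication to long. The sketch below illustrates why that matters. It is a hypothetical stand-in, not the real NetworkBufferPool, and it assumes that both the segment count and memorySegmentSize are int values.

{code:java}
/** Hypothetical stand-in for NetworkBufferPool; it only models the size arithmetic. */
final class SegmentSizeExample {
    private final int totalSegments;      // assumed: segment count is an int
    private final int memorySegmentSize;  // assumed: segment size in bytes is an int

    SegmentSizeExample(int totalSegments, int memorySegmentSize) {
        this.totalSegments = totalSegments;
        this.memorySegmentSize = memorySegmentSize;
    }

    /** Widened to long before multiplying, as in the proposal. */
    long totalMemorySize() {
        return 1L * totalSegments * memorySegmentSize;
    }

    /** Pure int multiplication; the product wraps around for large pools. */
    long totalMemorySizeWithIntArithmetic() {
        return totalSegments * memorySegmentSize;
    }

    public static void main(String[] args) {
        // 100,000 segments of 32 KiB exceed Integer.MAX_VALUE bytes.
        SegmentSizeExample pool = new SegmentSizeExample(100_000, 32_768);
        System.out.println(pool.totalMemorySize());
        System.out.println(pool.totalMemorySizeWithIntArithmetic());
    }
}
{code}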
 * update NettyShuffleMetricFactory#registerShuffleMetrics

{code:java}
private static final String METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY = "TotalMemoryCapacity";
private static final String METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY = "AvaliableMemory";

private static void registerShuffleMetrics(
    String groupName,
    MetricGroup metricGroup,
    NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);
    networkGroup.<Integer, Gauge<Integer>>gauge(
        METRIC_TOTAL_MEMORY_SEGMENT,
        networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(
        METRIC_AVAILABLE_MEMORY_SEGMENT,
        networkBufferPool::getNumberOfAvailableMemorySegments);
    networkGroup.<Long, Gauge<Long>>gauge(
        METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY,
        networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(
        METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY,
        networkBufferPool::getAvaliableMemorySize);
}
{code}
h4. Add managed memory metric
 * add default memory type in MemoryManager

{code:java}
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
{code}
 * add getManagedMemoryTotal in TaskExecutor:

{code:java}
public long getManagedMemoryTotal() {
    return this.taskSlotTable.getAllocatedSlots().stream()
        .mapToLong(slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE))
        .sum();
}{code}
 * add getManagedMemoryUsed in TaskExecutor:

{code:java}
public long getManagedMemoryUsed() {
    return this.taskSlotTable.getAllocatedSlots().stream()
        .mapToLong(slot ->
            slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
        .sum();
}{code}
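The two aggregations above can be tried out in isolation. The following is a minimal sketch under stated assumptions: SlotMemory is a hypothetical stand-in for the per-slot MemoryManager view (total and available bytes), and ManagedMemoryExample is not a Flink class.

{code:java}
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of summing managed memory over allocated slots. */
final class ManagedMemoryExample {

    /** Hypothetical per-slot view: total and currently available managed memory, in bytes. */
    static final class SlotMemory {
        final long total;
        final long available;

        SlotMemory(long total, long available) {
            this.total = total;
            this.available = available;
        }
    }

    /** Sum of the managed memory size of every slot. */
    static long managedMemoryTotal(List<SlotMemory> slots) {
        return slots.stream().mapToLong(slot -> slot.total).sum();
    }

    /** Sum of (size - available) over every slot. */
    static long managedMemoryUsed(List<SlotMemory> slots) {
        return slots.stream().mapToLong(slot -> slot.total - slot.available).sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = Arrays.asList(new SlotMemory(100L, 40L), new SlotMemory(200L, 200L));
        System.out.println(managedMemoryTotal(slots));
        System.out.println(managedMemoryUsed(slots));
    }
}
{code}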
 * add instantiateMemoryManagerMetrics in MetricUtils

{code:java}
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);
    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}{code}
 * register it in TaskManagerRunner#startTaskManager 

h4. Change the TaskManager metrics page
 * Lay out the page according to the FLIP-49 resource configuration and the memory metrics, as the picture below shows:

!image-2019-12-19-18-28-01-447.png|width=671,height=282!
 * Status.JVM.Memory.Heap.Used as the usage of Flink heap
 * Status.JVM.Memory.Direct.MemoryUsed - (shuffle total) as the usage of Flink off-heap
 * shuffle used as the usage of shuffle memory
 * managed used as the usage of managed memory
 * Status.JVM.Memory.NonHeap.Used as the usage of overhead
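
The two derived values in the list above can be sketched as follows. This is an illustration only: MemoryUsageMapping and its method names are hypothetical, and it assumes that "shuffle used" means shuffle total minus shuffle available, in line with the proposed shuffle metrics.

{code:java}
/** Hypothetical helper showing how the derived usage values could be computed. */
final class MemoryUsageMapping {

    /** Flink off-heap usage: Status.JVM.Memory.Direct.MemoryUsed minus the total shuffle memory. */
    static long flinkOffHeapUsed(long directMemoryUsed, long shuffleTotal) {
        return directMemoryUsed - shuffleTotal;
    }

    /** Shuffle usage: total shuffle memory minus the part that is still available (assumption). */
    static long shuffleUsed(long shuffleTotal, long shuffleAvailable) {
        return shuffleTotal - shuffleAvailable;
    }

    public static void main(String[] args) {
        long shuffleTotal = 33554432L; // 32 MiB, as in the example resource response
        System.out.println(flinkOffHeapUsed(50000000L, shuffleTotal));
        System.out.println(shuffleUsed(shuffleTotal, 25165824L));
    }
}
{code}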


  was:
h3. Motivation

The TaskManager memory information currently (Flink 1.10) shown in the REST API has several shortcomings.
h4. (1) The information from HardwareDescription is difficult to match to the TaskManager memory composition in FLIP-49, as the picture below shows:

!image-2019-12-19-18-09-05-542.png|width=444,height=389!
 * It is unclear what HardwareDescription.sizeOfJvmHeap means.
 * Users cannot get the resource configuration of the TaskManager.

h4. (2) There is no information about managed memory.
 * There is no metric for managed memory.

h4. (3) There is no information about shuffle memory.
 * TaskManagerMetricsInfo only exposes memorySegmentsTotal (i.e. the total count of shuffle segments), so users cannot get the shuffle memory size.

h4. (4) The metrics on the TaskManager's metrics page do not correspond to the TaskManager's resource configuration.
 * It is difficult for users to adjust the TaskManager's resource configuration based on the metrics, because they cannot find the configuration items related to each metric.

h3. Proposed Changes
h4. Add TaskManagerResourceInfo, which matches the memory composition
 * Take the information from TaskExecutorResourceSpec (FLIP-49) and add it to TaskExecutorRegistration.

{code:java}
public class TaskManagerResourceInfo {
    private final double cpuCores;
    private final long frameworkHeap;
    private final long frameworkOffHeap;
    private final long taskHeap;
    private final long taskOffHeap;
    private final long shuffleMemory;
    private final long managedMemory;
    private final long jvmMetaSpace;
    private final long jvmOverhead;
    private final long totalProcessMemory;
}
{code}
 * url: /taskmanagers/:taskmanagerid
 * response: add

{code:json}
"resource": {
  "cpuCores": 4,
  "frameworkHeap": 134217728,
  "frameworkOffHeap": 134217728,
  "taskHeap": 181193928,
  "taskOffHeap": 0,
  "shuffleMemory": 33554432,
  "managedMemory": 322122552,
  "jvmMetaSpace": 134217728,
  "jvmOverhead": 134217728,
  "totalProcessMemory": 1073741824
}
{code}
h4. Add shuffle memory metric
 * add getTotalMemorySize and getAvaliableMemorySize in NetworkBufferPool

{code:java}
public long getTotalMemorySize() {
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getAvaliableMemorySize() {
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}{code}
 * In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration to taskManagerConfiguration.

 * Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST API can get it through ResourceManager.requestTaskManagerInfo.

h4. Add TaskManagerResourceInfo, which matches the memory composition of the TaskManager
h5. Data in JSON
{code:json}
{
  "cpuAllocated": -1,
  "cpuUsage": -1,
  "taskHeapAllocated": 966787072,
  "taskHeapUsed": 76071880,
  "heapManageMemoryMax": 0,
  "heapManageMemoryUsed": 0,
  "offHeapManageMemoryMax": 0,
  "offHeapManageMemoryUsed": 0,
  "networkMemoryMax": 107413504,
  "networkMemoryUsed": 0
}{code}
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();
    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;
    long networkMemoryAllocated = taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed =
        (taskManagerMetrics.getMemorySegmentsTotal() - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;
    long manageMemoryAllocated = taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed =
        (taskManagerMetrics.getManageMemorySegmentsTotal() - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    // Managed memory is attributed to the heap or to off-heap, depending on the configured memory type.
    if (taskManagerResourceConfig.getManagedMemoryType().equalsIgnoreCase(MemoryType.HEAP.name())) {
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        // Report the Java heap without the part taken by managed memory.
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }
    // The first two arguments (cpuAllocated, cpuUsage) are passed as -1 for now.
    return new TaskManagerResourceInfo(
        -1.0d, -1.0d, javaHeapAllocated, javaHeapUsed,
        heapManageMemoryAllocated, heapManageMemoryUsed,
        offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
        networkMemoryAllocated, networkMemoryUsed);
}{code}
 * cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec)
 * cpuUsage = (metric Status.JVM.CPU.Load [from|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()]) * cpuAllocated


> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14431
>                 URL: https://issues.apache.org/jira/browse/FLINK-14431
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
>            Reporter: lining
>            Priority: Major
>         Attachments: image-2019-10-17-17-58-50-342.png, 
> image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png, 
> image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png, 
> image-2019-10-24-16-22-27-360.png, image-2019-12-19-18-09-05-542.png, 
> image-2019-12-19-18-27-38-589.png, image-2019-12-19-18-28-01-447.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
