[ https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lining updated FLINK-14431:
---------------------------
    Description: 
h3. Motivation

The TaskManager memory information currently (Flink 1.10) exposed through the 
REST API has several shortcomings.
h4. (1) The information from HardwareDescription is difficult to match to the memory composition of the TaskManager in FLIP-49, as the picture below shows:

!image-2019-12-19-18-09-05-542.png|width=444,height=389!
 * It is unclear what HardwareDescription.sizeOfJvmHeap means.
 * The user cannot get the resource configuration of the TaskManager.

h4. (2) There is no information about managed memory.
 * There is no metric for managed memory.

h4. (3) There is no information about shuffle memory.
 * From TaskManagerMetricsInfo's memorySegmentsTotal (the total number of 
shuffle segments) alone, the user cannot derive the shuffle memory size.

h4. (4) The metrics on the TaskManager's metrics page do not correspond to the resource configuration of the TaskManager.
 * It is difficult for users to adjust the TaskManager's resource configuration 
based on metrics, because they cannot find the configuration items related to 
those metrics.

h3. Proposed Changes
h4. Add TaskManagerResourceInfo, which matches the memory composition
 * Take the information from TaskExecutorResourceSpec in FLIP-49 and add it to 
TaskExecutorRegistration.

{code:java}
public class TaskManagerResourceInfo {
        private final double cpuCores;
        private final long frameworkHeap;
        private final long frameworkOffHeap;
        private final long taskHeap;
        private final long taskOffHeap;
        private final long shuffleMemory;
        private final long managedMemory;
        private final long jvmMetaSpace;
        private final long jvmOverhead;
        private final long totalProcessMemory;
}
{code}
 * Register it in TaskManagerServices.createMemoryManager. 
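
As a rough illustration of how the proposed fields relate to each other, the sketch below adds them up the way FLIP-49 composes memory: total Flink memory is the sum of the framework, task, shuffle, and managed components, and total process memory adds JVM metaspace and JVM overhead on top. The class MemoryCompositionSketch, its method names, and all sizes are hypothetical and exist only for this example; they are not part of Flink.

{code:java}
// Hypothetical sketch, not Flink code: shows how the proposed fields add up.
public final class MemoryCompositionSketch {

    /** Sum of the components that make up total Flink memory (all values in bytes). */
    public static long totalFlinkMemory(
            long frameworkHeap, long frameworkOffHeap,
            long taskHeap, long taskOffHeap,
            long shuffleMemory, long managedMemory) {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
                + shuffleMemory + managedMemory;
    }

    /** Total process memory: total Flink memory plus the JVM's own metaspace and overhead. */
    public static long totalProcessMemory(
            long totalFlinkMemory, long jvmMetaSpace, long jvmOverhead) {
        return totalFlinkMemory + jvmMetaSpace + jvmOverhead;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Made-up sizes, chosen only to keep the arithmetic easy to follow.
        long flink = totalFlinkMemory(
                128 * mib, 128 * mib, 384 * mib, 0L, 128 * mib, 512 * mib);
        long process = totalProcessMemory(flink, 96 * mib, 128 * mib);
        System.out.println("totalFlinkMemory   = " + flink / mib + " MiB");
        System.out.println("totalProcessMemory = " + process / mib + " MiB");
    }
}
{code}

A REST client could run the same sum over a returned TaskManagerResourceInfo to check that the reported components are consistent with totalProcessMemory.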

h4. Get TaskManager Resource Config from rest api
 * This is needed because the resource configuration of each TaskManager may be different.

 * Add TaskManagerResourceConfiguration in TaskManagerServicesConfiguration:

{code:java}
public class TaskManagerResourceConfiguration {
    private final long configuredMemory;

    private final MemoryType memoryType;

    private final boolean preAllocateMemory;

    private final float memoryFraction;

    private final int pageSize;
}{code}
 * In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration 
to taskManagerConfiguration.

 * Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST 
API can get it through ResourceManager.requestTaskManagerInfo.

h4. Add TaskManagerResourceInfo, which matches the memory composition of the TaskManager
h5. Data in JSON
{code:json}
{
  "cpuAllocated": -1,
  "cpuUsage": -1,
  "taskHeapAllocated": 966787072,
  "taskHeapUsed": 76071880,
  "heapManageMemoryMax": 0,
  "heapManageMemoryUsed": 0,
  "offHeapManageMemoryMax": 0,
  "offHeapManageMemoryUsed": 0,
  "networkMemoryMax": 107413504,
  "networkMemoryUsed": 0
}{code}
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();
    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;
    // Segment counts are converted to bytes by multiplying by the segment size.
    long networkMemoryAllocated = taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed = (taskManagerMetrics.getMemorySegmentsTotal()
            - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;
    long manageMemoryAllocated = taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed = (taskManagerMetrics.getManageMemorySegmentsTotal()
            - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    if (taskManagerResourceConfig.getManagedMemoryType().equalsIgnoreCase(MemoryType.HEAP.name())) {
        // On-heap managed memory is part of the JVM heap, so subtract it to get the task heap.
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }
    // The first two arguments (-1.0d) are placeholders for cpuAllocated and cpuUsage.
    return new TaskManagerResourceInfo(-1.0d, -1.0d, javaHeapAllocated, javaHeapUsed,
            heapManageMemoryAllocated, heapManageMemoryUsed,
            offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
            networkMemoryAllocated, networkMemoryUsed);
}{code}
 * cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec).

 * cpuUsage = (metric Status.JVM.CPU.Load [from|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()]) * cpuAllocated
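
The segment and CPU arithmetic can be tried in isolation. The sketch below is a minimal, self-contained illustration, not the proposed Flink code. It reads the notes above as cpuUsage being the process CPU load multiplied by cpuAllocated, which is an interpretation. The class ResourceUsageSketch and its methods are hypothetical. The inputs (3278 segments of 32768 bytes, 4 cores) are assumed values; the segment figures were picked because their product equals the networkMemoryMax value in the JSON sample. The load is read from com.sun.management.OperatingSystemMXBean, which not every JVM provides, and a negative load means "not available".

{code:java}
import java.lang.management.ManagementFactory;

// Hypothetical sketch, not Flink code.
public final class ResourceUsageSketch {

    /** Bytes reserved for a pool of equally sized memory segments. */
    public static long allocatedBytes(long segmentsTotal, long pageSize) {
        return segmentsTotal * pageSize;
    }

    /** Bytes currently in use: segments that are not available, times the segment size. */
    public static long usedBytes(long segmentsTotal, long segmentsAvailable, long pageSize) {
        return (segmentsTotal - segmentsAvailable) * pageSize;
    }

    /** CPU usage in cores; returns -1 when either input is unknown (negative). */
    public static double cpuUsage(double processCpuLoad, double cpuAllocated) {
        if (processCpuLoad < 0 || cpuAllocated < 0) {
            return -1.0d;
        }
        return processCpuLoad * cpuAllocated;
    }

    /** Process CPU load in [0, 1], or -1 if this JVM does not expose it. */
    public static double currentProcessCpuLoad() {
        java.lang.management.OperatingSystemMXBean os =
                ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.OperatingSystemMXBean) {
            return ((com.sun.management.OperatingSystemMXBean) os).getProcessCpuLoad();
        }
        return -1.0d;
    }

    public static void main(String[] args) {
        // Assumed values: 3278 segments of 32 KiB each.
        System.out.println("networkMemoryMax  = " + allocatedBytes(3278L, 32768L)); // 107413504
        System.out.println("networkMemoryUsed = " + usedBytes(3278L, 3278L, 32768L)); // 0
        // Assumed allocation of 4 cores; the load depends on the machine running this.
        System.out.println("cpuUsage          = " + cpuUsage(currentProcessCpuLoad(), 4.0d));
    }
}
{code}

Returning -1 for unknown values mirrors the placeholder used for cpuAllocated and cpuUsage in the JSON sample.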

  was:
h3. Motivation

The TaskManager memory information currently (Flink 1.9) exposed through the 
REST API has several shortcomings.
h4. (1) There is not enough information about managed memory.
 * First, the total and available number of managed memory segments should be 
exposed as metrics (FLINK-14406).
 * Second, its type and segment size are not exposed. Users need to know the 
managed memory in order to calculate the task heap (FLINK-14422).

h4. (2) The information from HardwareDescription and TaskManagerMetricsInfo is difficult to match to the memory composition of the TaskManager.

!image-2019-10-24-16-22-27-360.png!
 * From TaskManagerMetricsInfo's memorySegmentsTotal (the total number of 
network segments) alone, the user cannot derive the network memory size.

 * It is unclear what HardwareDescription.sizeOfJvmHeap means.

 * The user cannot get any information about the task heap.

 * From this information, the user cannot tell which TaskManager configuration 
options need to be changed.

h3. Proposed Changes
h4. Add managed memory metrics
 * Add registerMemoryManagerMetrics in MemoryManager:

{code:java}
public void registerMemoryManagerMetrics(MetricGroup metricGroup) {
    checkNotNull(metricGroup);
    checkNotNull(this.memoryPool);
    MetricGroup memoryManagerGroup = metricGroup.addGroup("MemoryManager");
    // Total number of managed memory segments in the pool.
    memoryManagerGroup.<Integer, Gauge<Integer>>gauge("TotalMemorySegments",
        this.memoryPool::getNumberOfTotalMemorySegments);
    // Number of managed memory segments that are currently free.
    memoryManagerGroup.<Integer, Gauge<Integer>>gauge("AvailableMemorySegments",
        this.memoryPool::getNumberOfAvailableMemorySegments);
}
{code}
 * Register it in TaskManagerServices.createMemoryManager. 

h4. Get TaskManager Resource Config from rest api
 * This is needed because the resource configuration of each TaskManager may be different.

 * Add TaskManagerResourceConfiguration in TaskManagerServicesConfiguration:

{code:java}
public class TaskManagerResourceConfiguration {
    private final long configuredMemory;

    private final MemoryType memoryType;

    private final boolean preAllocateMemory;

    private final float memoryFraction;

    private final int pageSize;
}{code}
 * In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration 
to taskManagerConfiguration.

 * Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST 
API can get it through ResourceManager.requestTaskManagerInfo.

h4. Add TaskManagerResourceInfo, which matches the memory composition of the TaskManager
h5. Data in JSON
{code:json}
{
  "cpuAllocated": -1,
  "cpuUsage": -1,
  "taskHeapAllocated": 966787072,
  "taskHeapUsed": 76071880,
  "heapManageMemoryMax": 0,
  "heapManageMemoryUsed": 0,
  "offHeapManageMemoryMax": 0,
  "offHeapManageMemoryUsed": 0,
  "networkMemoryMax": 107413504,
  "networkMemoryUsed": 0
}{code}
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();
    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;
    // Segment counts are converted to bytes by multiplying by the segment size.
    long networkMemoryAllocated = taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed = (taskManagerMetrics.getMemorySegmentsTotal()
            - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;
    long manageMemoryAllocated = taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed = (taskManagerMetrics.getManageMemorySegmentsTotal()
            - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    if (taskManagerResourceConfig.getManagedMemoryType().equalsIgnoreCase(MemoryType.HEAP.name())) {
        // On-heap managed memory is part of the JVM heap, so subtract it to get the task heap.
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }
    // The first two arguments (-1.0d) are placeholders for cpuAllocated and cpuUsage.
    return new TaskManagerResourceInfo(-1.0d, -1.0d, javaHeapAllocated, javaHeapUsed,
            heapManageMemoryAllocated, heapManageMemoryUsed,
            offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
            networkMemoryAllocated, networkMemoryUsed);
}{code}
 * cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec).

 * cpuUsage = (metric Status.JVM.CPU.Load [from|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()]) * cpuAllocated


> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14431
>                 URL: https://issues.apache.org/jira/browse/FLINK-14431
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
>            Reporter: lining
>            Priority: Major
>         Attachments: image-2019-10-17-17-58-50-342.png, 
> image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png, 
> image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png, 
> image-2019-10-24-16-22-27-360.png, image-2019-12-19-18-09-05-542.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
