[
https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lining updated FLINK-14431:
---------------------------
Description:
h3. Motivation
The TaskManager memory information currently shown in the REST API (Flink 1.10)
has several shortcomings.
h4. (1) The information from HardwareDescription is difficult to map onto the
memory composition of the TaskManager in FLIP-49, as the picture below shows:
!image-2019-12-19-18-09-05-542.png|width=444,height=389!
* The meaning of HardwareDescription.sizeOfJvmHeap is unclear.
* Users cannot get the resource configuration of a TaskManager.
h4. (2) There is no information about managed memory.
* There is no metric for managed memory.
h4. (3) There is no information about shuffle memory
* From TaskManagerMetricsInfo's memorySegmentsTotal (the shuffle segment
total) alone, users cannot determine the shuffle memory size.
h4. (4) The metrics on the TaskManager's metrics page do not correspond to the
resource configuration of the TaskManager
* It is difficult for users to adjust the TaskManager's resource configuration
based on metrics, because they cannot find the configuration items related to
those metrics.
h3. Proposed Changes
h4. Add TaskManagerResourceInfo, which matches the memory composition
* Take the information from TaskExecutorResourceSpec in FLIP-49 and add it to
TaskExecutorRegistration.
{code:java}
// Mirrors the memory components of TaskExecutorResourceSpec (FLIP-49).
// Constructor and getters are omitted.
public class TaskManagerResourceInfo {
    private final double cpuCores;
    private final long frameworkHeap;
    private final long frameworkOffHeap;
    private final long taskHeap;
    private final long taskOffHeap;
    private final long shuffleMemory;
    private final long managedMemory;
    private final long jvmMetaSpace;
    private final long jvmOverhead;
    private final long totalProcessMemory;
}
{code}
* URL: /taskmanagers/:taskmanagerid
* Response: add the following resource field
{code:java}
resource: {
cpuCores: 4,
frameworkHeap: 134217728,
frameworkOffHeap: 134217728,
taskHeap: 181193928,
taskOffHeap: 0,
shuffleMemory: 33554432,
managedMemory: 322122552,
jvmMetaSpace: 134217728,
jvmOverhead: 134217728,
totalProcessMemory: 1073741824
}
{code}
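The sample values above can be checked for internal consistency. The sketch below assumes, following the FLIP-49 memory model, that totalProcessMemory is the sum of the eight memory components in the response. The class and method names are hypothetical and not part of the proposal.

```java
// Hypothetical helper, for illustration only: sums the memory components
// of the sample response and compares the result with totalProcessMemory.
final class ResourceSumCheck {

    // All arguments are sizes in bytes.
    static long sumOfComponents(
            long frameworkHeap, long frameworkOffHeap,
            long taskHeap, long taskOffHeap,
            long shuffleMemory, long managedMemory,
            long jvmMetaSpace, long jvmOverhead) {
        return frameworkHeap + frameworkOffHeap
                + taskHeap + taskOffHeap
                + shuffleMemory + managedMemory
                + jvmMetaSpace + jvmOverhead;
    }

    public static void main(String[] args) {
        // Values copied from the sample response above.
        long sum = sumOfComponents(
                134217728L, 134217728L,
                181193928L, 0L,
                33554432L, 322122552L,
                134217728L, 134217728L);
        long totalProcessMemory = 1073741824L;
        System.out.println(sum == totalProcessMemory); // prints "true"
    }
}
```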
h4. Add shuffle memory metric
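* Add getTotalMemorySize and getAvaliableMemorySize to NetworkBufferPool:
{code:java}
// Total size of all memory segments in the pool.
public long getTotalMemorySize() {
    // The leading 1L forces the multiplication to be done in long arithmetic.
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

// Size of the memory segments that are currently available.
public long getAvaliableMemorySize() {
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}{code}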
* In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration
to taskManagerConfiguration.
* Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST
API can get it through ResourceManager.requestTaskManagerInfo.
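The leading 1L in getTotalMemorySize and getAvaliableMemorySize matters if the segment count and the segment size are both int values, which is an assumption here. A minimal sketch with a hypothetical class name and made-up sample numbers shows how the product wraps around without it:

```java
// Hypothetical illustration of why the proposed getters multiply by 1L first.
final class SegmentSizeMath {

    // Same shape as the proposed getters: long arithmetic throughout.
    static long totalBytes(int segments, int segmentSize) {
        return 1L * segments * segmentSize;
    }

    // Without the 1L, the product is computed as an int and only then
    // widened to long, so large pools overflow.
    static long totalBytesIntMath(int segments, int segmentSize) {
        return segments * segmentSize;
    }

    public static void main(String[] args) {
        int segments = 100_000;   // sample value, not from the issue
        int segmentSize = 32_768; // sample value, not from the issue
        System.out.println(totalBytes(segments, segmentSize));        // 3276800000
        System.out.println(totalBytesIntMath(segments, segmentSize)); // -1018167296
    }
}
```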
h4. Add TaskManagerResourceInfo, which matches the memory composition of the
TaskManager
h5. Data in JSON
{code:json}
{
"cpuAllocated": -1,
"cpuUsage": -1,
"taskHeapAllocated": 966787072,
"taskHeapUsed": 76071880,
"heapManageMemoryMax": 0,
"heapManageMemoryUsed": 0,
"offHeapManageMemoryMax": 0,
"offHeapManageMemoryUsed": 0,
"networkMemoryMax": 107413504,
"networkMemoryUsed": 0
}{code}
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();

    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;

    // Network memory: segment counts multiplied by the segment size.
    long networkMemoryAllocated =
            taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed =
            (taskManagerMetrics.getMemorySegmentsTotal()
                    - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;

    // Managed memory is computed the same way.
    long manageMemoryAllocated =
            taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed =
            (taskManagerMetrics.getManageMemorySegmentsTotal()
                    - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    if (taskManagerResourceConfig.getManagedMemoryType()
            .equalsIgnoreCase(MemoryType.HEAP.name())) {
        // On-heap managed memory is subtracted from the Java heap figures.
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }

    // cpuAllocated and cpuUsage are passed as -1 here.
    return new TaskManagerResourceInfo(
            -1.0d, -1.0d,
            javaHeapAllocated, javaHeapUsed,
            heapManageMemoryAllocated, heapManageMemoryUsed,
            offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
            networkMemoryAllocated, networkMemoryUsed);
}{code}
* cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec).
* cpuUsage = (metric Status.JVM.CPU.Load,
[from OperatingSystemMXBean.getProcessCpuLoad()|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()]) * cpuAllocated
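The cpuUsage formula can be sketched as follows. The process CPU load is read from com.sun.management.OperatingSystemMXBean.getProcessCpuLoad(), which returns a value between 0.0 and 1.0 or a negative value when the load is not available. The class name, the method names and the choice to return -1 when either input is unknown are assumptions for illustration; the -1 only mirrors the placeholder used in the JSON sample.

```java
import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;

// Hypothetical sketch of cpuUsage = Status.JVM.CPU.Load * cpuAllocated.
final class CpuUsageSketch {

    // Returns the number of cores in use, or -1 if either input is unknown.
    static double cpuUsage(double processCpuLoad, double cpuAllocated) {
        if (processCpuLoad < 0 || cpuAllocated < 0) {
            return -1.0d;
        }
        return processCpuLoad * cpuAllocated;
    }

    // Reads the JVM process CPU load; -1 if the platform bean is not the
    // com.sun.management variant.
    static double currentProcessCpuLoad() {
        java.lang.management.OperatingSystemMXBean os =
                ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof OperatingSystemMXBean) {
            return ((OperatingSystemMXBean) os).getProcessCpuLoad();
        }
        return -1.0d;
    }

    public static void main(String[] args) {
        double cpuAllocated = 4.0d; // sample value, matches cpuCores: 4 above
        System.out.println(cpuUsage(currentProcessCpuLoad(), cpuAllocated));
    }
}
```

With a load of 0.5 and 4 allocated cores, cpuUsage returns 2.0, that is, two cores' worth of CPU time.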
was:
h3. Motivation
The TaskManager memory information currently shown in the REST API (Flink 1.10)
has several shortcomings.
h4. (1) The information from HardwareDescription is difficult to map onto the
memory composition of the TaskManager in FLIP-49, as the picture below shows:
!image-2019-12-19-18-09-05-542.png|width=444,height=389!
* The meaning of HardwareDescription.sizeOfJvmHeap is unclear.
* Users cannot get the resource configuration of a TaskManager.
h4. (2) There is no information about managed memory.
* There is no metric for managed memory.
h4. (3) There is no information about shuffle memory
* From TaskManagerMetricsInfo's memorySegmentsTotal (the shuffle segment
total) alone, users cannot determine the shuffle memory size.
h4. (4) The metrics on the TaskManager's metrics page do not correspond to the
resource configuration of the TaskManager
* It is difficult for users to adjust the TaskManager's resource configuration
based on metrics, because they cannot find the configuration items related to
those metrics.
h3. Proposed Changes
h4. Add TaskManagerResourceInfo, which matches the memory composition
* Take the information from TaskExecutorResourceSpec in FLIP-49 and add it to
TaskExecutorRegistration.
{code:java}
// Mirrors the memory components of TaskExecutorResourceSpec (FLIP-49).
// Constructor and getters are omitted.
public class TaskManagerResourceInfo {
    private final double cpuCores;
    private final long frameworkHeap;
    private final long frameworkOffHeap;
    private final long taskHeap;
    private final long taskOffHeap;
    private final long shuffleMemory;
    private final long managedMemory;
    private final long jvmMetaSpace;
    private final long jvmOverhead;
    private final long totalProcessMemory;
}
{code}
* Register it in TaskManagerServices.createMemoryManager.
h4. Get the TaskManager resource configuration from the REST API
* The resource configuration of each TaskManager may be different.
* Add TaskManagerResourceConfiguration to TaskManagerServicesConfiguration:
{code:java}
// Constructor and getters are omitted.
public class TaskManagerResourceConfiguration {
    private final long configuredMemory;
    private final MemoryType memoryType;
    private final boolean preAllocateMemory;
    private final float memoryFraction;
    private final int pageSize;
}{code}
* In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration
to taskManagerConfiguration.
* Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST
API can get it through ResourceManager.requestTaskManagerInfo.
h4. Add TaskManagerResourceInfo, which matches the memory composition of the
TaskManager
h5. Data in JSON
{code:json}
{
"cpuAllocated": -1,
"cpuUsage": -1,
"taskHeapAllocated": 966787072,
"taskHeapUsed": 76071880,
"heapManageMemoryMax": 0,
"heapManageMemoryUsed": 0,
"offHeapManageMemoryMax": 0,
"offHeapManageMemoryUsed": 0,
"networkMemoryMax": 107413504,
"networkMemoryUsed": 0
}{code}
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();

    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;

    // Network memory: segment counts multiplied by the segment size.
    long networkMemoryAllocated =
            taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed =
            (taskManagerMetrics.getMemorySegmentsTotal()
                    - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;

    // Managed memory is computed the same way.
    long manageMemoryAllocated =
            taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed =
            (taskManagerMetrics.getManageMemorySegmentsTotal()
                    - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    if (taskManagerResourceConfig.getManagedMemoryType()
            .equalsIgnoreCase(MemoryType.HEAP.name())) {
        // On-heap managed memory is subtracted from the Java heap figures.
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }

    // cpuAllocated and cpuUsage are passed as -1 here.
    return new TaskManagerResourceInfo(
            -1.0d, -1.0d,
            javaHeapAllocated, javaHeapUsed,
            heapManageMemoryAllocated, heapManageMemoryUsed,
            offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
            networkMemoryAllocated, networkMemoryUsed);
}{code}
* cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec).
* cpuUsage = (metric Status.JVM.CPU.Load,
[from OperatingSystemMXBean.getProcessCpuLoad()|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()]) * cpuAllocated
> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
> Key: FLINK-14431
> URL: https://issues.apache.org/jira/browse/FLINK-14431
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
> Reporter: lining
> Priority: Major
> Attachments: image-2019-10-17-17-58-50-342.png,
> image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png,
> image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png,
> image-2019-10-24-16-22-27-360.png, image-2019-12-19-18-09-05-542.png
>