azagrebin commented on a change in pull request #13316:
URL: https://github.com/apache/flink/pull/13316#discussion_r491842527



##########
File path: flink-runtime-web/src/test/resources/rest_api_v1.snapshot
##########
@@ -3082,4 +3100,4 @@
       }
     }
   } ]
-}
+}

Review comment:
       accidental change?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
##########
@@ -106,36 +118,97 @@
        @JsonProperty(FIELD_NAME_MAPPED_MAX)
        private final long mappedMax;
 
-       // --------- Network buffer pool -------------
+       // --------- Shuffle Netty buffer pool -------------
 
-       @JsonProperty(FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE)
-       private final long memorySegmentsAvailable;
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_SEGMENTS_AVAILABLE)
+       private final long shuffleMemorySegmentsAvailable;
 
-       @JsonProperty(FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL)
-       private final long memorySegmentsTotal;
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_SEGMENTS_USED)
+       private final long shuffleMemorySegmentsUsed;
+
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_SEGMENTS_TOTAL)
+       private final long shuffleMemorySegmentsTotal;
+
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_AVAILABLE)
+       private final long shuffleMemoryAvailable;
+
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_USED)
+       private final long shuffleMemoryUsed;
+
+       @JsonProperty(FIELD_NAME_SHUFFLE_MEMORY_TOTAL)
+       private final long shuffleMemoryTotal;
 
        // --------- Garbage collectors -------------
 
        @JsonProperty(FIELD_NAME_GARBAGE_COLLECTORS)
        private final List<GarbageCollectorInfo> garbageCollectorsInfo;
 
+       public TaskManagerMetricsInfo(
+               long heapUsed,

Review comment:
       the formatting is off for changed method signatures in this file
   there should be one more indent for arguments (and/or new line after 
arguments)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMetricsInfo.java
##########
@@ -58,7 +58,19 @@
 
        public static final String FIELD_NAME_NETWORK_MEMORY_SEGMENTS_AVAILABLE 
= "memorySegmentsAvailable";
 
-       public static final String FIELD_NAME_NETWORK_MEMROY_SEGMENTS_TOTAL = 
"memorySegmentsTotal";
+       public static final String FIELD_NAME_NETWORK_MEMORY_SEGMENTS_TOTAL = 
"memorySegmentsTotal";
+
+       public static final String FIELD_NAME_SHUFFLE_MEMORY_SEGMENTS_AVAILABLE 
= "shuffleNettyMemorySegmentsAvailable";

Review comment:
       I think english-wise `nettyShuffle*` sounds better

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
##########
@@ -109,6 +115,70 @@ public void testCreatePoolAfterDestroy() {
 
        }
 
+       @Test
+       public void testMemoryUsageInTheContextOfMemorySegmentAllocation() {
+               final int bufferSize = 128;
+               final int numBuffers = 10;
+
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, bufferSize);
+
+               assertThat(globalPool.getTotalNumberOfMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfAvailableMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfUsedMemorySegments(), is(0));
+
+               assertThat(globalPool.getTotalMemory(), is((long) numBuffers * 
bufferSize));
+               assertThat(globalPool.getAvailableMemory(), is((long) 
numBuffers * bufferSize));
+               assertThat(globalPool.getUsedMemory(), is(0L));
+
+               assertThat(globalPool.getNumberOfRegisteredBufferPools(), 
is(0));

Review comment:
       is this not already tested in `testCreatePoolAfterDestroy`?
   or maybe it is better to create a separate test case for this: 
`testMemoryUsageInTheContextOfMemoryPoolCreation`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
##########
@@ -117,7 +118,8 @@ public TaskManagerDetailsHandler(
                        );
        }
 
-       private static TaskManagerMetricsInfo 
createTaskManagerMetricsInfo(MetricStore.TaskManagerMetricStore tmMetrics) {
+       @VisibleForTesting
+       static TaskManagerMetricsInfo 
createTaskManagerMetricsInfo(MetricStore.TaskManagerMetricStore tmMetrics) {

Review comment:
       I would avoid `VisibleForTesting` shortcuts
   and test directly `TaskManagerDetailsHandler::handleRequest`
   like in `TaskManagerLogListHandlerTest`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
##########
@@ -320,7 +320,8 @@ private TaskManagerMetricStore() {
                        this(new ConcurrentHashMap<>(), 
ConcurrentHashMap.newKeySet());
                }
 
-               private TaskManagerMetricStore(Map<String, String> metrics, 
Set<String> garbageCollectorNames) {
+               @VisibleForTesting
+               protected TaskManagerMetricStore(Map<String, String> metrics, 
Set<String> garbageCollectorNames) {

Review comment:
       I would avoid `VisibleForTesting` shortcuts.
   we can use available `MetricStore::add` instead like in `MetricStoreTest`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
##########
@@ -109,6 +115,70 @@ public void testCreatePoolAfterDestroy() {
 
        }
 
+       @Test
+       public void testMemoryUsageInTheContextOfMemorySegmentAllocation() {
+               final int bufferSize = 128;
+               final int numBuffers = 10;
+
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, bufferSize);
+
+               assertThat(globalPool.getTotalNumberOfMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfAvailableMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfUsedMemorySegments(), is(0));
+
+               assertThat(globalPool.getTotalMemory(), is((long) numBuffers * 
bufferSize));
+               assertThat(globalPool.getAvailableMemory(), is((long) 
numBuffers * bufferSize));
+               assertThat(globalPool.getUsedMemory(), is(0L));
+
+               assertThat(globalPool.getNumberOfRegisteredBufferPools(), 
is(0));
+
+               MemorySegment segment = globalPool.requestMemorySegment();
+               assertThat(segment, is(notNullValue()));
+
+               assertThat(globalPool.getTotalNumberOfMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfAvailableMemorySegments(), 
is(numBuffers - 1));
+               assertThat(globalPool.getNumberOfUsedMemorySegments(), is(1));
+
+               assertThat(globalPool.getTotalMemory(), is((long) numBuffers * 
bufferSize));
+               assertThat(globalPool.getAvailableMemory(), is((long) 
(numBuffers - 1) * bufferSize));
+               assertThat(globalPool.getUsedMemory(), is((long) bufferSize));
+
+               assertThat(globalPool.getNumberOfRegisteredBufferPools(), 
is(0));
+       }
+
+       @Test
+       public void testMemoryUsageInTheContextOfMemoryPoolDestruction() {
+               final int bufferSize = 128;
+               final int numBuffers = 10;
+
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, bufferSize);
+
+               MemorySegment segment = globalPool.requestMemorySegment();
+               assertThat(segment, is(notNullValue()));
+
+               assertThat(globalPool.getTotalNumberOfMemorySegments(), 
is(numBuffers));
+               assertThat(globalPool.getNumberOfAvailableMemorySegments(), 
is(numBuffers - 1));
+               assertThat(globalPool.getNumberOfUsedMemorySegments(), is(1));
+
+               assertThat(globalPool.getTotalMemory(), is((long) numBuffers * 
bufferSize));
+               assertThat(globalPool.getAvailableMemory(), is((long) 
(numBuffers - 1) * bufferSize));
+               assertThat(globalPool.getUsedMemory(), is((long) bufferSize));
+
+               assertThat(globalPool.getNumberOfRegisteredBufferPools(), 
is(0));

Review comment:
       is this not already tested in 
`testMemoryUsageInTheContextOfMemorySegmentAllocation`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to