This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 56019c714 [CELEBORN-1804] Shuffle environment metrics of 
RemoteShuffleEnvironment should use Shuffle.Remote metric group
56019c714 is described below

commit 56019c714a1b7fe10097e6512961db2b8e33f4a8
Author: SteNicholas <[email protected]>
AuthorDate: Tue Dec 31 14:39:43 2024 +0800

    [CELEBORN-1804] Shuffle environment metrics of RemoteShuffleEnvironment 
should use Shuffle.Remote metric group
    
    ### What changes were proposed in this pull request?
    
    Shuffle environment metrics of `RemoteShuffleEnvironment` should use 
`Shuffle.Remote` metric group.
    
    ### Why are the changes needed?
    
    Shuffle environment metrics of `RemoteShuffleEnvironment` currently use the 
incorrect Netty metric group, `Shuffle.Netty`. `RemoteShuffleEnvironment` should 
instead register its shuffle environment metrics under the remote metric group, 
`Shuffle.Remote`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3032 from SteNicholas/CELEBORN-1804.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Weijie Guo <[email protected]>
---
 .../flink/AbstractRemoteShuffleEnvironment.java    |  7 +-
 .../flink/AbstractRemoteShuffleServiceFactory.java |  2 +-
 .../flink/metric/RemoteShuffleMetricFactory.java   | 76 ++++++++++++++++++++++
 .../flink/metric/RequestedMemoryUsageMetric.java   | 45 +++++++++++++
 4 files changed, 126 insertions(+), 4 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
index bf344c335..ed665c27c 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.celeborn.plugin.flink;
 
+import static 
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
 import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
 import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;
@@ -119,13 +120,13 @@ public abstract class AbstractRemoteShuffleEnvironment {
 
   public ShuffleIOOwnerContext createShuffleIOOwnerContext(
       String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup 
parentGroup) {
-    MetricGroup nettyGroup = 
createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
+    MetricGroup remoteGroup = 
createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
     return new ShuffleIOOwnerContext(
         checkNotNull(ownerName),
         checkNotNull(executionAttemptID),
         parentGroup,
-        nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
-        nettyGroup.addGroup(METRIC_GROUP_INPUT));
+        remoteGroup.addGroup(METRIC_GROUP_OUTPUT),
+        remoteGroup.addGroup(METRIC_GROUP_INPUT));
   }
 
   public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
index de1d73200..8927de331 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.celeborn.plugin.flink;
 
-import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
+import static 
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics;
 
 import java.time.Duration;
 
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java
new file mode 100644
index 000000000..f784a201f
--- /dev/null
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+/** Factory for remote shuffle service metrics. */
+public class RemoteShuffleMetricFactory {
+
+  // shuffle environment level metrics: Shuffle.Remote.*
+
+  private static final String METRIC_TOTAL_MEMORY_SEGMENT = 
"TotalMemorySegments";
+  private static final String METRIC_TOTAL_MEMORY = "TotalMemory";
+
+  private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = 
"AvailableMemorySegments";
+  private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory";
+
+  private static final String METRIC_USED_MEMORY_SEGMENT = 
"UsedMemorySegments";
+  private static final String METRIC_USED_MEMORY = "UsedMemory";
+
+  public static final String METRIC_REQUESTED_MEMORY_USAGE = 
"RequestedMemoryUsage";
+
+  // task level metric group structure: Shuffle.Remote.<Input|Output>.Buffers
+
+  public static final String METRIC_GROUP_SHUFFLE = "Shuffle";
+  public static final String METRIC_GROUP_REMOTE = "Remote";
+
+  private RemoteShuffleMetricFactory() {}
+
+  public static void registerShuffleMetrics(
+      MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
+    checkNotNull(metricGroup);
+    checkNotNull(networkBufferPool);
+    internalRegisterShuffleMetrics(metricGroup, networkBufferPool);
+  }
+
+  private static void internalRegisterShuffleMetrics(
+      MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
+    MetricGroup networkGroup =
+        
metricGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
+    networkGroup.gauge(
+        METRIC_TOTAL_MEMORY_SEGMENT, 
networkBufferPool::getTotalNumberOfMemorySegments);
+    networkGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory);
+    networkGroup.gauge(
+        METRIC_AVAILABLE_MEMORY_SEGMENT, 
networkBufferPool::getNumberOfAvailableMemorySegments);
+    networkGroup.gauge(METRIC_AVAILABLE_MEMORY, 
networkBufferPool::getAvailableMemory);
+    networkGroup.gauge(
+        METRIC_USED_MEMORY_SEGMENT, 
networkBufferPool::getNumberOfUsedMemorySegments);
+    networkGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory);
+    networkGroup.gauge(
+        METRIC_REQUESTED_MEMORY_USAGE, new 
RequestedMemoryUsageMetric(networkBufferPool));
+  }
+
+  public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup 
parentGroup) {
+    return 
parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
+  }
+}
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java
new file mode 100644
index 000000000..0d3d6e8a6
--- /dev/null
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.View;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+/**
+ * This is a small hack. Instead of spawning a custom thread to monitor {@link 
NetworkBufferPool}
+ * usage, we are re-using {@link View#update()} method for this purpose.
+ */
+public class RequestedMemoryUsageMetric implements Gauge<Integer>, View {
+
+  private final NetworkBufferPool networkBufferPool;
+
+  public RequestedMemoryUsageMetric(NetworkBufferPool networkBufferPool) {
+    this.networkBufferPool = networkBufferPool;
+  }
+
+  @Override
+  public Integer getValue() {
+    return networkBufferPool.getEstimatedRequestedSegmentsUsage();
+  }
+
+  @Override
+  public void update() {
+    networkBufferPool.maybeLogUsageWarning();
+  }
+}

Reply via email to