This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 56019c714 [CELEBORN-1804] Shuffle environment metrics of
RemoteShuffleEnvironment should use Shuffle.Remote metric group
56019c714 is described below
commit 56019c714a1b7fe10097e6512961db2b8e33f4a8
Author: SteNicholas <[email protected]>
AuthorDate: Tue Dec 31 14:39:43 2024 +0800
[CELEBORN-1804] Shuffle environment metrics of RemoteShuffleEnvironment
should use Shuffle.Remote metric group
### What changes were proposed in this pull request?
The shuffle environment metrics of `RemoteShuffleEnvironment` should use the
`Shuffle.Remote` metric group.
### Why are the changes needed?
The shuffle environment metrics of `RemoteShuffleEnvironment` are registered
under the Netty metric group `Shuffle.Netty`, which is incorrect for a remote
shuffle environment. `RemoteShuffleEnvironment` should instead register its
shuffle environment metrics under a remote metric group, `Shuffle.Remote`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3032 from SteNicholas/CELEBORN-1804.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Weijie Guo <[email protected]>
---
.../flink/AbstractRemoteShuffleEnvironment.java | 7 +-
.../flink/AbstractRemoteShuffleServiceFactory.java | 2 +-
.../flink/metric/RemoteShuffleMetricFactory.java | 76 ++++++++++++++++++++++
.../flink/metric/RequestedMemoryUsageMetric.java | 45 +++++++++++++
4 files changed, 126 insertions(+), 4 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
index bf344c335..ed665c27c 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.celeborn.plugin.flink;
+import static
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import static
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;
@@ -119,13 +120,13 @@ public abstract class AbstractRemoteShuffleEnvironment {
public ShuffleIOOwnerContext createShuffleIOOwnerContext(
String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup
parentGroup) {
+ // Builds the owner context; its output and input metric groups are nested
+ // under the group returned by createShuffleIOOwnerMetricGroup(parentGroup).
- MetricGroup nettyGroup =
createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
+ // Resolves to RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup: its
+ // single-static-import shadows the same-named method brought in by the
+ // NettyShuffleMetricFactory.* on-demand import, so the owner metrics land under
+ // <parentGroup>.Shuffle.Remote instead of the Netty shuffle group.
+ MetricGroup remoteGroup =
createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
return new ShuffleIOOwnerContext(
checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
- nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
- nettyGroup.addGroup(METRIC_GROUP_INPUT));
+ remoteGroup.addGroup(METRIC_GROUP_OUTPUT),
+ remoteGroup.addGroup(METRIC_GROUP_INPUT));
}
public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
index de1d73200..8927de331 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java
@@ -18,7 +18,7 @@
package org.apache.celeborn.plugin.flink;
-import static
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
+import static
org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics;
import java.time.Duration;
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java
new file mode 100644
index 000000000..f784a201f
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+/**
+ * Factory for remote shuffle service metrics.
+ *
+ * <p>Both the shuffle environment gauges ({@link #registerShuffleMetrics}) and the per-owner IO
+ * metric group ({@link #createShuffleIOOwnerMetricGroup}) are rooted at {@code Shuffle.Remote}.
+ */
+public class RemoteShuffleMetricFactory {
+
+ // Shuffle environment level metrics, registered by registerShuffleMetrics as Shuffle.Remote.*
+
+ private static final String METRIC_TOTAL_MEMORY_SEGMENT =
"TotalMemorySegments";
+ private static final String METRIC_TOTAL_MEMORY = "TotalMemory";
+
+ private static final String METRIC_AVAILABLE_MEMORY_SEGMENT =
"AvailableMemorySegments";
+ private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory";
+
+ private static final String METRIC_USED_MEMORY_SEGMENT =
"UsedMemorySegments";
+ private static final String METRIC_USED_MEMORY = "UsedMemory";
+
+ public static final String METRIC_REQUESTED_MEMORY_USAGE =
"RequestedMemoryUsage";
+
+ // Task level metric group structure: Shuffle.Remote.<Input|Output>.Buffers
+ // createShuffleIOOwnerMetricGroup only creates the Shuffle.Remote prefix; the
+ // Input/Output subgroups are added by its callers.
+
+ public static final String METRIC_GROUP_SHUFFLE = "Shuffle";
+ public static final String METRIC_GROUP_REMOTE = "Remote";
+
+ // Static utility class; not instantiable.
+ private RemoteShuffleMetricFactory() {}
+
+ /**
+ * Registers the shuffle environment gauges backed by {@code networkBufferPool} under the
+ * {@code Shuffle.Remote} child group of {@code metricGroup}.
+ *
+ * @param metricGroup parent metric group; must not be null
+ * @param networkBufferPool buffer pool whose memory statistics are exposed; must not be null
+ * @throws NullPointerException if either argument is null
+ */
+ public static void registerShuffleMetrics(
+ MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
+ checkNotNull(metricGroup);
+ checkNotNull(networkBufferPool);
+ internalRegisterShuffleMetrics(metricGroup, networkBufferPool);
+ }
+
+ // Registers the TotalMemory*, AvailableMemory*, UsedMemory* and RequestedMemoryUsage gauges
+ // on metricGroup.Shuffle.Remote; arguments are assumed to be already null-checked.
+ private static void internalRegisterShuffleMetrics(
+ MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
+ // NOTE(review): despite its name (carried over from NettyShuffleMetricFactory), this is the
+ // Shuffle.Remote group; the path built here duplicates createShuffleIOOwnerMetricGroup.
+ MetricGroup networkGroup =
+
metricGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
+ networkGroup.gauge(
+ METRIC_TOTAL_MEMORY_SEGMENT,
networkBufferPool::getTotalNumberOfMemorySegments);
+ networkGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory);
+ networkGroup.gauge(
+ METRIC_AVAILABLE_MEMORY_SEGMENT,
networkBufferPool::getNumberOfAvailableMemorySegments);
+ networkGroup.gauge(METRIC_AVAILABLE_MEMORY,
networkBufferPool::getAvailableMemory);
+ networkGroup.gauge(
+ METRIC_USED_MEMORY_SEGMENT,
networkBufferPool::getNumberOfUsedMemorySegments);
+ networkGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory);
+ // RequestedMemoryUsageMetric also implements View; its update() delegates to
+ // NetworkBufferPool#maybeLogUsageWarning().
+ networkGroup.gauge(
+ METRIC_REQUESTED_MEMORY_USAGE, new
RequestedMemoryUsageMetric(networkBufferPool));
+ }
+
+ /**
+ * Returns the {@code Shuffle.Remote} child group of {@code parentGroup}, under which a shuffle
+ * IO owner's input and output metric groups are created.
+ *
+ * @param parentGroup parent metric group of the IO owner (not null-checked here)
+ * @return the {@code Shuffle.Remote} child group of {@code parentGroup}
+ */
+ public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup
parentGroup) {
+ return
parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE);
+ }
+}
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java
new file mode 100644
index 000000000..0d3d6e8a6
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RequestedMemoryUsageMetric.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.metric;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.View;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+
+/**
+ * Gauge whose value is {@link NetworkBufferPool#getEstimatedRequestedSegmentsUsage()}.
+ *
+ * <p>This is a small hack. Instead of spawning a custom thread to monitor {@link
+ * NetworkBufferPool} usage, we are re-using the {@link View#update()} method for this purpose:
+ * every {@code update()} call delegates to {@link NetworkBufferPool#maybeLogUsageWarning()}.
+ */
+public class RequestedMemoryUsageMetric implements Gauge<Integer>, View {
+
+ // Pool queried on every getValue() and update() call.
+ private final NetworkBufferPool networkBufferPool;
+
+ /**
+ * Creates a gauge backed by the given pool.
+ *
+ * @param networkBufferPool pool to report on; not null-checked here, so callers are expected
+ * to pass a non-null pool
+ */
+ public RequestedMemoryUsageMetric(NetworkBufferPool networkBufferPool) {
+ this.networkBufferPool = networkBufferPool;
+ }
+
+ /** Returns the pool's current estimated requested-segments usage. */
+ @Override
+ public Integer getValue() {
+ return networkBufferPool.getEstimatedRequestedSegmentsUsage();
+ }
+
+ /** Reuses the View update hook to call {@link NetworkBufferPool#maybeLogUsageWarning()}. */
+ @Override
+ public void update() {
+ networkBufferPool.maybeLogUsageWarning();
+ }
+}