This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 64dbcec69b8 Revert "[FLINK-27420] Recreate metric groups for each new RM to avoid metric loss"
64dbcec69b8 is described below

commit 64dbcec69b8f8859ca57e9615c5d7283f4b58e16
Author: Yun Gao <[email protected]>
AuthorDate: Tue May 3 13:05:03 2022 +0800

    Revert "[FLINK-27420] Recreate metric groups for each new RM to avoid metric loss"
    
    This reverts commit e0c82d6d52871dbbea70c9b41384d2d33179bec0.
---
 .../resourcemanager/ResourceManagerFactory.java    | 15 +++--
 .../ResourceManagerProcessContext.java             | 23 +++----
 .../ResourceManagerServiceImplTest.java            | 70 ----------------------
 3 files changed, 21 insertions(+), 87 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index e661816d8ce..893ad9b33e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -62,6 +62,11 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
             Executor ioExecutor)
             throws ConfigurationException {
 
+        final ResourceManagerMetricGroup resourceManagerMetricGroup =
+                ResourceManagerMetricGroup.create(metricRegistry, hostname);
+        final SlotManagerMetricGroup slotManagerMetricGroup =
+                SlotManagerMetricGroup.create(metricRegistry, hostname);
+
         final Configuration runtimeServicesAndRmConfig =
                 getEffectiveConfigurationForResourceManagerAndRuntimeServices(configuration);
 
@@ -81,8 +86,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                 fatalErrorHandler,
                 clusterInformation,
                 webInterfaceUrl,
-                metricRegistry,
-                hostname,
+                resourceManagerMetricGroup,
+                slotManagerMetricGroup,
                 ioExecutor);
     }
 
@@ -94,8 +99,7 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                         context.getRmRuntimeServicesConfig(),
                         context.getRpcService(),
                         context.getHighAvailabilityServices(),
-                        SlotManagerMetricGroup.create(
-                                context.getMetricRegistry(), context.getHostname()));
+                        context.getSlotManagerMetricGroup());
 
         return createResourceManager(
                 context.getRmConfig(),
@@ -106,8 +110,7 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
                 context.getFatalErrorHandler(),
                 context.getClusterInformation(),
                 context.getWebInterfaceUrl(),
-                ResourceManagerMetricGroup.create(
-                        context.getMetricRegistry(), context.getHostname()),
+                context.getResourceManagerMetricGroup(),
                 resourceManagerRuntimeServices,
                 context.getIoExecutor());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
index e1e751fda1b..994947de407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
@@ -23,7 +23,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -49,8 +50,8 @@ public class ResourceManagerProcessContext {
     private final FatalErrorHandler fatalErrorHandler;
     private final ClusterInformation clusterInformation;
     @Nullable private final String webInterfaceUrl;
-    private final MetricRegistry metricRegistry;
-    private final String hostname;
+    private final ResourceManagerMetricGroup resourceManagerMetricGroup;
+    private final SlotManagerMetricGroup slotManagerMetricGroup;
     private final Executor ioExecutor;
 
     public ResourceManagerProcessContext(
@@ -63,8 +64,8 @@ public class ResourceManagerProcessContext {
             FatalErrorHandler fatalErrorHandler,
             ClusterInformation clusterInformation,
             @Nullable String webInterfaceUrl,
-            MetricRegistry metricRegistry,
-            String hostname,
+            ResourceManagerMetricGroup resourceManagerMetricGroup,
+            SlotManagerMetricGroup slotManagerMetricGroup,
             Executor ioExecutor) {
         this.rmConfig = checkNotNull(rmConfig);
         this.resourceId = checkNotNull(resourceId);
@@ -74,8 +75,8 @@ public class ResourceManagerProcessContext {
         this.heartbeatServices = checkNotNull(heartbeatServices);
         this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
         this.clusterInformation = checkNotNull(clusterInformation);
-        this.metricRegistry = checkNotNull(metricRegistry);
-        this.hostname = checkNotNull(hostname);
+        this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);
+        this.slotManagerMetricGroup = checkNotNull(slotManagerMetricGroup);
         this.ioExecutor = checkNotNull(ioExecutor);
 
         this.webInterfaceUrl = webInterfaceUrl;
@@ -118,12 +119,12 @@ public class ResourceManagerProcessContext {
         return webInterfaceUrl;
     }
 
-    public MetricRegistry getMetricRegistry() {
-        return metricRegistry;
+    public ResourceManagerMetricGroup getResourceManagerMetricGroup() {
+        return resourceManagerMetricGroup;
     }
 
-    public String getHostname() {
-        return hostname;
+    public SlotManagerMetricGroup getSlotManagerMetricGroup() {
+        return slotManagerMetricGroup;
     }
 
     public Executor getIoExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 1f5bef00718..3d90b203296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -35,20 +34,14 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 
-import org.assertj.core.util.Sets;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeoutException;
 
@@ -499,62 +492,6 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
 
-    @Test
-    public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
-        final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
-        TestingMetricRegistry metricRegistry =
-                TestingMetricRegistry.builder()
-                        .setRegisterConsumer((a, b, c) -> registeredMetrics.add(b))
-                        .setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b))
-                        .build();
-
-        final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();
-        resourceManagerService =
-                ResourceManagerServiceImpl.create(
-                        rmFactory,
-                        new Configuration(),
-                        ResourceID.generate(),
-                        rpcService,
-                        haService,
-                        heartbeatServices,
-                        delegationTokenManager,
-                        fatalErrorHandler,
-                        clusterInformation,
-                        null,
-                        metricRegistry,
-                        "localhost",
-                        ForkJoinPool.commonPool());
-        resourceManagerService.start();
-
-        Assert.assertEquals(0, registeredMetrics.size());
-        // grant leadership
-        leaderElectionService.isLeader(UUID.randomUUID());
-
-        assertRmStarted();
-        Set<String> expectedMetrics =
-                Sets.set(
-                        MetricNames.NUM_REGISTERED_TASK_MANAGERS,
-                        MetricNames.TASK_SLOTS_TOTAL,
-                        MetricNames.TASK_SLOTS_AVAILABLE);
-        Assert.assertTrue(
-                "Expected RM to register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
-
-        // revoke leadership, block until old rm is terminated
-        revokeLeadership();
-
-        Set<String> intersection = new HashSet<>(registeredMetrics);
-        intersection.retainAll(expectedMetrics);
-        Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
-
-        leaderElectionService.isLeader(UUID.randomUUID());
-
-        assertRmStarted();
-        Assert.assertTrue(
-                "Expected RM to re-register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
-    }
-
     private static void blockOnFuture(CompletableFuture<?> future) {
         try {
             future.get();
@@ -576,11 +513,4 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     private void assertRmStarted() throws Exception {
         leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
     }
-
-    private void revokeLeadership() {
-        ResourceManager<?> leaderResourceManager =
-                resourceManagerService.getLeaderResourceManager();
-        leaderElectionService.notLeader();
-        blockOnFuture(leaderResourceManager.getTerminationFuture());
-    }
 }

Reply via email to