This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 64dbcec69b8 Revert "[FLINK-27420] Recreate metric groups for each new RM to avoid metric loss"
64dbcec69b8 is described below
commit 64dbcec69b8f8859ca57e9615c5d7283f4b58e16
Author: Yun Gao <[email protected]>
AuthorDate: Tue May 3 13:05:03 2022 +0800
Revert "[FLINK-27420] Recreate metric groups for each new RM to avoid metric loss"
This reverts commit e0c82d6d52871dbbea70c9b41384d2d33179bec0.
---
.../resourcemanager/ResourceManagerFactory.java | 15 +++--
.../ResourceManagerProcessContext.java | 23 +++----
.../ResourceManagerServiceImplTest.java | 70 ----------------------
3 files changed, 21 insertions(+), 87 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index e661816d8ce..893ad9b33e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -62,6 +62,11 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
Executor ioExecutor)
throws ConfigurationException {
+ final ResourceManagerMetricGroup resourceManagerMetricGroup =
+ ResourceManagerMetricGroup.create(metricRegistry, hostname);
+ final SlotManagerMetricGroup slotManagerMetricGroup =
+ SlotManagerMetricGroup.create(metricRegistry, hostname);
+
final Configuration runtimeServicesAndRmConfig =
getEffectiveConfigurationForResourceManagerAndRuntimeServices(configuration);
@@ -81,8 +86,8 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
- metricRegistry,
- hostname,
+ resourceManagerMetricGroup,
+ slotManagerMetricGroup,
ioExecutor);
}
@@ -94,8 +99,7 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
context.getRmRuntimeServicesConfig(),
context.getRpcService(),
context.getHighAvailabilityServices(),
- SlotManagerMetricGroup.create(
- context.getMetricRegistry(), context.getHostname()));
+ context.getSlotManagerMetricGroup());
return createResourceManager(
context.getRmConfig(),
@@ -106,8 +110,7 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {
context.getFatalErrorHandler(),
context.getClusterInformation(),
context.getWebInterfaceUrl(),
- ResourceManagerMetricGroup.create(
- context.getMetricRegistry(), context.getHostname()),
+ context.getResourceManagerMetricGroup(),
resourceManagerRuntimeServices,
context.getIoExecutor());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
index e1e751fda1b..994947de407 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerProcessContext.java
@@ -23,7 +23,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -49,8 +50,8 @@ public class ResourceManagerProcessContext {
private final FatalErrorHandler fatalErrorHandler;
private final ClusterInformation clusterInformation;
@Nullable private final String webInterfaceUrl;
- private final MetricRegistry metricRegistry;
- private final String hostname;
+ private final ResourceManagerMetricGroup resourceManagerMetricGroup;
+ private final SlotManagerMetricGroup slotManagerMetricGroup;
private final Executor ioExecutor;
public ResourceManagerProcessContext(
@@ -63,8 +64,8 @@ public class ResourceManagerProcessContext {
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
- MetricRegistry metricRegistry,
- String hostname,
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ SlotManagerMetricGroup slotManagerMetricGroup,
Executor ioExecutor) {
this.rmConfig = checkNotNull(rmConfig);
this.resourceId = checkNotNull(resourceId);
@@ -74,8 +75,8 @@ public class ResourceManagerProcessContext {
this.heartbeatServices = checkNotNull(heartbeatServices);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.clusterInformation = checkNotNull(clusterInformation);
- this.metricRegistry = checkNotNull(metricRegistry);
- this.hostname = checkNotNull(hostname);
+ this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);
+ this.slotManagerMetricGroup = checkNotNull(slotManagerMetricGroup);
this.ioExecutor = checkNotNull(ioExecutor);
this.webInterfaceUrl = webInterfaceUrl;
@@ -118,12 +119,12 @@ public class ResourceManagerProcessContext {
return webInterfaceUrl;
}
- public MetricRegistry getMetricRegistry() {
- return metricRegistry;
+ public ResourceManagerMetricGroup getResourceManagerMetricGroup() {
+ return resourceManagerMetricGroup;
}
- public String getHostname() {
- return hostname;
+ public SlotManagerMetricGroup getSlotManagerMetricGroup() {
+ return slotManagerMetricGroup;
}
public Executor getIoExecutor() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 1f5bef00718..3d90b203296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.rpc.RpcUtils;
@@ -35,20 +34,14 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
-import org.assertj.core.util.Sets;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
@@ -499,62 +492,6 @@ public class ResourceManagerServiceImplTest extends TestLogger {
deregisterApplicationFuture.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
}
- @Test
- public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
- final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
- TestingMetricRegistry metricRegistry =
- TestingMetricRegistry.builder()
- .setRegisterConsumer((a, b, c) -> registeredMetrics.add(b))
- .setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b))
- .build();
-
- final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();
- resourceManagerService =
- ResourceManagerServiceImpl.create(
- rmFactory,
- new Configuration(),
- ResourceID.generate(),
- rpcService,
- haService,
- heartbeatServices,
- delegationTokenManager,
- fatalErrorHandler,
- clusterInformation,
- null,
- metricRegistry,
- "localhost",
- ForkJoinPool.commonPool());
- resourceManagerService.start();
-
- Assert.assertEquals(0, registeredMetrics.size());
- // grant leadership
- leaderElectionService.isLeader(UUID.randomUUID());
-
- assertRmStarted();
- Set<String> expectedMetrics =
- Sets.set(
- MetricNames.NUM_REGISTERED_TASK_MANAGERS,
- MetricNames.TASK_SLOTS_TOTAL,
- MetricNames.TASK_SLOTS_AVAILABLE);
- Assert.assertTrue(
- "Expected RM to register leader metrics",
- registeredMetrics.containsAll(expectedMetrics));
-
- // revoke leadership, block until old rm is terminated
- revokeLeadership();
-
- Set<String> intersection = new HashSet<>(registeredMetrics);
- intersection.retainAll(expectedMetrics);
- Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
-
- leaderElectionService.isLeader(UUID.randomUUID());
-
- assertRmStarted();
- Assert.assertTrue(
- "Expected RM to re-register leader metrics",
- registeredMetrics.containsAll(expectedMetrics));
- }
-
private static void blockOnFuture(CompletableFuture<?> future) {
try {
future.get();
@@ -576,11 +513,4 @@ public class ResourceManagerServiceImplTest extends TestLogger {
private void assertRmStarted() throws Exception {
leaderElectionService.getConfirmationFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
}
-
- private void revokeLeadership() {
- ResourceManager<?> leaderResourceManager =
- resourceManagerService.getLeaderResourceManager();
- leaderElectionService.notLeader();
- blockOnFuture(leaderResourceManager.getTerminationFuture());
- }
}