This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 5363432 [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
5363432 is described below
commit 5363432300e4c102a73ac1365fee198d21289790
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue May 25 22:26:26 2021 +0200
[FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
---
.../slotmanager/DeclarativeSlotManager.java | 3 +-
.../slotmanager/FineGrainedSlotManager.java | 3 +-
.../slotmanager/SlotManagerImpl.java | 3 +-
.../slotmanager/DeclarativeSlotManagerTest.java | 36 +++++++++++++++++
.../slotmanager/FineGrainedSlotManagerTest.java | 46 ++++++++++++++++++++++
.../FineGrainedSlotManagerTestBase.java | 19 ++++++++-
6 files changed, 105 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 1cc4337..ab8e155 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -211,6 +211,8 @@ public class DeclarativeSlotManager implements SlotManager {
LOG.info("Suspending the slot manager.");
+ slotManagerMetricGroup.close();
+
resourceTracker.clear();
if (taskExecutorManager != null) {
taskExecutorManager.close();
@@ -238,7 +240,6 @@ public class DeclarativeSlotManager implements SlotManager {
LOG.info("Closing the slot manager.");
suspend();
- slotManagerMetricGroup.close();
}
//
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 13c6109..5a3a372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -218,6 +218,8 @@ public class FineGrainedSlotManager implements SlotManager {
LOG.info("Suspending the slot manager.");
+ slotManagerMetricGroup.close();
+
// stop the timeout checks for the TaskManagers
if (taskManagerTimeoutsCheck != null) {
taskManagerTimeoutsCheck.cancel(false);
@@ -249,7 +251,6 @@ public class FineGrainedSlotManager implements SlotManager {
LOG.info("Closing the slot manager.");
suspend();
- slotManagerMetricGroup.close();
}
//
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index c17ba46..b8cfc33 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -345,6 +345,8 @@ public class SlotManagerImpl implements SlotManager {
public void suspend() {
LOG.info("Suspending the SlotManager.");
+ slotManagerMetricGroup.close();
+
// stop the timeout checks for the TaskManagers and the SlotRequests
if (taskManagerTimeoutsAndRedundancyCheck != null) {
taskManagerTimeoutsAndRedundancyCheck.cancel(false);
@@ -386,7 +388,6 @@ public class SlotManagerImpl implements SlotManager {
LOG.info("Closing the SlotManager.");
suspend();
- slotManagerMetricGroup.close();
}
//
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 88a130a..1f39968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -32,6 +32,9 @@ import
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -49,6 +52,7 @@ import
org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
@@ -77,6 +81,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
@@ -1450,6 +1455,37 @@ public class DeclarativeSlotManagerTest extends
TestLogger {
}
}
+ @Test
+ public void testMetricsUnregisteredWhenSuspending() throws Exception {
+ testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+ }
+
+ @Test
+ public void testMetricsUnregisteredWhenClosing() throws Exception {
+ testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+ }
+
+ private void testAccessMetricValueDuringItsUnregister(
+ ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception
{
+ final AtomicInteger registeredMetrics = new AtomicInteger();
+ final MetricRegistry metricRegistry =
+ TestingMetricRegistry.builder()
+ .setRegisterConsumer((a, b, c) ->
registeredMetrics.incrementAndGet())
+ .setUnregisterConsumer((a, b, c) ->
registeredMetrics.decrementAndGet())
+ .build();
+
+ final DeclarativeSlotManager slotManager =
+ createDeclarativeSlotManagerBuilder()
+ .setSlotManagerMetricGroup(
+ SlotManagerMetricGroup.create(metricRegistry,
"localhost"))
+ .buildAndStartWithDirectExec();
+
+ // sanity check to ensure metrics were actually registered
+ assertThat(registeredMetrics.get(), greaterThan(0));
+ closeFn.accept(slotManager);
+ assertThat(registeredMetrics.get(), is(0));
+ }
+
private static SlotReport createSlotReport(ResourceID
taskExecutorResourceId, int numberSlots) {
final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
for (int i = 0; i < numberSlots; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 82d4cf4..3357e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -27,6 +27,9 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -35,6 +38,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Test;
@@ -44,10 +48,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -55,6 +61,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/** Tests of {@link FineGrainedSlotManager}. */
@@ -948,4 +955,43 @@ public class FineGrainedSlotManagerTest extends
FineGrainedSlotManagerTestBase {
}
};
}
+
+ @Test
+ public void testMetricsUnregisteredWhenSuspending() throws Exception {
+ testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+ }
+
+ @Test
+ public void testMetricsUnregisteredWhenClosing() throws Exception {
+ testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+ }
+
+ private void testAccessMetricValueDuringItsUnregister(
+ ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception
{
+ final AtomicInteger registeredMetrics = new AtomicInteger();
+ final MetricRegistry metricRegistry =
+ TestingMetricRegistry.builder()
+ .setRegisterConsumer((a, b, c) ->
registeredMetrics.incrementAndGet())
+ .setUnregisterConsumer((a, b, c) ->
registeredMetrics.decrementAndGet())
+ .build();
+
+ final Context context = new Context();
+ context.setSlotManagerMetricGroup(
+ SlotManagerMetricGroup.create(metricRegistry, "localhost"));
+
+ context.runTest(
+ () -> {
+ // sanity check to ensure metrics were actually registered
+ assertThat(registeredMetrics.get(), greaterThan(0));
+ context.runInMainThreadAndWait(
+ () -> {
+ try {
+ closeFn.accept(context.getSlotManager());
+ } catch (Exception e) {
+ fail("Error when closing slot manager.");
+ }
+ });
+ assertThat(registeredMetrics.get(), is(0));
+ });
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 69fe129..a05098e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -136,7 +137,7 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
private final TaskManagerTracker taskManagerTracker = new
FineGrainedTaskManagerTracker();
private final SlotStatusSyncer slotStatusSyncer =
new DefaultSlotStatusSyncer(Time.seconds(10L));
- private final SlotManagerMetricGroup slotManagerMetricGroup =
+ private SlotManagerMetricGroup slotManagerMetricGroup =
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
private final ScheduledExecutor scheduledExecutor =
TestingUtils.defaultScheduledExecutor();
private final Executor mainThreadExecutor = MAIN_THREAD_EXECUTOR;
@@ -171,10 +172,24 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
this.requirementCheckDelay = requirementCheckDelay;
}
+ public void setSlotManagerMetricGroup(SlotManagerMetricGroup
slotManagerMetricGroup) {
+ this.slotManagerMetricGroup = slotManagerMetricGroup;
+ }
+
void runInMainThread(Runnable runnable) {
mainThreadExecutor.execute(runnable);
}
+ void runInMainThreadAndWait(Runnable runnable) throws
InterruptedException {
+ final OneShotLatch latch = new OneShotLatch();
+ mainThreadExecutor.execute(
+ () -> {
+ runnable.run();
+ latch.trigger();
+ });
+ latch.await();
+ }
+
protected final void runTest(RunnableWithException testMethod) throws
Exception {
slotManager =
new FineGrainedSlotManager(
@@ -187,7 +202,7 @@ public abstract class FineGrainedSlotManagerTestBase
extends TestLogger {
getResourceAllocationStrategy()
.orElse(resourceAllocationStrategyBuilder.build()),
Time.milliseconds(requirementCheckDelay));
- runInMainThread(
+ runInMainThreadAndWait(
() ->
slotManager.start(
resourceManagerId,