This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 5363432  [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
5363432 is described below

commit 5363432300e4c102a73ac1365fee198d21289790
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue May 25 22:26:26 2021 +0200

    [FLINK-22725][coordination] SlotManagers unregister metrics in suspend()
---
 .../slotmanager/DeclarativeSlotManager.java        |  3 +-
 .../slotmanager/FineGrainedSlotManager.java        |  3 +-
 .../slotmanager/SlotManagerImpl.java               |  3 +-
 .../slotmanager/DeclarativeSlotManagerTest.java    | 36 +++++++++++++++++
 .../slotmanager/FineGrainedSlotManagerTest.java    | 46 ++++++++++++++++++++++
 .../FineGrainedSlotManagerTestBase.java            | 19 ++++++++-
 6 files changed, 105 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 1cc4337..ab8e155 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -211,6 +211,8 @@ public class DeclarativeSlotManager implements SlotManager {
 
         LOG.info("Suspending the slot manager.");
 
+        slotManagerMetricGroup.close();
+
         resourceTracker.clear();
         if (taskExecutorManager != null) {
             taskExecutorManager.close();
@@ -238,7 +240,6 @@ public class DeclarativeSlotManager implements SlotManager {
         LOG.info("Closing the slot manager.");
 
         suspend();
-        slotManagerMetricGroup.close();
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 13c6109..5a3a372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -218,6 +218,8 @@ public class FineGrainedSlotManager implements SlotManager {
 
         LOG.info("Suspending the slot manager.");
 
+        slotManagerMetricGroup.close();
+
         // stop the timeout checks for the TaskManagers
         if (taskManagerTimeoutsCheck != null) {
             taskManagerTimeoutsCheck.cancel(false);
@@ -249,7 +251,6 @@ public class FineGrainedSlotManager implements SlotManager {
         LOG.info("Closing the slot manager.");
 
         suspend();
-        slotManagerMetricGroup.close();
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
index c17ba46..b8cfc33 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
@@ -345,6 +345,8 @@ public class SlotManagerImpl implements SlotManager {
     public void suspend() {
         LOG.info("Suspending the SlotManager.");
 
+        slotManagerMetricGroup.close();
+
         // stop the timeout checks for the TaskManagers and the SlotRequests
         if (taskManagerTimeoutsAndRedundancyCheck != null) {
             taskManagerTimeoutsAndRedundancyCheck.cancel(false);
@@ -386,7 +388,6 @@ public class SlotManagerImpl implements SlotManager {
         LOG.info("Closing the SlotManager.");
 
         suspend();
-        slotManagerMetricGroup.close();
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 88a130a..1f39968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -32,6 +32,9 @@ import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -49,6 +52,7 @@ import 
org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
@@ -77,6 +81,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
@@ -1450,6 +1455,37 @@ public class DeclarativeSlotManagerTest extends 
TestLogger {
         }
     }
 
+    @Test
+    public void testMetricsUnregisteredWhenSuspending() throws Exception {
+        testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+    }
+
+    @Test
+    public void testMetricsUnregisteredWhenClosing() throws Exception {
+        testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+    }
+
+    private void testAccessMetricValueDuringItsUnregister(
+            ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception 
{
+        final AtomicInteger registeredMetrics = new AtomicInteger();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer((a, b, c) -> 
registeredMetrics.incrementAndGet())
+                        .setUnregisterConsumer((a, b, c) -> 
registeredMetrics.decrementAndGet())
+                        .build();
+
+        final DeclarativeSlotManager slotManager =
+                createDeclarativeSlotManagerBuilder()
+                        .setSlotManagerMetricGroup(
+                                SlotManagerMetricGroup.create(metricRegistry, 
"localhost"))
+                        .buildAndStartWithDirectExec();
+
+        // sanity check to ensure metrics were actually registered
+        assertThat(registeredMetrics.get(), greaterThan(0));
+        closeFn.accept(slotManager);
+        assertThat(registeredMetrics.get(), is(0));
+    }
+
     private static SlotReport createSlotReport(ResourceID 
taskExecutorResourceId, int numberSlots) {
         final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
         for (int i = 0; i < numberSlots; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 82d4cf4..3357e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -27,6 +27,9 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -35,6 +38,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Test;
 
@@ -44,10 +48,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -55,6 +61,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 /** Tests of {@link FineGrainedSlotManager}. */
@@ -948,4 +955,43 @@ public class FineGrainedSlotManagerTest extends 
FineGrainedSlotManagerTestBase {
             }
         };
     }
+
+    @Test
+    public void testMetricsUnregisteredWhenSuspending() throws Exception {
+        testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
+    }
+
+    @Test
+    public void testMetricsUnregisteredWhenClosing() throws Exception {
+        testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
+    }
+
+    private void testAccessMetricValueDuringItsUnregister(
+            ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception 
{
+        final AtomicInteger registeredMetrics = new AtomicInteger();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer((a, b, c) -> 
registeredMetrics.incrementAndGet())
+                        .setUnregisterConsumer((a, b, c) -> 
registeredMetrics.decrementAndGet())
+                        .build();
+
+        final Context context = new Context();
+        context.setSlotManagerMetricGroup(
+                SlotManagerMetricGroup.create(metricRegistry, "localhost"));
+
+        context.runTest(
+                () -> {
+                    // sanity check to ensure metrics were actually registered
+                    assertThat(registeredMetrics.get(), greaterThan(0));
+                    context.runInMainThreadAndWait(
+                            () -> {
+                                try {
+                                    closeFn.accept(context.getSlotManager());
+                                } catch (Exception e) {
+                                    fail("Error when closing slot manager.");
+                                }
+                            });
+                    assertThat(registeredMetrics.get(), is(0));
+                });
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 69fe129..a05098e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -136,7 +137,7 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
         private final TaskManagerTracker taskManagerTracker = new 
FineGrainedTaskManagerTracker();
         private final SlotStatusSyncer slotStatusSyncer =
                 new DefaultSlotStatusSyncer(Time.seconds(10L));
-        private final SlotManagerMetricGroup slotManagerMetricGroup =
+        private SlotManagerMetricGroup slotManagerMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
         private final ScheduledExecutor scheduledExecutor = 
TestingUtils.defaultScheduledExecutor();
         private final Executor mainThreadExecutor = MAIN_THREAD_EXECUTOR;
@@ -171,10 +172,24 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
             this.requirementCheckDelay = requirementCheckDelay;
         }
 
+        public void setSlotManagerMetricGroup(SlotManagerMetricGroup 
slotManagerMetricGroup) {
+            this.slotManagerMetricGroup = slotManagerMetricGroup;
+        }
+
         void runInMainThread(Runnable runnable) {
             mainThreadExecutor.execute(runnable);
         }
 
+        void runInMainThreadAndWait(Runnable runnable) throws 
InterruptedException {
+            final OneShotLatch latch = new OneShotLatch();
+            mainThreadExecutor.execute(
+                    () -> {
+                        runnable.run();
+                        latch.trigger();
+                    });
+            latch.await();
+        }
+
         protected final void runTest(RunnableWithException testMethod) throws 
Exception {
             slotManager =
                     new FineGrainedSlotManager(
@@ -187,7 +202,7 @@ public abstract class FineGrainedSlotManagerTestBase 
extends TestLogger {
                             getResourceAllocationStrategy()
                                     
.orElse(resourceAllocationStrategyBuilder.build()),
                             Time.milliseconds(requirementCheckDelay));
-            runInMainThread(
+            runInMainThreadAndWait(
                     () ->
                             slotManager.start(
                                     resourceManagerId,

Reply via email to