fapaul commented on a change in pull request #16971:
URL: https://github.com/apache/flink/pull/16971#discussion_r695473405



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
 
     @Override
     public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
-        MetricGroup metricGroup = unwrap(group);
-        LOG.debug("Registered {} @ {}", metricName, metricGroup);
-        synchronized (this) {
-            metrics.computeIfAbsent(metricGroup, dummy -> new 
HashMap<>()).put(metricName, metric);
+        if (captureMode != CaptureMode.NONE) {
+            MetricGroup metricGroup = unwrap(group);
+            LOG.debug("Registered {} @ {}", metricName, metricGroup);
+            synchronized (this) {
+                metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
+                        .put(metricName, metric);
+            }
         }
     }
 
     @Override
     public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
-        synchronized (this) {
-            removedGroups.add(unwrap(group));
+        if (captureMode != CaptureMode.NONE) {
+            synchronized (this) {

Review comment:
       Nit: Why not make only `removeMetric` synchronized and the 
`removedGroups` some thread-safe set?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
 
     @Override
     public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
-        MetricGroup metricGroup = unwrap(group);
-        LOG.debug("Registered {} @ {}", metricName, metricGroup);
-        synchronized (this) {
-            metrics.computeIfAbsent(metricGroup, dummy -> new 
HashMap<>()).put(metricName, metric);
+        if (captureMode != CaptureMode.NONE) {
+            MetricGroup metricGroup = unwrap(group);
+            LOG.debug("Registered {} @ {}", metricName, metricGroup);
+            synchronized (this) {
+                metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())

Review comment:
       WDYT about making metrics a `ConcurrentHashMap`? It seems to me that we 
mostly only synchronize on the object to prevent concurrent access to the 
metrics map.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporterRule.java
##########
@@ -18,8 +18,13 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.testutils.InMemoryReporter.CaptureMode;
 import org.apache.flink.util.ExternalResource;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A JUnit rule that encapsulates {@link InMemoryReporter} properly with test 
cases.

Review comment:
       I think we have to update the docstring slightly because without using 
this rule the `InMemoryReporter`  of the minicluster does not capture anything 
anymore.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
 
     @Override
     public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
-        MetricGroup metricGroup = unwrap(group);
-        LOG.debug("Registered {} @ {}", metricName, metricGroup);
-        synchronized (this) {
-            metrics.computeIfAbsent(metricGroup, dummy -> new 
HashMap<>()).put(metricName, metric);
+        if (captureMode != CaptureMode.NONE) {
+            MetricGroup metricGroup = unwrap(group);
+            LOG.debug("Registered {} @ {}", metricName, metricGroup);
+            synchronized (this) {
+                metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
+                        .put(metricName, metric);
+            }
         }
     }
 
     @Override
     public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
-        synchronized (this) {
-            removedGroups.add(unwrap(group));
+        if (captureMode != CaptureMode.NONE) {
+            synchronized (this) {
+                if (captureMode == CaptureMode.EAGER_RELEASE) {
+                    removeMetric(metricName, group);
+                } else if (captureMode == CaptureMode.LAZY_RELEASE) {
+                    removedGroups.add(unwrap(group));

Review comment:
       Nit: `unwrap(group)` is called on both if branches you can put it before 
the if-statements.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjects.java
##########
@@ -75,7 +75,7 @@
      */
     private final int id;
     /** All registered objects for the current test case. The objects are 
purged upon completion. */
-    private final transient Map<SharedReference<?>, Object> objects = new 
ConcurrentHashMap<>();
+    private final Map<SharedReference<?>, Object> objects = new 
ConcurrentHashMap<>();

Review comment:
       Nit: Make a separate commit for this change? It is not obvious why this 
change is needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to