fapaul commented on a change in pull request #16971:
URL: https://github.com/apache/flink/pull/16971#discussion_r695473405
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- MetricGroup metricGroup = unwrap(group);
- LOG.debug("Registered {} @ {}", metricName, metricGroup);
- synchronized (this) {
- metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>()).put(metricName, metric);
+ if (captureMode != CaptureMode.NONE) {
+ MetricGroup metricGroup = unwrap(group);
+ LOG.debug("Registered {} @ {}", metricName, metricGroup);
+ synchronized (this) {
+ metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
+ .put(metricName, metric);
+ }
}
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- synchronized (this) {
- removedGroups.add(unwrap(group));
+ if (captureMode != CaptureMode.NONE) {
+ synchronized (this) {
Review comment:
Nit: Why not synchronize only `removeMetric` and make `removedGroups` a
thread-safe set?
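A minimal sketch of this suggestion, not the PR's code: `String` stands in for `MetricGroup`, `Object` for `Metric`, and the class name plus the `markRemoved` and `metricCount` helpers are hypothetical. It assumes `removedGroups` becomes a set from `ConcurrentHashMap.newKeySet()`, so only the methods that touch the plain `HashMap` stay synchronized.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the reporter: String replaces MetricGroup, Object replaces Metric. */
final class RemovalSketch {
    // Plain HashMap, still guarded by the monitor of this object.
    private final Map<String, Map<String, Object>> metrics = new HashMap<>();
    // Thread-safe set, so adding to it needs no lock.
    private final Set<String> removedGroups = ConcurrentHashMap.newKeySet();

    synchronized void addMetric(String group, String name, Object metric) {
        metrics.computeIfAbsent(group, dummy -> new HashMap<>()).put(name, metric);
    }

    // Eager path: mutates the HashMap, so it stays synchronized.
    synchronized void removeMetric(String group, String name) {
        Map<String, Object> byName = metrics.get(group);
        if (byName != null) {
            byName.remove(name);
            if (byName.isEmpty()) {
                metrics.remove(group);
            }
        }
    }

    // Lazy path: only touches the concurrent set, so no synchronized block.
    void markRemoved(String group) {
        removedGroups.add(group);
    }

    // Drains the set one entry at a time so a concurrent markRemoved is not lost.
    synchronized void applyRemovals() {
        for (Iterator<String> it = removedGroups.iterator(); it.hasNext(); ) {
            metrics.remove(it.next());
            it.remove();
        }
    }

    synchronized int metricCount(String group) {
        Map<String, Object> byName = metrics.get(group);
        return byName == null ? 0 : byName.size();
    }
}
```

Whether this is actually simpler than one coarse lock depends on how often the lazy path runs; the sketch only shows that the two structures can be protected independently.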
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- MetricGroup metricGroup = unwrap(group);
- LOG.debug("Registered {} @ {}", metricName, metricGroup);
- synchronized (this) {
- metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>()).put(metricName, metric);
+ if (captureMode != CaptureMode.NONE) {
+ MetricGroup metricGroup = unwrap(group);
+ LOG.debug("Registered {} @ {}", metricName, metricGroup);
+ synchronized (this) {
+ metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
Review comment:
WDYT about making `metrics` a `ConcurrentHashMap`? As far as I can see, we
synchronize on the object mostly to prevent concurrent access to the
`metrics` map.
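A rough sketch of what that could look like, under the same simplifications (`String` for `MetricGroup`, `Object` for `Metric`; the class name and the `size` helper are hypothetical). Note that the inner per-group map would have to be concurrent as well, otherwise the `put` on it would still race.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the reporter: String replaces MetricGroup, Object replaces Metric. */
final class ConcurrentMetricsSketch {
    private final Map<String, Map<String, Object>> metrics = new ConcurrentHashMap<>();

    void notifyOfAddedMetric(Object metric, String metricName, String group) {
        // computeIfAbsent is atomic on ConcurrentHashMap; the inner map is
        // concurrent too, so the put needs no synchronized block.
        metrics.computeIfAbsent(group, dummy -> new ConcurrentHashMap<>())
                .put(metricName, metric);
    }

    int size(String group) {
        Map<String, Object> byName = metrics.get(group);
        return byName == null ? 0 : byName.size();
    }
}
```

Compound operations that must see a consistent view of several groups at once (for example applying all pending removals) may still need a lock, so this would not necessarily remove every `synchronized` block.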
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporterRule.java
##########
@@ -18,8 +18,13 @@
package org.apache.flink.runtime.testutils;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.testutils.InMemoryReporter.CaptureMode;
import org.apache.flink.util.ExternalResource;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A JUnit rule that encapsulates {@link InMemoryReporter} properly with test cases.
Review comment:
I think we have to update the docstring slightly: without this rule, the
`InMemoryReporter` of the minicluster no longer captures anything.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- MetricGroup metricGroup = unwrap(group);
- LOG.debug("Registered {} @ {}", metricName, metricGroup);
- synchronized (this) {
- metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>()).put(metricName, metric);
+ if (captureMode != CaptureMode.NONE) {
+ MetricGroup metricGroup = unwrap(group);
+ LOG.debug("Registered {} @ {}", metricName, metricGroup);
+ synchronized (this) {
+ metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
+ .put(metricName, metric);
+ }
}
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- synchronized (this) {
- removedGroups.add(unwrap(group));
+ if (captureMode != CaptureMode.NONE) {
+ synchronized (this) {
+ if (captureMode == CaptureMode.EAGER_RELEASE) {
+ removeMetric(metricName, group);
+ } else if (captureMode == CaptureMode.LAZY_RELEASE) {
+ removedGroups.add(unwrap(group));
Review comment:
Nit: `unwrap(group)` is called in both branches, so you can move the call
before the if-statement.
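Roughly what I have in mind, as a self-contained sketch rather than the real class: `String` stands in for `MetricGroup`, `Object` for `Metric`, the `unwrap` here is a hypothetical prefix-stripping stand-in, and `removeMetric` is assumed to accept the already unwrapped group. The `contains` and `isMarkedRemoved` helpers exist only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the reporter: String replaces MetricGroup, Object replaces Metric. */
final class HoistedUnwrapSketch {
    enum CaptureMode { NONE, EAGER_RELEASE, LAZY_RELEASE }

    private final CaptureMode captureMode;
    private final Map<String, Map<String, Object>> metrics = new HashMap<>();
    private final Set<String> removedGroups = new HashSet<>();

    HoistedUnwrapSketch(CaptureMode captureMode) {
        this.captureMode = captureMode;
    }

    // Hypothetical stand-in for unwrap(MetricGroup): strips a "front:" prefix.
    private static String unwrap(String group) {
        return group.startsWith("front:") ? group.substring("front:".length()) : group;
    }

    void notifyOfAddedMetric(Object metric, String metricName, String group) {
        if (captureMode != CaptureMode.NONE) {
            String metricGroup = unwrap(group);
            synchronized (this) {
                metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
                        .put(metricName, metric);
            }
        }
    }

    void notifyOfRemovedMetric(Object metric, String metricName, String group) {
        if (captureMode != CaptureMode.NONE) {
            // Unwrapped once, before branching on the capture mode.
            String metricGroup = unwrap(group);
            synchronized (this) {
                if (captureMode == CaptureMode.EAGER_RELEASE) {
                    removeMetric(metricName, metricGroup);
                } else if (captureMode == CaptureMode.LAZY_RELEASE) {
                    removedGroups.add(metricGroup);
                }
            }
        }
    }

    // Assumed to take the unwrapped group; callers hold the monitor.
    private void removeMetric(String metricName, String metricGroup) {
        Map<String, Object> byName = metrics.get(metricGroup);
        if (byName != null) {
            byName.remove(metricName);
        }
    }

    synchronized boolean contains(String group, String metricName) {
        Map<String, Object> byName = metrics.get(group);
        return byName != null && byName.containsKey(metricName);
    }

    synchronized boolean isMarkedRemoved(String group) {
        return removedGroups.contains(group);
    }
}
```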
##########
File path:
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjects.java
##########
@@ -75,7 +75,7 @@
*/
private final int id;
/** All registered objects for the current test case. The objects are purged upon completion. */
- private final transient Map<SharedReference<?>, Object> objects = new ConcurrentHashMap<>();
+ private final Map<SharedReference<?>, Object> objects = new ConcurrentHashMap<>();
Review comment:
Nit: Make a separate commit for this change? It is not obvious why this
change is needed.