tillrohrmann commented on a change in pull request #16971:
URL: https://github.com/apache/flink/pull/16971#discussion_r695839994
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -61,17 +68,26 @@
@Experimental
@ThreadSafe
public class InMemoryReporter implements MetricReporter {
- private static final ThreadLocal<InMemoryReporter> REPORTERS =
- ThreadLocal.withInitial(InMemoryReporter::new);
-
- static InMemoryReporter getInstance() {
- return REPORTERS.get();
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryReporter.class);
+ private static final String ID = "ID";
+ private static final Map<UUID, InMemoryReporter> REPORTERS = new
ConcurrentHashMap<>();
private final Map<MetricGroup, Map<String, Metric>> metrics = new
HashMap<>();
private final Set<MetricGroup> removedGroups = new HashSet<>();
+ private final UUID id;
+
+ private volatile CaptureMode captureMode = CaptureMode.NONE;
+
+ enum CaptureMode {
+ NONE,
Review comment:
Isn't this value the same as not instantiating a `MetricReporter`?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -159,17 +168,51 @@ void applyRemovals() {
@Override
public void notifyOfAddedMetric(Metric metric, String metricName,
MetricGroup group) {
- MetricGroup metricGroup = unwrap(group);
- LOG.debug("Registered {} @ {}", metricName, metricGroup);
- synchronized (this) {
- metrics.computeIfAbsent(metricGroup, dummy -> new
HashMap<>()).put(metricName, metric);
+ if (captureMode != CaptureMode.NONE) {
+ MetricGroup metricGroup = unwrap(group);
+ LOG.debug("Registered {} @ {}", metricName, metricGroup);
+ synchronized (this) {
+ metrics.computeIfAbsent(metricGroup, dummy -> new HashMap<>())
Review comment:
I think Arvid is right in this case. Using a `ConcurrentHashMap` won't
solve our concurrency problems because
* `InMemoryReporter` has several collections it needs to keep in sync
atomically (`metrics` and `removedGroups`)
* `InMemoryReporter` executes non atomic operations on the `metrics`
collections (`removeMetric`) which requires locking
What I personally prefer doing is to introduce a dedicated `lock` and to use
`@GuardedBy` to express which data structures need to be accessed under the
`lock`. This also gives you a bit support from IntelliJ if you forget about it.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -61,17 +68,26 @@
@Experimental
@ThreadSafe
public class InMemoryReporter implements MetricReporter {
- private static final ThreadLocal<InMemoryReporter> REPORTERS =
- ThreadLocal.withInitial(InMemoryReporter::new);
-
- static InMemoryReporter getInstance() {
- return REPORTERS.get();
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryReporter.class);
+ private static final String ID = "ID";
+ private static final Map<UUID, InMemoryReporter> REPORTERS = new
ConcurrentHashMap<>();
private final Map<MetricGroup, Map<String, Metric>> metrics = new
HashMap<>();
private final Set<MetricGroup> removedGroups = new HashSet<>();
+ private final UUID id;
+
+ private volatile CaptureMode captureMode = CaptureMode.NONE;
+
+ enum CaptureMode {
+ NONE,
+ EAGER_RELEASE,
Review comment:
I think this should be the default behaviour tbh.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporterRule.java
##########
@@ -41,26 +58,41 @@
*/
@Experimental
public class InMemoryReporterRule implements ExternalResource {
- private final InMemoryReporter inMemoryReporter =
InMemoryReporter.getInstance();
+ private final InMemoryReporter inMemoryReporter;
- public static InMemoryReporterRule create() {
- return new InMemoryReporterRule();
+ public static InMemoryReporterRule fromMiniCluster(MiniClusterResource
miniClusterResource) {
Review comment:
Can we invert the dependency? Differently said: Can we allow to
configure a `MetricReporter` for the `MiniCluster`? If yes, then we can have
our `InMemoryReporterRule` (actually so far all `ExternalResources` have the
`Resource` suffix and `Rule`), which creates our reporter and then we start the
`MiniCluster` with this reporter. That way we can also have a special
`MetricReporter` that retains the metrics.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemoryReporter.java
##########
@@ -61,17 +68,26 @@
@Experimental
@ThreadSafe
public class InMemoryReporter implements MetricReporter {
- private static final ThreadLocal<InMemoryReporter> REPORTERS =
- ThreadLocal.withInitial(InMemoryReporter::new);
-
- static InMemoryReporter getInstance() {
- return REPORTERS.get();
- }
-
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryReporter.class);
+ private static final String ID = "ID";
+ private static final Map<UUID, InMemoryReporter> REPORTERS = new
ConcurrentHashMap<>();
private final Map<MetricGroup, Map<String, Metric>> metrics = new
HashMap<>();
private final Set<MetricGroup> removedGroups = new HashSet<>();
+ private final UUID id;
+
+ private volatile CaptureMode captureMode = CaptureMode.NONE;
Review comment:
I prefer immutable classes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]