zentol closed pull request #7347: [FLINK-10761][metrics] Do not acquire lock for getAllVariables()
URL: https://github.com/apache/flink/pull/7347
This is a PR merged from a forked repository. Because GitHub hides the original diff of a merged pull request that comes from a fork, the diff is reproduced below for the sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 4400b14a10d..fb321303222 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -111,17 +111,13 @@ public AbstractMetricGroup(MetricRegistry registry,
String[] scope, A parent) {
}
public Map<String, String> getAllVariables() {
- if (variables == null) { // avoid synchronization for common
case
- synchronized (this) {
- if (variables == null) {
- Map<String, String> tmpVariables = new
HashMap<>();
- putVariables(tmpVariables);
- if (parent != null) { // not true for
Job-/TaskManagerMetricGroup
-
tmpVariables.putAll(parent.getAllVariables());
- }
- variables = tmpVariables;
- }
+ if (variables == null) { // lazily merge this group's variables with all parent variables; deliberately lock-free (FLINK-10761): locking here also locked every parent group via parent.getAllVariables() and could deadlock with threads registering on a parent
+ Map<String, String> tmpVariables = new HashMap<>();
+ putVariables(tmpVariables);
+ if (parent != null) { // not true for
Job-/TaskManagerMetricGroup
+ tmpVariables.putAll(parent.getAllVariables());
}
+ variables = tmpVariables; // racing callers may each build and store their own map (benign as long as the contents are equal); NOTE(review): safe publication to other threads relies on `variables` being declared volatile - confirm the field declaration
}
return variables;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index f3f8b42b851..d750f63cfd8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -21,17 +21,22 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -252,4 +257,100 @@ public void testScopeGenerationWithoutReporters() throws Exception {
 			testRegistry.shutdown().get();
 		}
 	}
+
+	/**
+	 * Verifies that {@link AbstractMetricGroup#getAllVariables()} does not wait for the locks of parent groups,
+	 * which previously could deadlock with a thread that registers a metric on a parent group (FLINK-10761).
+	 */
+	@Test
+	public void testGetAllVariablesDoesNotDeadlock() throws InterruptedException {
+		final TestMetricRegistry registry = new TestMetricRegistry();
+
+		final MetricGroup parent = new GenericMetricGroup(registry, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), "parent");
+		final MetricGroup child = parent.addGroup("child");
+
+		final Thread parentRegisteringThread = new Thread(() -> parent.counter("parent_counter"));
+		final Thread childRegisteringThread = new Thread(() -> child.counter("child_counter"));
+
+		final BlockerSync parentSync = new BlockerSync();
+		final BlockerSync childSync = new BlockerSync();
+
+		try {
+			// start both threads and have them block in the registry, so they acquire the lock of their respective group
+			registry.setOnRegistrationAction(childSync::blockNonInterruptible);
+			childRegisteringThread.start();
+			childSync.awaitBlocker();
+
+			registry.setOnRegistrationAction(parentSync::blockNonInterruptible);
+			parentRegisteringThread.start();
+			parentSync.awaitBlocker();
+
+			// the parent thread remains blocked to simulate the child thread holding some lock in the registry/reporter
+			// the child thread continues execution and calls getAllVariables()
+			// in the past this would block indefinitely since the method acquires the locks of all parent groups
+			childSync.releaseBlocker();
+			// wait with a timeout to ensure the finally block is executed _at some point_, un-blocking the parent
+			childRegisteringThread.join(1000 * 10);
+			// without this check a regression would only slow the test down, because un-blocking the parent eventually frees the child as well
+			assertTrue("The child thread did not finish; getAllVariables() is likely waiting for the lock of the parent group.",
+				!childRegisteringThread.isAlive());
+
+			parentSync.releaseBlocker();
+			parentRegisteringThread.join();
+		} finally {
+			parentSync.releaseBlocker();
+			childSync.releaseBlocker();
+			parentRegisteringThread.join();
+			childRegisteringThread.join();
+		}
+	}
+
+	/**
+	 * {@link MetricRegistry} that runs a configurable action on every registration (used to block the
+	 * registering thread) and then calls {@link AbstractMetricGroup#getAllVariables()} on the registering group.
+	 */
+	private static final class TestMetricRegistry implements MetricRegistry {
+
+		private Runnable onRegistrationAction;
+
+		void setOnRegistrationAction(Runnable onRegistrationAction) {
+			this.onRegistrationAction = onRegistrationAction;
+		}
+
+		@Override
+		public char getDelimiter() {
+			return 0;
+		}
+
+		@Override
+		public char getDelimiter(int index) {
+			return 0;
+		}
+
+		@Override
+		public int getNumberReporters() {
+			return 0;
+		}
+
+		@Override
+		public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+			onRegistrationAction.run();
+			group.getAllVariables();
+		}
+
+		@Override
+		public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+		}
+
+		@Override
+		public ScopeFormats getScopeFormats() {
+			return null;
+		}
+
+		@Nullable
+		@Override
+		public String getMetricQueryServicePath() {
+			return null;
+		}
+	}
 }
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services