This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e72c9d8  [FLINK-12359][metrics][tests] Harden 
SystemResourcesMetricsITCase
e72c9d8 is described below

commit e72c9d8881faf4681c1c1aa229e79e77a89ec15f
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri May 3 11:25:10 2019 +0200

    [FLINK-12359][metrics][tests] Harden SystemResourcesMetricsITCase
---
 .../metrics/SystemResourcesMetricsITCase.java      | 62 +++++++++++-----------
 1 file changed, 31 insertions(+), 31 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
index b875d97..7801a28 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.TestLogger;
@@ -30,16 +33,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 /**
  * Integration tests for proper initialization of the system resource metrics.
@@ -58,6 +62,8 @@ public class SystemResourcesMetricsITCase extends TestLogger {
                Configuration configuration = new Configuration();
                configuration.setBoolean(SYSTEM_RESOURCE_METRICS, true);
                configuration.setString(REPORTERS_LIST, "test_reporter");
+               configuration.setString(MetricOptions.SCOPE_NAMING_JM, 
"jobmanager");
+               configuration.setString(MetricOptions.SCOPE_NAMING_TM, 
"taskmanager");
                configuration.setString("metrics.reporter.test_reporter.class", 
TestReporter.class.getName());
                return configuration;
        }
@@ -67,25 +73,11 @@ public class SystemResourcesMetricsITCase extends 
TestLogger {
                assertEquals(1, TestReporter.OPENED_REPORTERS.size());
                TestReporter reporter = 
TestReporter.OPENED_REPORTERS.iterator().next();
 
-               List<String> expectedPatterns = getExpectedPatterns();
-
-               Collection<String> gaugeNames = reporter.getGauges().values();
-
-               for (String expectedPattern : expectedPatterns) {
-                       boolean found = false;
-                       for (String gaugeName : gaugeNames) {
-                               if (gaugeName.matches(expectedPattern)) {
-                                       found = true;
-                               }
-                       }
-                       if (!found) {
-                               fail(String.format("Failed to find gauge [%s] 
in registered gauges [%s]", expectedPattern, gaugeNames));
-                       }
-               }
+               reporter.patternsExhaustedFuture.get(10, TimeUnit.SECONDS);
        }
 
        private static List<String> getExpectedPatterns() {
-               String[] expectedGauges = new String[] {
+               String[] expectedGauges = {
                        "System.CPU.Idle",
                        "System.CPU.Sys",
                        "System.CPU.User",
@@ -101,9 +93,9 @@ public class SystemResourcesMetricsITCase extends TestLogger 
{
                        "System.Network.*SendRate"
                };
 
-               String[] expectedHosts = new String[] {
-                       "localhost.taskmanager.([a-f0-9\\\\-])*.",
-                       "localhost.jobmanager."
+               String[] expectedHosts = {
+                       "taskmanager.",
+                       "jobmanager."
                };
 
                List<String> patterns = new ArrayList<>();
@@ -118,13 +110,11 @@ public class SystemResourcesMetricsITCase extends 
TestLogger {
        /**
         * Test metric reporter that exposes registered metrics.
         */
-       public static final class TestReporter extends AbstractReporter {
+       public static final class TestReporter implements MetricReporter {
                public static final Set<TestReporter> OPENED_REPORTERS = 
ConcurrentHashMap.newKeySet();
-
-               @Override
-               public String filterCharacters(String input) {
-                       return input;
-               }
+               private final Map<String, CompletableFuture<Void>> 
patternFutures = getExpectedPatterns().stream()
+                       .collect(Collectors.toMap(pattern -> pattern, pattern 
-> new CompletableFuture<>()));
+               private final CompletableFuture<Void> patternsExhaustedFuture = 
FutureUtils.waitForAll(patternFutures.values());
 
                @Override
                public void open(MetricConfig config) {
@@ -136,8 +126,18 @@ public class SystemResourcesMetricsITCase extends 
TestLogger {
                        OPENED_REPORTERS.remove(this);
                }
 
-               public Map<Gauge<?>, String> getGauges() {
-                       return gauges;
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       final String metricIdentifier = 
group.getMetricIdentifier(metricName, name -> name);
+                       for (final String expectedPattern : 
patternFutures.keySet()) {
+                               if (metricIdentifier.matches(expectedPattern)) {
+                                       
patternFutures.get(expectedPattern).complete(null);
+                               }
+                       }
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
                }
        }
 }

Reply via email to