Repository: flink Updated Branches: refs/heads/master 6b8f7dc2d -> e22c777f4
[FLINK-7933][metrics] Improve PrometheusReporter tests This closes #4908. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e22c777f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e22c777f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e22c777f Branch: refs/heads/master Commit: e22c777f44b3e24cf9f4509672f981c71d4eb715 Parents: 6b8f7dc Author: zentol <[email protected]> Authored: Thu Oct 26 19:04:30 2017 +0200 Committer: zentol <[email protected]> Committed: Mon Oct 30 09:37:48 2017 +0100 ---------------------------------------------------------------------- .../metrics/prometheus/PrometheusReporter.java | 9 +++ .../PrometheusReporterTaskScopeTest.java | 37 ++++++++---- .../prometheus/PrometheusReporterTest.java | 61 ++++++++++---------- 3 files changed, 65 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java index 1e44ab9..fad3ced 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java @@ -32,6 +32,7 @@ import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.Preconditions; import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; @@ -73,9 +74,16 @@ public class PrometheusReporter implements MetricReporter { private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR; private HTTPServer httpServer; + private int port; private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>(); @VisibleForTesting + int getPort() { + Preconditions.checkState(httpServer != null, "Server has not been initialized."); + return port; + } + + @VisibleForTesting static String replaceInvalidChars(final String input) { // https://prometheus.io/docs/instrumenting/writing_exporters/ // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore. @@ -91,6 +99,7 @@ public class PrometheusReporter implements MetricReporter { int port = ports.next(); try { httpServer = new HTTPServer(port); + this.port = port; LOG.info("Started PrometheusReporter HTTP server on port {}.", port); break; } catch (IOException ioe) { //assume port conflict http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java index c7d4040..0ae8fc7 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -37,6 +37,7 @@ import org.apache.flink.util.AbstractID; import com.mashape.unirest.http.exceptions.UnirestException; import io.prometheus.client.CollectorRegistry; import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Arrays; @@ -62,7 +63,6 @@ public class PrometheusReporterTaskScopeTest { private static final int SUBTASK_INDEX_1 = 0; private static final int SUBTASK_INDEX_2 = 1; - private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9429"))); private final JobID jobId = new JobID(); private final JobVertexID taskId1 = new JobVertexID(); @@ -72,10 +72,29 @@ public class PrometheusReporterTaskScopeTest { private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private PrometheusReporter reporter; + + @Before + public void setupReporter() { + registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + reporter = (PrometheusReporter) registry.getReporters().get(0); + + TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); + TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); + taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + } + + @After + public void shutdownRegistry() { + if (registry != null) { + registry.shutdown(); + } + } @Test public void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException { @@ -137,7 +156,7 @@ public class PrometheusReporterTaskScopeTest { taskMetricGroup1.histogram("my_histogram", histogram); taskMetricGroup2.histogram("my_histogram", histogram); - final String exportedMetrics = pollMetrics().getBody(); + final String exportedMetrics = pollMetrics(reporter.getPort()).getBody(); assertThat(exportedMetrics, containsString("subtask_index=\"0\",quantile=\"0.5\",} 0.5")); // histogram assertThat(exportedMetrics, containsString("subtask_index=\"1\",quantile=\"0.5\",} 0.5")); // histogram @@ -179,10 +198,4 @@ public class PrometheusReporterTaskScopeTest { labelNames[LABEL_NAMES.length] = element; return labelNames; } - - @After - public void shutdownRegistry() { - registry.shutdown(); - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/e22c777f/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 956339b..0d7be6d 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -27,7 +27,6 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; @@ -40,6 +39,7 @@ import com.mashape.unirest.http.HttpResponse; import com.mashape.unirest.http.Unirest; import com.mashape.unirest.http.exceptions.UnirestException; import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -56,7 +56,6 @@ import static org.junit.Assert.assertThat; * Basic test for {@link PrometheusReporter}. */ public class PrometheusReporterTest extends TestLogger { - private static final int NON_DEFAULT_PORT = 9429; private static final String HOST_NAME = "hostname"; private static final String TASK_MANAGER = "tm"; @@ -70,9 +69,23 @@ public class PrometheusReporterTest extends TestLogger { @Rule public ExpectedException thrown = ExpectedException.none(); - private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "" + NON_DEFAULT_PORT))); - private final FrontMetricGroup<TaskManagerMetricGroup> metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); - private final MetricReporter reporter = registry.getReporters().get(0); + private MetricRegistry registry; + private FrontMetricGroup<TaskManagerMetricGroup> metricGroup; + private PrometheusReporter reporter; + + @Before + public void setupReporter() { + registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); + reporter = (PrometheusReporter) registry.getReporters().get(0); + } + + @After + public void shutdownRegistry() { + if (registry != null) { + registry.shutdown(); + } + } /** * {@link io.prometheus.client.Counter} may not decrease, so report {@link Counter} as {@link io.prometheus.client.Gauge}. @@ -145,9 +158,11 @@ public class PrometheusReporterTest extends TestLogger { @Test public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException { + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0); reporter.close(); thrown.expect(UnirestException.class); - pollMetrics(); + pollMetrics(reporter.getPort()); } @Test @@ -229,10 +244,12 @@ public class PrometheusReporterTest extends TestLogger { @Test public void cannotStartTwoReportersOnSamePort() { - final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "12345"))); - final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "12345"))); - + final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); assertThat(fixedPort1.getReporters(), hasSize(1)); + + PrometheusReporter firstReporter = (PrometheusReporter) fixedPort1.getReporters().get(0); + + final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort())))); assertThat(fixedPort2.getReporters(), hasSize(0)); fixedPort1.shutdown(); @@ -241,8 +258,8 @@ public class PrometheusReporterTest extends TestLogger { @Test public void canStartTwoReportersWhenUsingPortRange() { - final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9249-9252"))); - final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9249-9252"))); + final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9200-9300"))); + final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9200-9300"))); assertThat(portRange1.getReporters(), hasSize(1)); assertThat(portRange2.getReporters(), hasSize(1)); @@ -251,28 +268,13 @@ public class PrometheusReporterTest extends TestLogger { portRange2.shutdown(); } - @Test - public void cannotStartThreeReportersWhenPortRangeIsTooSmall() { - final MetricRegistry smallPortRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9253-9254"))); - final MetricRegistry smallPortRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9253-9254"))); - final MetricRegistry smallPortRange3 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test3", "9253-9254"))); - - assertThat(smallPortRange1.getReporters(), hasSize(1)); - assertThat(smallPortRange2.getReporters(), hasSize(1)); - assertThat(smallPortRange3.getReporters(), hasSize(0)); - - smallPortRange1.shutdown(); - smallPortRange2.shutdown(); - smallPortRange3.shutdown(); - } - private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException { reporter.notifyOfAddedMetric(metric, metricName, metricGroup); - return pollMetrics().getBody(); + return pollMetrics(reporter.getPort()).getBody(); } - static HttpResponse<String> pollMetrics() throws UnirestException { - return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString(); + static HttpResponse<String> pollMetrics(int port) throws UnirestException { + return Unirest.get("http://localhost:" + port + "/metrics").asString(); } static Configuration createConfigWithOneReporter(String reporterName, String portString) { @@ -285,7 +287,6 @@ public class PrometheusReporterTest extends TestLogger { @After public void closeReporterAndShutdownRegistry() { - reporter.close(); registry.shutdown(); } }
