XComp commented on code in PR #19555:
URL: https://github.com/apache/flink/pull/19555#discussion_r857409226
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -52,96 +44,30 @@
* different subtasks.
*/
class PrometheusReporterTaskScopeTest {
- private static final String[] LABEL_NAMES = {
- "job_id",
- "task_id",
- "task_attempt_id",
- "host",
- "task_name",
- "task_attempt_num",
- "job_name",
- "tm_id",
- "subtask_index"
- };
+ private static final String[] LABEL_NAMES = {"label1", "label2"};
+ private static final String[][] LABEL_VALUES =
Review Comment:
Why do we use `String[][]` here? We're always accessing the label values
through their index. Hence, we could also create two fields `LABEL_VALUES_0`
and `LABEL_VALUES_1` (or something similar). Auto-completion would better in
that case...
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -152,92 +78,80 @@ void countersCanBeAddedSeveralTimesIfTheyDifferInLabels()
throws UnirestExceptio
Counter counter2 = new SimpleCounter();
counter2.inc(2);
- taskMetricGroup1.counter("my_counter", counter1);
- taskMetricGroup2.counter("my_counter", counter2);
+ reporter.notifyOfAddedMetric(counter1, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_counter",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(1.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_counter",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(2.);
}
@Test
void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws
UnirestException {
- Gauge<Integer> gauge1 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 3;
- }
- };
- Gauge<Integer> gauge2 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 4;
- }
- };
+ Gauge<Integer> gauge1 = () -> 3;
+ Gauge<Integer> gauge2 = () -> 4;
- taskMetricGroup1.gauge("my_gauge", gauge1);
- taskMetricGroup2.gauge("my_gauge", gauge2);
+ reporter.notifyOfAddedMetric(gauge1, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_gauge",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(3.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_gauge",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(4.);
}
@Test
void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws
UnirestException {
Meter meter = new TestMeter();
- taskMetricGroup1.meter("my_meter", meter);
- taskMetricGroup2.meter("my_meter", meter);
+ reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_meter",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(5.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_meter",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(5.);
}
@Test
void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws
UnirestException {
Histogram histogram = new TestHistogram();
- taskMetricGroup1.histogram("my_histogram", histogram);
- taskMetricGroup2.histogram("my_histogram", histogram);
+ reporter.notifyOfAddedMetric(histogram, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(histogram, METRIC_NAME, metricGroup2);
final String exportedMetrics =
pollMetrics(reporter.getPort()).getBody();
assertThat(exportedMetrics)
- .contains("subtask_index=\"0\",quantile=\"0.5\",} 0.5"); //
histogram
+ .contains("label2=\"value1_2\",quantile=\"0.5\",} 0.5"); //
histogram
assertThat(exportedMetrics)
- .contains("subtask_index=\"1\",quantile=\"0.5\",} 0.5"); //
histogram
+ .contains("label2=\"value2_2\",quantile=\"0.5\",} 0.5"); //
histogram
Review Comment:
nit: What about setting actual values as well in the histograms to
differentiate them?
```
TestHistogram histogram0 = new TestHistogram();
histogram0.setCount(10);
TestHistogram histogram1 = new TestHistogram();
histogram1.setCount(20);
reporter.notifyOfAddedMetric(histogram0, METRIC_NAME, metricGroup1);
reporter.notifyOfAddedMetric(histogram1, METRIC_NAME, metricGroup2);
final String exportedMetrics = pollMetrics(reporter.getPort()).getBody();
assertThat(exportedMetrics).contains("count{label1=\"value1_1\",label2=\"value1_2\",}
10");
assertThat(exportedMetrics).contains("count{label1=\"value2_1\",label2=\"value2_2\",}
20");
```
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -137,21 +123,7 @@ private void assertThatGaugeIsExported(Metric metric,
String name, String expect
throws UnirestException {
final String prometheusName = SCOPE_PREFIX + name;
Review Comment:
I guess this unused variable is not needed anymore. The
`addMetriAndPollResponse` expects the plain metric name (based on how it was
used in `PrometheusReporterTaskScopeTest`) and `createExpectedPollResponse`
adds the prefix within the method...
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -152,92 +78,80 @@ void countersCanBeAddedSeveralTimesIfTheyDifferInLabels()
throws UnirestExceptio
Counter counter2 = new SimpleCounter();
counter2.inc(2);
- taskMetricGroup1.counter("my_counter", counter1);
- taskMetricGroup2.counter("my_counter", counter2);
+ reporter.notifyOfAddedMetric(counter1, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_counter",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(1.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_counter",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(2.);
}
@Test
void gaugesCanBeAddedSeveralTimesIfTheyDifferInLabels() throws
UnirestException {
- Gauge<Integer> gauge1 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 3;
- }
- };
- Gauge<Integer> gauge2 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 4;
- }
- };
+ Gauge<Integer> gauge1 = () -> 3;
+ Gauge<Integer> gauge2 = () -> 4;
- taskMetricGroup1.gauge("my_gauge", gauge1);
- taskMetricGroup2.gauge("my_gauge", gauge2);
+ reporter.notifyOfAddedMetric(gauge1, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_gauge",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(3.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_gauge",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(4.);
}
@Test
void metersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws
UnirestException {
Meter meter = new TestMeter();
- taskMetricGroup1.meter("my_meter", meter);
- taskMetricGroup2.meter("my_meter", meter);
+ reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup1);
+ reporter.notifyOfAddedMetric(meter, METRIC_NAME, metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_meter",
LABEL_NAMES, labelValues1))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(5.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
- "flink_taskmanager_job_task_my_meter",
LABEL_NAMES, labelValues2))
+ getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(5.);
Review Comment:
Same here: Differentiating the values makes the test more sensitive to
failures:
```
reporter.notifyOfAddedMetric(new TestMeter(10, 1), METRIC_NAME,
metricGroup1);
reporter.notifyOfAddedMetric(new TestMeter(10, 2), METRIC_NAME,
metricGroup2);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[0]))
.isEqualTo(1.);
assertThat(
CollectorRegistry.defaultRegistry.getSampleValue(
getLogicalScope(METRIC_NAME), LABEL_NAMES,
LABEL_VALUES[1]))
.isEqualTo(2.);
```
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -192,32 +150,31 @@ void histogramIsReportedAsPrometheusSummary() throws
UnirestException {
}
}
+ /**
+ * Metrics with the same name are stored by the reporter in a shared
data-structure. This test
+ * ensures that a metric is unregistered from Prometheus even if other
metrics with the same
+ * name still exist.
+ */
@Test
- void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws
UnirestException {
- JobManagerMetricGroup jmMetricGroup =
- JobManagerMetricGroup.createJobManagerMetricGroup(registry,
HOST_NAME);
-
+ void metricIsRemovedWhileOtherMetricsWithSameNameExist() throws
UnirestException {
String metricName = "metric";
Counter metric1 = new SimpleCounter();
- FrontMetricGroup<JobManagerJobMetricGroup> metricGroup1 =
- new FrontMetricGroup<>(
- createReporterScopedSettings(),
- jmMetricGroup.addJob(JobID.generate(), "job_1"));
- reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);
-
Counter metric2 = new SimpleCounter();
- FrontMetricGroup<JobManagerJobMetricGroup> metricGroup2 =
- new FrontMetricGroup<>(
- createReporterScopedSettings(),
- jmMetricGroup.addJob(JobID.generate(), "job_2"));
- reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
- reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1);
+ final Map<String, String> variables2 = new
HashMap<>(metricGroup.getAllVariables());
+ final Map.Entry<String, String> entryToModify =
variables2.entrySet().iterator().next();
+ final String labelValueThatShouldBeRemoved = entryToModify.getValue();
+ variables2.put(entryToModify.getKey(), "some_value");
+ final MetricGroup metricGroup2 =
TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables2);
+
+ reporter.notifyOfAddedMetric(metric1, metricName, metricGroup);
+ reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);
+ reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup);
String response = pollMetrics(reporter.getPort()).getBody();
- assertThat(response).doesNotContain("job_1");
+ assertThat(response).doesNotContain(labelValueThatShouldBeRemoved);
Review Comment:
Why is it enough here to just check for the removal? Shouldn't we also check
for the other metric not being touched? 🤔
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -48,53 +40,47 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.NoSuchElementException;
-import static
org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Basic test for {@link PrometheusReporter}. */
class PrometheusReporterTest {
Review Comment:
nit: some of the test method signatures can be cleaned: The exception is not
always thrown...
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java:
##########
@@ -271,27 +228,16 @@ void registeringSameMetricTwiceDoesNotThrowException() {
@Test
void cannotStartTwoReportersOnSamePort() throws Exception {
- ReporterSetup setup1 = createReporterSetup("test1",
portRangeProvider.next());
-
- int usedPort = ((PrometheusReporter) setup1.getReporter()).getPort();
-
- try {
- assertThatThrownBy(() -> createReporterSetup("test2",
String.valueOf(usedPort)))
- .isInstanceOf(Exception.class);
- } finally {
- setup1.getReporter().close();
- }
+ assertThatThrownBy(
+ () ->
+ new PrometheusReporter(
+
Collections.singleton(reporter.getPort()).iterator()))
+ .isInstanceOf(Exception.class);
}
@Test
void canStartTwoReportersWhenUsingPortRange() throws Exception {
- String portRange = portRangeProvider.next();
-
- ReporterSetup setup1 = createReporterSetup("test1", portRange);
- ReporterSetup setup2 = createReporterSetup("test2", portRange);
-
- setup1.getReporter().close();
- setup2.getReporter().close();
+ new PrometheusReporter(portRangeProvider.next()).close();
Review Comment:
I guess, we're testing different things here: The old implementation used
the same range. The new test implementations runs on two different ranges (the
iterator increases the base of the next range by 100)...
##########
flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java:
##########
@@ -52,96 +44,30 @@
* different subtasks.
*/
class PrometheusReporterTaskScopeTest {
Review Comment:
nit: The `UnirestException` in the test signatures are obsolete in most of
the tests...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]