This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc0ca2 [FLINK-21760][metrics][jmx] Remove flink-runtime dependency
4bc0ca2 is described below
commit 4bc0ca22666e1d437a8380f6eb119585aa147d19
Author: Chesnay Schepler <[email protected]>
AuthorDate: Sat Mar 13 10:53:57 2021 +0100
[FLINK-21760][metrics][jmx] Remove flink-runtime dependency
---
flink-connectors/flink-connector-kafka/pom.xml | 2 +-
flink-dist/pom.xml | 2 +-
flink-dist/src/main/assemblies/plugins.xml | 2 +-
flink-metrics/flink-metrics-jmx/pom.xml | 17 +-
.../apache/flink/metrics/jmx/JMXReporterTest.java | 276 +++++++--------------
5 files changed, 94 insertions(+), 205 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/pom.xml
b/flink-connectors/flink-connector-kafka/pom.xml
index 6c9303a..6bb8265 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -157,7 +157,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-metrics-jmx_${scala.binary.version}</artifactId>
+ <artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 09b2de0..e9bf600 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -268,7 +268,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-metrics-jmx_${scala.binary.version}</artifactId>
+ <artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
diff --git a/flink-dist/src/main/assemblies/plugins.xml
b/flink-dist/src/main/assemblies/plugins.xml
index f02dc87..30ce5b6 100644
--- a/flink-dist/src/main/assemblies/plugins.xml
+++ b/flink-dist/src/main/assemblies/plugins.xml
@@ -32,7 +32,7 @@
<!-- Metrics -->
<file>
-
<source>../flink-metrics/flink-metrics-jmx/target/flink-metrics-jmx_${scala.binary.version}-${project.version}.jar</source>
+
<source>../flink-metrics/flink-metrics-jmx/target/flink-metrics-jmx-${project.version}.jar</source>
<outputDirectory>plugins/metrics-jmx/</outputDirectory>
<destName>flink-metrics-jmx-${project.version}.jar</destName>
<fileMode>0644</fileMode>
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml
b/flink-metrics/flink-metrics-jmx/pom.xml
index 472f562..8dd4f4b 100644
--- a/flink-metrics/flink-metrics-jmx/pom.xml
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -29,7 +29,7 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-metrics-jmx_${scala.binary.version}</artifactId>
+ <artifactId>flink-metrics-jmx</artifactId>
<name>Flink : Metrics : JMX</name>
<dependencies>
@@ -49,13 +49,6 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
@@ -73,14 +66,6 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
<dependency>
diff --git
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 036cbb9..4e47c5f 100644
---
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -21,15 +21,11 @@ package org.apache.flink.metrics.jmx;
import org.apache.flink.management.jmx.JMXService;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
-import org.apache.flink.runtime.metrics.ReporterSetup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
-import org.apache.flink.runtime.metrics.groups.ReporterScopedSettings;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.metrics.util.TestMetricGroup;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -46,20 +42,30 @@ import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
-import java.util.List;
import java.util.Map;
import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/** Tests for the JMXReporter. */
public class JMXReporterTest extends TestLogger {
+ private static final Map<String, String> variables;
+ private static final MetricGroup metricGroup;
+
+ static {
+ variables = new HashMap<>();
+ variables.put("<host>", "localhost");
+
+ metricGroup =
+ TestMetricGroup.newBuilder()
+ .setLogicalScopeFunction((characterFilter, character)
-> "taskmanager")
+ .setVariables(variables)
+ .build();
+ }
+
@After
public void shutdownService() throws IOException {
JMXService.stopInstance();
@@ -105,64 +111,31 @@ public class JMXReporterTest extends TestLogger {
*/
@Test
public void testPortConflictHandling() throws Exception {
- ReporterSetup reporterSetup1 =
- ReporterSetup.forReporter("test1", new
JMXReporter("9020-9035"));
- ReporterSetup reporterSetup2 =
- ReporterSetup.forReporter("test2", new
JMXReporter("9020-9035"));
-
- MetricRegistryImpl reg =
- new MetricRegistryImpl(
-
MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
- Arrays.asList(reporterSetup1, reporterSetup2));
-
- TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host",
"tm");
-
- List<MetricReporter> reporters = reg.getReporters();
-
- assertTrue(reporters.size() == 2);
-
- MetricReporter rep1 = reporters.get(0);
- MetricReporter rep2 = reporters.get(1);
-
- Gauge<Integer> g1 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 1;
- }
- };
- Gauge<Integer> g2 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 2;
- }
- };
-
- rep1.notifyOfAddedMetric(
- g1, "rep1", new
FrontMetricGroup<>(createReporterScopedSettings(0), mg));
- rep2.notifyOfAddedMetric(
- g2, "rep2", new
FrontMetricGroup<>(createReporterScopedSettings(0), mg));
+ final MetricReporter rep1 = new JMXReporter("9020-9035");
+ final MetricReporter rep2 = new JMXReporter("9020-9035");
+
+ Gauge<Integer> g1 = () -> 1;
+ Gauge<Integer> g2 = () -> 2;
+
+ rep1.notifyOfAddedMetric(g1, "rep1", metricGroup);
+ rep2.notifyOfAddedMetric(g2, "rep2", metricGroup);
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 =
new ObjectName(
JMX_DOMAIN_PREFIX + "taskmanager.rep1",
- JMXReporter.generateJmxTable(mg.getAllVariables()));
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
ObjectName objectName2 =
new ObjectName(
JMX_DOMAIN_PREFIX + "taskmanager.rep2",
- JMXReporter.generateJmxTable(mg.getAllVariables()));
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
assertEquals(1, mBeanServer.getAttribute(objectName1, "Value"));
assertEquals(2, mBeanServer.getAttribute(objectName2, "Value"));
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
-
- mg.close();
- reg.shutdown().get();
}
/**
@@ -172,54 +145,23 @@ public class JMXReporterTest extends TestLogger {
*/
@Test
public void testJMXAvailability() throws Exception {
- ReporterSetup reporterSetup1 =
- ReporterSetup.forReporter("test1", new
JMXReporter("9040-9055"));
- ReporterSetup reporterSetup2 =
- ReporterSetup.forReporter("test2", new
JMXReporter("9040-9055"));
-
- MetricRegistryImpl reg =
- new MetricRegistryImpl(
-
MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
- Arrays.asList(reporterSetup1, reporterSetup2));
-
- TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host",
"tm");
-
- List<MetricReporter> reporters = reg.getReporters();
-
- assertTrue(reporters.size() == 2);
-
- MetricReporter rep1 = reporters.get(0);
- MetricReporter rep2 = reporters.get(1);
-
- Gauge<Integer> g1 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 1;
- }
- };
- Gauge<Integer> g2 =
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return 2;
- }
- };
-
- rep1.notifyOfAddedMetric(
- g1, "rep1", new
FrontMetricGroup<>(createReporterScopedSettings(0), mg));
-
- rep2.notifyOfAddedMetric(
- g2, "rep2", new
FrontMetricGroup<>(createReporterScopedSettings(1), mg));
+ final MetricReporter rep1 = new JMXReporter("9040-9055");
+ final MetricReporter rep2 = new JMXReporter("9040-9055");
+
+ Gauge<Integer> g1 = () -> 1;
+ Gauge<Integer> g2 = () -> 2;
+
+ rep1.notifyOfAddedMetric(g1, "rep1", metricGroup);
+ rep2.notifyOfAddedMetric(g2, "rep2", metricGroup);
ObjectName objectName1 =
new ObjectName(
JMX_DOMAIN_PREFIX + "taskmanager.rep1",
- JMXReporter.generateJmxTable(mg.getAllVariables()));
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
ObjectName objectName2 =
new ObjectName(
JMX_DOMAIN_PREFIX + "taskmanager.rep2",
- JMXReporter.generateJmxTable(mg.getAllVariables()));
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
JMXServiceURL url1 =
new JMXServiceURL(
@@ -259,118 +201,80 @@ public class JMXReporterTest extends TestLogger {
rep1.close();
rep2.close();
- mg.close();
- reg.shutdown().get();
}
/** Tests that histograms are properly reported via the JMXReporter. */
@Test
public void testHistogramReporting() throws Exception {
- MetricRegistryImpl registry = null;
String histogramName = "histogram";
- try {
- registry =
- new MetricRegistryImpl(
-
MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
- Collections.singletonList(
- ReporterSetup.forReporter("test", new
JMXReporter(null))));
-
- TaskManagerMetricGroup metricGroup =
- new TaskManagerMetricGroup(registry, "localhost", "tmId");
-
- TestHistogram histogram = new TestHistogram();
-
- metricGroup.histogram(histogramName, histogram);
-
- MBeanServer mBeanServer =
ManagementFactory.getPlatformMBeanServer();
-
- ObjectName objectName =
- new ObjectName(
- JMX_DOMAIN_PREFIX + "taskmanager." + histogramName,
-
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
-
- MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
-
- MBeanAttributeInfo[] attributeInfos = info.getAttributes();
-
- assertEquals(11, attributeInfos.length);
-
- assertEquals(histogram.getCount(),
mBeanServer.getAttribute(objectName, "Count"));
- HistogramStatistics statistics = histogram.getStatistics();
- assertEquals(statistics.getMean(),
mBeanServer.getAttribute(objectName, "Mean"));
- assertEquals(statistics.getStdDev(),
mBeanServer.getAttribute(objectName, "StdDev"));
- assertEquals(statistics.getMax(),
mBeanServer.getAttribute(objectName, "Max"));
- assertEquals(statistics.getMin(),
mBeanServer.getAttribute(objectName, "Min"));
- assertEquals(
- statistics.getQuantile(0.5),
mBeanServer.getAttribute(objectName, "Median"));
- assertEquals(
- statistics.getQuantile(0.75),
- mBeanServer.getAttribute(objectName, "75thPercentile"));
- assertEquals(
- statistics.getQuantile(0.95),
- mBeanServer.getAttribute(objectName, "95thPercentile"));
- assertEquals(
- statistics.getQuantile(0.98),
- mBeanServer.getAttribute(objectName, "98thPercentile"));
- assertEquals(
- statistics.getQuantile(0.99),
- mBeanServer.getAttribute(objectName, "99thPercentile"));
- assertEquals(
- statistics.getQuantile(0.999),
- mBeanServer.getAttribute(objectName, "999thPercentile"));
-
- } finally {
- if (registry != null) {
- registry.shutdown().get();
- }
- }
+ final JMXReporter reporter = new JMXReporter(null);
+
+ TestHistogram histogram = new TestHistogram();
+
+ reporter.notifyOfAddedMetric(histogram, histogramName, metricGroup);
+
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectName objectName =
+ new ObjectName(
+ JMX_DOMAIN_PREFIX + "taskmanager." + histogramName,
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
+
+ MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+
+ MBeanAttributeInfo[] attributeInfos = info.getAttributes();
+
+ assertEquals(11, attributeInfos.length);
+
+ assertEquals(histogram.getCount(),
mBeanServer.getAttribute(objectName, "Count"));
+ HistogramStatistics statistics = histogram.getStatistics();
+ assertEquals(statistics.getMean(),
mBeanServer.getAttribute(objectName, "Mean"));
+ assertEquals(statistics.getStdDev(),
mBeanServer.getAttribute(objectName, "StdDev"));
+ assertEquals(statistics.getMax(), mBeanServer.getAttribute(objectName,
"Max"));
+ assertEquals(statistics.getMin(), mBeanServer.getAttribute(objectName,
"Min"));
+ assertEquals(statistics.getQuantile(0.5),
mBeanServer.getAttribute(objectName, "Median"));
+ assertEquals(
+ statistics.getQuantile(0.75),
+ mBeanServer.getAttribute(objectName, "75thPercentile"));
+ assertEquals(
+ statistics.getQuantile(0.95),
+ mBeanServer.getAttribute(objectName, "95thPercentile"));
+ assertEquals(
+ statistics.getQuantile(0.98),
+ mBeanServer.getAttribute(objectName, "98thPercentile"));
+ assertEquals(
+ statistics.getQuantile(0.99),
+ mBeanServer.getAttribute(objectName, "99thPercentile"));
+ assertEquals(
+ statistics.getQuantile(0.999),
+ mBeanServer.getAttribute(objectName, "999thPercentile"));
}
/** Tests that meters are properly reported via the JMXReporter. */
@Test
public void testMeterReporting() throws Exception {
- MetricRegistryImpl registry = null;
String meterName = "meter";
- try {
- registry =
- new MetricRegistryImpl(
-
MetricRegistryTestUtils.defaultMetricRegistryConfiguration(),
- Collections.singletonList(
- ReporterSetup.forReporter("test", new
JMXReporter(null))));
-
- TaskManagerMetricGroup metricGroup =
- new TaskManagerMetricGroup(registry, "localhost", "tmId");
-
- TestMeter meter = new TestMeter();
+ final JMXReporter reporter = new JMXReporter(null);
- metricGroup.meter(meterName, meter);
+ TestMeter meter = new TestMeter();
+ reporter.notifyOfAddedMetric(meter, meterName, metricGroup);
- MBeanServer mBeanServer =
ManagementFactory.getPlatformMBeanServer();
-
- ObjectName objectName =
- new ObjectName(
- JMX_DOMAIN_PREFIX + "taskmanager." + meterName,
-
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
-
- MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
- MBeanAttributeInfo[] attributeInfos = info.getAttributes();
+ ObjectName objectName =
+ new ObjectName(
+ JMX_DOMAIN_PREFIX + "taskmanager." + meterName,
+
JMXReporter.generateJmxTable(metricGroup.getAllVariables()));
- assertEquals(2, attributeInfos.length);
+ MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
- assertEquals(meter.getRate(), mBeanServer.getAttribute(objectName,
"Rate"));
- assertEquals(meter.getCount(),
mBeanServer.getAttribute(objectName, "Count"));
+ MBeanAttributeInfo[] attributeInfos = info.getAttributes();
- } finally {
- if (registry != null) {
- registry.shutdown().get();
- }
- }
- }
+ assertEquals(2, attributeInfos.length);
- private static ReporterScopedSettings createReporterScopedSettings(int
reporterIndex) {
- return new ReporterScopedSettings(reporterIndex, ',',
Collections.emptySet());
+ assertEquals(meter.getRate(), mBeanServer.getAttribute(objectName,
"Rate"));
+ assertEquals(meter.getCount(), mBeanServer.getAttribute(objectName,
"Count"));
}
}