zentol commented on a change in pull request #15175:
URL: https://github.com/apache/flink/pull/15175#discussion_r593813599
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
##########
@@ -50,30 +49,11 @@ int getPort() {
return port;
}
- @Override
- public void open(MetricConfig config) {
- super.open(config);
-
- String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
- Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
-
- while (ports.hasNext()) {
- int port = ports.next();
- try {
- // internally accesses CollectorRegistry.defaultRegistry
- httpServer = new HTTPServer(port);
- this.port = port;
- log.info("Started PrometheusReporter HTTP server on port {}.",
port);
- break;
- } catch (IOException ioe) { // assume port conflict
- log.debug("Could not start PrometheusReporter HTTP server on
port {}.", port, ioe);
- }
- }
- if (httpServer == null) {
- throw new RuntimeException(
- "Could not start PrometheusReporter HTTP server on any
configured port. Ports: "
- + portsConfig);
- }
+ PrometheusReporter(
+ final int port,
+ @Nullable HTTPServer httpServer) {
+ this.httpServer = Preconditions.checkNotNull(httpServer);
Review comment:
`@Nullable` to us implies that null is an expected case that should be
acceptable. Hence, it doesn't make to annotate it as such if we then right away
enforce that it is in fact not null.
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
##########
@@ -17,18 +17,58 @@
package org.apache.flink.metrics.prometheus;
+import io.prometheus.client.exporter.HTTPServer;
+
+import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
import java.util.Properties;
/** {@link MetricReporterFactory} for {@link PrometheusReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName =
"org.apache.flink.metrics.prometheus.PrometheusReporter")
public class PrometheusReporterFactory implements MetricReporterFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(PrometheusReporterFactory.class);
+
+ static final String ARG_PORT = "port";
+ private static final String DEFAULT_PORT = "9249";
+
@Override
public PrometheusReporter createMetricReporter(Properties properties) {
- return new PrometheusReporter();
+ MetricConfig metricConfig = (MetricConfig)properties;
+ String portsConfig = metricConfig.getString(ARG_PORT, DEFAULT_PORT);
+ Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
+ Integer port = null;
+ HTTPServer httpServer = null;
+ while (ports.hasNext()) {
+ port = ports.next();
+ try {
+ // internally accesses CollectorRegistry.defaultRegistry
+ httpServer = new HTTPServer(port);
Review comment:
let's move this into the constructor, and have it accept the ports
iterator
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
##########
@@ -51,75 +42,21 @@
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory")
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter
implements Scheduled {
- private PushGateway pushGateway;
- private String jobName;
- private boolean deleteOnShutdown;
- private Map<String, String> groupingKey;
-
- @Override
- public void open(MetricConfig config) {
- super.open(config);
-
- String host = config.getString(HOST.key(), HOST.defaultValue());
- int port = config.getInteger(PORT.key(), PORT.defaultValue());
- String configuredJobName = config.getString(JOB_NAME.key(),
JOB_NAME.defaultValue());
- boolean randomSuffix =
- config.getBoolean(
- RANDOM_JOB_NAME_SUFFIX.key(),
RANDOM_JOB_NAME_SUFFIX.defaultValue());
- deleteOnShutdown =
- config.getBoolean(DELETE_ON_SHUTDOWN.key(),
DELETE_ON_SHUTDOWN.defaultValue());
- groupingKey =
- parseGroupingKey(config.getString(GROUPING_KEY.key(),
GROUPING_KEY.defaultValue()));
-
- if (host == null || host.isEmpty() || port < 1) {
- throw new IllegalArgumentException(
- "Invalid host/port configuration. Host: " + host + " Port:
" + port);
- }
-
- if (randomSuffix) {
- this.jobName = configuredJobName + new AbstractID();
- } else {
- this.jobName = configuredJobName;
- }
-
- pushGateway = new PushGateway(host + ':' + port);
- log.info(
- "Configured PrometheusPushGatewayReporter with {host:{},
port:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{},
groupingKey:{}}",
- host,
- port,
- jobName,
- randomSuffix,
- deleteOnShutdown,
- groupingKey);
- }
-
- Map<String, String> parseGroupingKey(final String groupingKeyConfig) {
- if (!groupingKeyConfig.isEmpty()) {
- Map<String, String> groupingKey = new HashMap<>();
- String[] kvs = groupingKeyConfig.split(";");
- for (String kv : kvs) {
- int idx = kv.indexOf("=");
- if (idx < 0) {
- log.warn("Invalid prometheusPushGateway groupingKey:{},
will be ignored", kv);
- continue;
- }
-
- String labelKey = kv.substring(0, idx);
- String labelValue = kv.substring(idx + 1);
- if (StringUtils.isNullOrWhitespaceOnly(labelKey)
- || StringUtils.isNullOrWhitespaceOnly(labelValue)) {
- log.warn(
- "Invalid groupingKey {labelKey:{}, labelValue:{}}
must not be empty",
- labelKey,
- labelValue);
- continue;
- }
- groupingKey.put(labelKey, labelValue);
- }
-
- return groupingKey;
- }
- return Collections.emptyMap();
+ private final PushGateway pushGateway;
+ private final String jobName;
+ private final Map<String, String> groupingKey;
+ private final boolean deleteOnShutdown;
+
+ PrometheusPushGatewayReporter(
+ @Nullable PushGateway pushGateway,
+ @Nullable String jobName,
Review comment:
see my other comment regarding Nullable
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
##########
@@ -17,18 +17,102 @@
package org.apache.flink.metrics.prometheus;
+import io.prometheus.client.exporter.PushGateway;
+
+import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
/** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName =
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
public class PrometheusPushGatewayReporterFactory implements
MetricReporterFactory {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(PrometheusPushGatewayReporterFactory.class);
+
@Override
public PrometheusPushGatewayReporter createMetricReporter(Properties
properties) {
- return new PrometheusPushGatewayReporter();
+ MetricConfig metricConfig = (MetricConfig)properties;
+
+ String host = metricConfig.getString(HOST.key(), HOST.defaultValue());
+ int port = metricConfig.getInteger(PORT.key(), PORT.defaultValue());
+ String configuredJobName = metricConfig.getString(JOB_NAME.key(),
JOB_NAME.defaultValue());
+ boolean randomSuffix =
+ metricConfig.getBoolean(
+ RANDOM_JOB_NAME_SUFFIX.key(),
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+ boolean deleteOnShutdown =
+ metricConfig.getBoolean(DELETE_ON_SHUTDOWN.key(),
DELETE_ON_SHUTDOWN.defaultValue());
+ Map<String, String> groupingKey =
+ parseGroupingKey(metricConfig.getString(GROUPING_KEY.key(),
GROUPING_KEY.defaultValue()));
+
+ if (host == null || host.isEmpty() || port < 1) {
+ throw new IllegalArgumentException(
+ "Invalid host/port configuration. Host: " + host + " Port:
" + port);
+ }
+
+ String jobName = configuredJobName;
+ if (randomSuffix) {
+ jobName = configuredJobName + new AbstractID();
+ }
+
+ PushGateway pushGateway = new PushGateway(host + ':' + port);
Review comment:
I would move this back into the constructor.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]