zentol commented on a change in pull request #15175:
URL: https://github.com/apache/flink/pull/15175#discussion_r593813599



##########
File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
##########
@@ -50,30 +49,11 @@ int getPort() {
         return port;
     }
 
-    @Override
-    public void open(MetricConfig config) {
-        super.open(config);
-
-        String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
-        Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
-
-        while (ports.hasNext()) {
-            int port = ports.next();
-            try {
-                // internally accesses CollectorRegistry.defaultRegistry
-                httpServer = new HTTPServer(port);
-                this.port = port;
-                log.info("Started PrometheusReporter HTTP server on port {}.", 
port);
-                break;
-            } catch (IOException ioe) { // assume port conflict
-                log.debug("Could not start PrometheusReporter HTTP server on 
port {}.", port, ioe);
-            }
-        }
-        if (httpServer == null) {
-            throw new RuntimeException(
-                    "Could not start PrometheusReporter HTTP server on any 
configured port. Ports: "
-                            + portsConfig);
-        }
+    PrometheusReporter(
+        final int port, 
+        @Nullable HTTPServer httpServer) {
+        this.httpServer = Preconditions.checkNotNull(httpServer);

Review comment:
       `@Nullable` to us implies that null is an expected case that should be 
acceptable. Hence, it doesn't make to annotate it as such if we then right away 
enforce that it is in fact not null.

##########
File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
##########
@@ -17,18 +17,58 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import io.prometheus.client.exporter.HTTPServer;
+
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.util.Iterator;
 import java.util.Properties;
 
 /** {@link MetricReporterFactory} for {@link PrometheusReporter}. */
 @InterceptInstantiationViaReflection(
         reporterClassName = 
"org.apache.flink.metrics.prometheus.PrometheusReporter")
 public class PrometheusReporterFactory implements MetricReporterFactory {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporterFactory.class);
+
+    static final String ARG_PORT = "port";
+    private static final String DEFAULT_PORT = "9249";
+
     @Override
     public PrometheusReporter createMetricReporter(Properties properties) {
-        return new PrometheusReporter();
+        MetricConfig metricConfig = (MetricConfig)properties;
+        String portsConfig = metricConfig.getString(ARG_PORT, DEFAULT_PORT);
+        Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
+        Integer port = null;
+        HTTPServer httpServer = null;
+        while (ports.hasNext()) {
+            port = ports.next();
+            try {
+                // internally accesses CollectorRegistry.defaultRegistry
+                httpServer = new HTTPServer(port);

Review comment:
       let's move this into the constructor, and have it accept the ports 
iterator

##########
File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java
##########
@@ -51,75 +42,21 @@
                 
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory")
 public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter 
implements Scheduled {
 
-    private PushGateway pushGateway;
-    private String jobName;
-    private boolean deleteOnShutdown;
-    private Map<String, String> groupingKey;
-
-    @Override
-    public void open(MetricConfig config) {
-        super.open(config);
-
-        String host = config.getString(HOST.key(), HOST.defaultValue());
-        int port = config.getInteger(PORT.key(), PORT.defaultValue());
-        String configuredJobName = config.getString(JOB_NAME.key(), 
JOB_NAME.defaultValue());
-        boolean randomSuffix =
-                config.getBoolean(
-                        RANDOM_JOB_NAME_SUFFIX.key(), 
RANDOM_JOB_NAME_SUFFIX.defaultValue());
-        deleteOnShutdown =
-                config.getBoolean(DELETE_ON_SHUTDOWN.key(), 
DELETE_ON_SHUTDOWN.defaultValue());
-        groupingKey =
-                parseGroupingKey(config.getString(GROUPING_KEY.key(), 
GROUPING_KEY.defaultValue()));
-
-        if (host == null || host.isEmpty() || port < 1) {
-            throw new IllegalArgumentException(
-                    "Invalid host/port configuration. Host: " + host + " Port: 
" + port);
-        }
-
-        if (randomSuffix) {
-            this.jobName = configuredJobName + new AbstractID();
-        } else {
-            this.jobName = configuredJobName;
-        }
-
-        pushGateway = new PushGateway(host + ':' + port);
-        log.info(
-                "Configured PrometheusPushGatewayReporter with {host:{}, 
port:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, 
groupingKey:{}}",
-                host,
-                port,
-                jobName,
-                randomSuffix,
-                deleteOnShutdown,
-                groupingKey);
-    }
-
-    Map<String, String> parseGroupingKey(final String groupingKeyConfig) {
-        if (!groupingKeyConfig.isEmpty()) {
-            Map<String, String> groupingKey = new HashMap<>();
-            String[] kvs = groupingKeyConfig.split(";");
-            for (String kv : kvs) {
-                int idx = kv.indexOf("=");
-                if (idx < 0) {
-                    log.warn("Invalid prometheusPushGateway groupingKey:{}, 
will be ignored", kv);
-                    continue;
-                }
-
-                String labelKey = kv.substring(0, idx);
-                String labelValue = kv.substring(idx + 1);
-                if (StringUtils.isNullOrWhitespaceOnly(labelKey)
-                        || StringUtils.isNullOrWhitespaceOnly(labelValue)) {
-                    log.warn(
-                            "Invalid groupingKey {labelKey:{}, labelValue:{}} 
must not be empty",
-                            labelKey,
-                            labelValue);
-                    continue;
-                }
-                groupingKey.put(labelKey, labelValue);
-            }
-
-            return groupingKey;
-        }
-        return Collections.emptyMap();
+    private final PushGateway pushGateway;
+    private final String jobName;
+    private final Map<String, String> groupingKey;
+    private final boolean deleteOnShutdown;
+
+    PrometheusPushGatewayReporter(
+        @Nullable PushGateway pushGateway,
+        @Nullable String jobName,

Review comment:
       see my other comment regarding Nullable

##########
File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
##########
@@ -17,18 +17,102 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import io.prometheus.client.exporter.PushGateway;
+
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
 /** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
 @InterceptInstantiationViaReflection(
         reporterClassName = 
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
 public class PrometheusPushGatewayReporterFactory implements 
MetricReporterFactory {
 
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(PrometheusPushGatewayReporterFactory.class);
+
     @Override
     public PrometheusPushGatewayReporter createMetricReporter(Properties 
properties) {
-        return new PrometheusPushGatewayReporter();
+        MetricConfig metricConfig = (MetricConfig)properties;
+
+        String host = metricConfig.getString(HOST.key(), HOST.defaultValue());
+        int port = metricConfig.getInteger(PORT.key(), PORT.defaultValue());
+        String configuredJobName = metricConfig.getString(JOB_NAME.key(), 
JOB_NAME.defaultValue());
+        boolean randomSuffix =
+                metricConfig.getBoolean(
+                        RANDOM_JOB_NAME_SUFFIX.key(), 
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+        boolean deleteOnShutdown =
+                metricConfig.getBoolean(DELETE_ON_SHUTDOWN.key(), 
DELETE_ON_SHUTDOWN.defaultValue());
+        Map<String, String> groupingKey =
+                parseGroupingKey(metricConfig.getString(GROUPING_KEY.key(), 
GROUPING_KEY.defaultValue()));
+
+        if (host == null || host.isEmpty() || port < 1) {
+            throw new IllegalArgumentException(
+                    "Invalid host/port configuration. Host: " + host + " Port: 
" + port);
+        }
+
+        String jobName = configuredJobName;
+        if (randomSuffix) {
+            jobName = configuredJobName + new AbstractID();
+        } 
+
+        PushGateway pushGateway = new PushGateway(host + ':' + port);

Review comment:
       I would move this back into the constructor.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to