zentol commented on a change in pull request #15175:
URL: https://github.com/apache/flink/pull/15175#discussion_r594219048



##########
File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
##########
@@ -17,18 +17,101 @@
 
 package org.apache.flink.metrics.prometheus;
 
+import io.prometheus.client.exporter.PushGateway;
+
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
 import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
 /** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
 @InterceptInstantiationViaReflection(
         reporterClassName = 
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
 public class PrometheusPushGatewayReporterFactory implements 
MetricReporterFactory {
 
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(PrometheusPushGatewayReporterFactory.class);
+
     @Override
     public PrometheusPushGatewayReporter createMetricReporter(Properties 
properties) {
-        return new PrometheusPushGatewayReporter();
+        MetricConfig metricConfig = (MetricConfig)properties;
+
+        String host = metricConfig.getString(HOST.key(), HOST.defaultValue());
+        int port = metricConfig.getInteger(PORT.key(), PORT.defaultValue());
+        String configuredJobName = metricConfig.getString(JOB_NAME.key(), 
JOB_NAME.defaultValue());
+        boolean randomSuffix =
+                metricConfig.getBoolean(
+                        RANDOM_JOB_NAME_SUFFIX.key(), 
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+        boolean deleteOnShutdown =
+                metricConfig.getBoolean(DELETE_ON_SHUTDOWN.key(), 
DELETE_ON_SHUTDOWN.defaultValue());
+        Map<String, String> groupingKey =
+                parseGroupingKey(metricConfig.getString(GROUPING_KEY.key(), 
GROUPING_KEY.defaultValue()));
+
+        if (host == null || host.isEmpty() || port < 1) {
+            throw new IllegalArgumentException(
+                    "Invalid host/port configuration. Host: " + host + " Port: 
" + port);
+        }
+
+        String jobName = configuredJobName;
+        if (randomSuffix) {
+            jobName = configuredJobName + new AbstractID();
+        } 
+
+        LOG.info(
+                "Configured PrometheusPushGatewayReporter with {host:{}, 
port:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, 
groupingKey:{}}",
+                host,
+                port,
+                jobName,
+                randomSuffix,
+                deleteOnShutdown,
+                groupingKey);
+
+        return new PrometheusPushGatewayReporter(host, port, jobName, 
groupingKey, deleteOnShutdown);
+    }
+
+    static Map<String, String> parseGroupingKey(final String 
groupingKeyConfig) {

Review comment:
       ```suggestion
   
       @VisibleForTesting
       static Map<String, String> parseGroupingKey(final String 
groupingKeyConfig) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to