rionmonster commented on a change in pull request #15175:
URL: https://github.com/apache/flink/pull/15175#discussion_r593831263
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
##########
@@ -17,18 +17,102 @@
package org.apache.flink.metrics.prometheus;
+import io.prometheus.client.exporter.PushGateway;
+
+import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
/** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName =
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
public class PrometheusPushGatewayReporterFactory implements
MetricReporterFactory {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(PrometheusPushGatewayReporterFactory.class);
+
@Override
public PrometheusPushGatewayReporter createMetricReporter(Properties
properties) {
- return new PrometheusPushGatewayReporter();
+ MetricConfig metricConfig = (MetricConfig)properties;
+
+ String host = metricConfig.getString(HOST.key(), HOST.defaultValue());
+ int port = metricConfig.getInteger(PORT.key(), PORT.defaultValue());
+ String configuredJobName = metricConfig.getString(JOB_NAME.key(),
JOB_NAME.defaultValue());
+ boolean randomSuffix =
+ metricConfig.getBoolean(
+ RANDOM_JOB_NAME_SUFFIX.key(),
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+ boolean deleteOnShutdown =
+ metricConfig.getBoolean(DELETE_ON_SHUTDOWN.key(),
DELETE_ON_SHUTDOWN.defaultValue());
+ Map<String, String> groupingKey =
+ parseGroupingKey(metricConfig.getString(GROUPING_KEY.key(),
GROUPING_KEY.defaultValue()));
+
+ if (host == null || host.isEmpty() || port < 1) {
+ throw new IllegalArgumentException(
+ "Invalid host/port configuration. Host: " + host + " Port:
" + port);
+ }
+
+ String jobName = configuredJobName;
+ if (randomSuffix) {
+ jobName = configuredJobName + new AbstractID();
+ }
+
+ PushGateway pushGateway = new PushGateway(host + ':' + port);
Review comment:
Done! Moved this (specifically the `PushGateway` initialization) into
the constructor as opposed to the factory, which makes total sense. The
`jobName` resolution is still within the factory, but we could adjust that if
you prefer and instead pass in a flag to generate the name within the constructor.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]