zentol commented on a change in pull request #15175:
URL: https://github.com/apache/flink/pull/15175#discussion_r594219048
##########
File path:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterFactory.java
##########
@@ -17,18 +17,101 @@
package org.apache.flink.metrics.prometheus;
+import io.prometheus.client.exporter.PushGateway;
+
+import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
+
/** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName =
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
public class PrometheusPushGatewayReporterFactory implements
MetricReporterFactory {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(PrometheusPushGatewayReporterFactory.class);
+
@Override
public PrometheusPushGatewayReporter createMetricReporter(Properties
properties) {
- return new PrometheusPushGatewayReporter();
+ MetricConfig metricConfig = (MetricConfig)properties;
+
+ String host = metricConfig.getString(HOST.key(), HOST.defaultValue());
+ int port = metricConfig.getInteger(PORT.key(), PORT.defaultValue());
+ String configuredJobName = metricConfig.getString(JOB_NAME.key(),
JOB_NAME.defaultValue());
+ boolean randomSuffix =
+ metricConfig.getBoolean(
+ RANDOM_JOB_NAME_SUFFIX.key(),
RANDOM_JOB_NAME_SUFFIX.defaultValue());
+ boolean deleteOnShutdown =
+ metricConfig.getBoolean(DELETE_ON_SHUTDOWN.key(),
DELETE_ON_SHUTDOWN.defaultValue());
+ Map<String, String> groupingKey =
+ parseGroupingKey(metricConfig.getString(GROUPING_KEY.key(),
GROUPING_KEY.defaultValue()));
+
+ if (host == null || host.isEmpty() || port < 1) {
+ throw new IllegalArgumentException(
+ "Invalid host/port configuration. Host: " + host + " Port:
" + port);
+ }
+
+ String jobName = configuredJobName;
+ if (randomSuffix) {
+ jobName = configuredJobName + new AbstractID();
+ }
+
+ LOG.info(
+ "Configured PrometheusPushGatewayReporter with {host:{},
port:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{},
groupingKey:{}}",
+ host,
+ port,
+ jobName,
+ randomSuffix,
+ deleteOnShutdown,
+ groupingKey);
+
+ return new PrometheusPushGatewayReporter(host, port, jobName,
groupingKey, deleteOnShutdown);
+ }
+
+ static Map<String, String> parseGroupingKey(final String
groupingKeyConfig) {
Review comment:
```suggestion
@VisibleForTesting
static Map<String, String> parseGroupingKey(final String
groupingKeyConfig) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]