[
https://issues.apache.org/jira/browse/FLINK-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009649#comment-16009649
]
ASF GitHub Bot commented on FLINK-6221:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3833#discussion_r116376926
--- Diff:
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static com.google.common.collect.Iterables.toArray;
+
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+ private static final Logger log =
LoggerFactory.getLogger(PrometheusReporter.class);
+
+ static final String ARG_PORT = "port";
+ private static final int DEFAULT_PORT = 9249;
+
+ private static final Pattern UNALLOWED_CHAR_PATTERN =
Pattern.compile("[^a-zA-Z0-9:_]");
+ private static final CharacterFilter CHARACTER_FILTER = new
CharacterFilter() {
+ @Override
+ public String filterCharacters(String input) {
+ return replaceInvalidChars(input);
+ }
+ };
+
+ private static final char SCOPE_SEPARATOR = '_';
+
+ private PrometheusEndpoint prometheusEndpoint;
+ private Map<String, Collector> collectorsByMetricName = new HashMap<>();
+
+ @VisibleForTesting
+ static String replaceInvalidChars(final String input) {
+ // https://prometheus.io/docs/instrumenting/writing_exporters/
+ // Only [a-zA-Z0-9:_] are valid in metric names, any other
characters should be sanitized to an underscore.
+ return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+ }
+
+ @Override
+ public void open(MetricConfig config) {
+ int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+ log.info("Using port {}.", port);
+ prometheusEndpoint = new PrometheusEndpoint(port);
+ try {
+ prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT,
true);
+ } catch (IOException e) {
+ log.error("Could not start PrometheusEndpoint on port "
+ port, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ prometheusEndpoint.stop();
+ CollectorRegistry.defaultRegistry.clear();
+ }
+
+ @Override
+ public void notifyOfAddedMetric(final Metric metric, final String
metricName, final MetricGroup group) {
+ final String scope =
((FrontMetricGroup<AbstractMetricGroup<?>>)
group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+ List<String> dimensionKeys = new LinkedList<>();
+ List<String> dimensionValues = new LinkedList<>();
+ for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
+
dimensionKeys.add(CHARACTER_FILTER.filterCharacters(CharMatcher.anyOf("<>").trimFrom(dimension.getKey())));
+
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+ }
+
+ final String validMetricName = scope + SCOPE_SEPARATOR +
CHARACTER_FILTER.filterCharacters(metricName);
+ final String metricIdentifier =
group.getMetricIdentifier(metricName);
+ final Collector collector;
+ if (metric instanceof Gauge) {
+ collector = createGauge((Gauge) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Counter) {
+ collector = createGauge((Counter) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Histogram) {
+ collector = createSummary((Histogram) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Meter) {
+ collector = createCounter((Meter) metric,
validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else {
+ log.error("Cannot add unknown metric type: {}. This
indicates that the metric type is not supported by this reporter.",
--- End diff --
That should be done in a seaprate issue.
> Add Prometheus support to metrics
> ---------------------------------
>
> Key: FLINK-6221
> URL: https://issues.apache.org/jira/browse/FLINK-6221
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.2.0
> Reporter: Joshua Griffith
> Assignee: Maximilian Bode
> Priority: Minor
>
> [Prometheus|https://prometheus.io/] is becoming popular for metrics and
> alerting. It's possible to use
> [statsd-exporter|https://github.com/prometheus/statsd_exporter] to load Flink
> metrics into Prometheus but it would be far easier if Flink supported
> Promethus as a metrics reporter. A [dropwizard
> client|https://github.com/prometheus/client_java/tree/master/simpleclient_dropwizard]
> exists that could be integrated into the existing metrics system.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)