This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 215bd2102 [observability] Introduce prometheus-push metric report (#2317)
215bd2102 is described below
commit 215bd2102d41470a2f32c3b47a884396d0c57471
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jan 29 10:28:36 2026 +0800
[observability] Introduce prometheus-push metric report (#2317)
---
.../org/apache/fluss/config/ConfigOptions.java | 49 +++
fluss-metrics/fluss-metrics-prometheus/pom.xml | 6 +
...porter.java => AbstractPrometheusReporter.java} | 177 +++++------
.../prometheus/PrometheusPushGatewayReporter.java | 83 +++++
.../PrometheusPushGatewayReporterPlugin.java | 116 +++++++
.../metrics/prometheus/PrometheusReporter.java | 341 +--------------------
.../prometheus/PrometheusReporterPlugin.java | 4 -
.../src/main/resources/META-INF/NOTICE | 3 +-
...che.fluss.metrics.reporter.MetricReporterPlugin | 1 +
.../PrometheusPushGatewayReporterPluginTest.java} | 42 +--
website/docs/maintenance/configuration.md | 19 +-
.../maintenance/observability/metric-reporters.md | 27 +-
12 files changed, 406 insertions(+), 462 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index e7dbec436..6cfcf052a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1833,6 +1833,55 @@ public class ConfigOptions {
+ "the CoordinatorServer) it is advisable
to use a port range "
+ "like 9250-9260.");
+ // ------------------------------------------------------------------------
+ // ConfigOptions for prometheus push gateway reporter
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<String>
METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL =
+ key("metrics.reporter.prometheus-push.host-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The PushGateway server host URL including scheme,
host name, and port.");
+
+ public static final ConfigOption<String>
METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME =
+ key("metrics.reporter.prometheus-push.job-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The job name under which metrics will be
pushed");
+
+ public static final ConfigOption<Boolean>
+ METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX =
+
key("metrics.reporter.prometheus-push.random-job-name-suffix")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Specifies whether a random suffix should
be appended to the job name. "
+ + "This is useful when multiple
instances of the reporter "
+ + "are running on the same host.");
+
+ public static final ConfigOption<Boolean>
+ METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN =
+ key("metrics.reporter.prometheus-push.delete-on-shutdown")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Specifies whether to delete metrics from
the PushGateway on shutdown. Fluss will try its best to delete the metrics but
this is not guaranteed.");
+
+ public static final ConfigOption<String>
METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY =
+ key("metrics.reporter.prometheus-push.grouping-key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specifies the grouping key which is the group and
global labels of all metrics. The label name and value are separated by '=',
and labels are separated by ';', e.g., k1=v1;k2=v2.");
+
+ public static final ConfigOption<Duration>
+ METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL =
+ key("metrics.reporter.prometheus-push.push-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription(
+ "The interval of pushing metrics to
Prometheus PushGateway.");
+
// ------------------------------------------------------------------------
// ConfigOptions for jmx reporter
// ------------------------------------------------------------------------
diff --git a/fluss-metrics/fluss-metrics-prometheus/pom.xml b/fluss-metrics/fluss-metrics-prometheus/pom.xml
index 677c8969c..6ca4367c8 100644
--- a/fluss-metrics/fluss-metrics-prometheus/pom.xml
+++ b/fluss-metrics/fluss-metrics-prometheus/pom.xml
@@ -54,6 +54,12 @@
<version>${prometheus.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_pushgateway</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.fluss</groupId>
diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java
similarity index 83%
copy from fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java
copy to fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java
index 1f0925aea..c8d1e955f 100644
--- a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java
+++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/AbstractPrometheusReporter.java
@@ -31,98 +31,84 @@ import org.apache.fluss.metrics.reporter.MetricReporter;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import static org.apache.fluss.utils.Preconditions.checkState;
-
/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
-/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
*/
-public class PrometheusReporter implements MetricReporter {
+/**
+ * Base class for Prometheus metric reporters. Contains common logic for
metric registration and
+ * collector management.
+ */
+public abstract class AbstractPrometheusReporter implements MetricReporter {
- private static final Logger LOG =
LoggerFactory.getLogger(PrometheusReporter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractPrometheusReporter.class);
private static final Pattern UNALLOWED_CHAR_PATTERN =
Pattern.compile("[^a-zA-Z0-9:_]");
- private static final CharacterFilter CHARACTER_FILTER =
PrometheusReporter::replaceInvalidChars;
-
- @VisibleForTesting static final char SCOPE_SEPARATOR = '_';
- @VisibleForTesting static final String SCOPE_PREFIX = "fluss" +
SCOPE_SEPARATOR;
- private final Map<String, AbstractMap.SimpleImmutableEntry<Collector,
Integer>>
- collectorsWithCountByMetricName = new HashMap<>();
+ private static final CharacterFilter CHARACTER_FILTER =
+ AbstractPrometheusReporter::replaceInvalidChars;
- @VisibleForTesting final CollectorRegistry registry = new
CollectorRegistry(true);
+ @VisibleForTesting protected static final char SCOPE_SEPARATOR = '_';
- private HTTPServer httpServer;
- private int port;
+ @VisibleForTesting protected static final String SCOPE_PREFIX = "fluss" +
SCOPE_SEPARATOR;
- int getPort() {
- checkState(httpServer != null, "Server has not been initialized.");
- return port;
- }
-
- PrometheusReporter(Iterator<Integer> ports) {
- while (ports.hasNext()) {
- port = ports.next();
- try {
- httpServer = new HTTPServer(new InetSocketAddress(port),
this.registry);
- LOG.info("Started PrometheusReporter HTTP server on port {}.",
port);
- break;
- } catch (IOException ioe) { // assume port conflict
- LOG.debug("Could not start PrometheusReporter HTTP server on
port {}.", port, ioe);
- }
- }
+ private final Map<String, AbstractMap.SimpleImmutableEntry<Collector,
Integer>>
+ collectorsWithCountByMetricName = new HashMap<>();
- if (httpServer == null) {
- throw new RuntimeException(
- "Could not start PrometheusReporter HTTP server on any
configured port. Ports: "
- + ports);
- }
- }
+ @VisibleForTesting protected final CollectorRegistry registry = new
CollectorRegistry(true);
- static String replaceInvalidChars(final String input) {
+ protected static String replaceInvalidChars(final String input) {
// https://prometheus.io/docs/instrumenting/writing_exporters/
// Only [a-zA-Z0-9:_] are valid in metric names, any other characters
should be sanitized to
// an underscore.
return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
}
+ /**
+ * Configures the reporter. Subclasses should override this to add their
own configuration.
+ *
+ * @param config the configuration
+ */
@Override
public void open(Configuration config) {
- // do nothing now;
+ // default no-op
}
+ /**
+ * Closes the reporter and clears the registry. Subclasses should override
to add their own
+ * cleanup logic and call super.close().
+ */
+ @Override
public void close() {
- if (httpServer != null) {
- httpServer.stop();
- }
registry.clear();
}
+ /**
+ * Called when a new {@link Metric} was added.
+ *
+ * @param metric the metric that was added
+ * @param metricName the name of the metric
+ * @param group the group that contains the metric
+ */
@Override
public void notifyOfAddedMetric(Metric metric, String metricName,
MetricGroup group) {
List<String> dimensionKeys = new LinkedList<>();
List<String> dimensionValues = new LinkedList<>();
for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
- final String key = dimension.getKey();
- dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key));
+
dimensionKeys.add(CHARACTER_FILTER.filterCharacters(dimension.getKey()));
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
}
@@ -158,14 +144,56 @@ public class PrometheusReporter implements MetricReporter {
}
}
- private static String getScopedName(String metricName, MetricGroup group) {
+ /**
+ * Called when a {@link Metric} was removed.
+ *
+ * @param metric the metric that should be removed
+ * @param metricName the name of the metric
+ * @param group the group that contains the metric
+ */
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName,
MetricGroup group) {
+ List<String> dimensionValues = new LinkedList<>();
+ for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
+
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+ }
+
+ final String scopedMetricName = getScopedName(metricName, group);
+ synchronized (this) {
+ final AbstractMap.SimpleImmutableEntry<Collector, Integer>
collectorWithCount =
+ collectorsWithCountByMetricName.get(scopedMetricName);
+ final Integer count = collectorWithCount.getValue();
+ final Collector collector = collectorWithCount.getKey();
+
+ removeMetric(metric, dimensionValues, collector);
+
+ if (count == 1) {
+ try {
+ registry.unregister(collector);
+ } catch (Exception e) {
+ LOG.warn("There was a problem unregistering metric {}.",
scopedMetricName, e);
+ }
+ collectorsWithCountByMetricName.remove(scopedMetricName);
+ } else {
+ collectorsWithCountByMetricName.put(
+ scopedMetricName,
+ new AbstractMap.SimpleImmutableEntry<>(collector,
count - 1));
+ }
+ }
+ }
+
+ protected static String getScopedName(String metricName, MetricGroup
group) {
return SCOPE_PREFIX
+ getLogicalScope(group)
+ SCOPE_SEPARATOR
+ CHARACTER_FILTER.filterCharacters(metricName);
}
- private Collector createCollector(
+ protected static String getLogicalScope(MetricGroup group) {
+ return group.getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+ }
+
+ protected Collector createCollector(
Metric metric,
List<String> dimensionKeys,
List<String> dimensionValues,
@@ -201,7 +229,7 @@ public class PrometheusReporter implements MetricReporter {
return collector;
}
- private void addMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
+ protected void addMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
switch (metric.getMetricType()) {
case GAUGE:
((io.prometheus.client.Gauge) collector)
@@ -225,7 +253,7 @@ public class PrometheusReporter implements MetricReporter {
}
}
- private void removeMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
+ protected void removeMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
switch (metric.getMetricType()) {
case GAUGE:
case COUNTER:
@@ -242,44 +270,7 @@ public class PrometheusReporter implements MetricReporter {
}
}
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName,
MetricGroup group) {
-
- List<String> dimensionValues = new LinkedList<>();
- for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
-
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
- }
-
- final String scopedMetricName = getScopedName(metricName, group);
- synchronized (this) {
- final AbstractMap.SimpleImmutableEntry<Collector, Integer>
collectorWithCount =
- collectorsWithCountByMetricName.get(scopedMetricName);
- final Integer count = collectorWithCount.getValue();
- final Collector collector = collectorWithCount.getKey();
-
- removeMetric(metric, dimensionValues, collector);
-
- if (count == 1) {
- try {
- registry.unregister(collector);
- } catch (Exception e) {
- LOG.warn("There was a problem unregistering metric {}.",
scopedMetricName, e);
- }
- collectorsWithCountByMetricName.remove(scopedMetricName);
- } else {
- collectorsWithCountByMetricName.put(
- scopedMetricName,
- new AbstractMap.SimpleImmutableEntry<>(collector,
count - 1));
- }
- }
- }
-
- private static String getLogicalScope(MetricGroup group) {
- return group.getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
- }
-
- @VisibleForTesting
- io.prometheus.client.Gauge.Child gaugeFrom(Gauge<?> gauge) {
+ protected static io.prometheus.client.Gauge.Child gaugeFrom(Gauge<?>
gauge) {
return new io.prometheus.client.Gauge.Child() {
@Override
public double get() {
@@ -306,7 +297,7 @@ public class PrometheusReporter implements MetricReporter {
};
}
- private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter)
{
+ protected static io.prometheus.client.Gauge.Child gaugeFrom(Counter
counter) {
return new io.prometheus.client.Gauge.Child() {
@Override
public double get() {
@@ -315,7 +306,7 @@ public class PrometheusReporter implements MetricReporter {
};
}
- private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+ protected static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
return new io.prometheus.client.Gauge.Child() {
@Override
public double get() {
@@ -394,13 +385,13 @@ public class PrometheusReporter implements MetricReporter {
}
}
- private static List<String> addToList(List<String> list, String element) {
+ protected static List<String> addToList(List<String> list, String element)
{
final List<String> result = new ArrayList<>(list);
result.add(element);
return result;
}
- private static String[] toArray(List<String> list) {
+ protected static String[] toArray(List<String> list) {
return list.toArray(new String[0]);
}
}
diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java
new file mode 100644
index 000000000..f6102f9cd
--- /dev/null
+++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metrics.prometheus;
+
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.reporter.ScheduledMetricReporter;
+
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import java.util.Map;
+
+/** {@link ScheduledMetricReporter} that pushes {@link Metric Metrics} to
Prometheus PushGateway. */
+public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter
+ implements ScheduledMetricReporter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+
+ private final PushGateway pushGateway;
+ private final String jobName;
+ private final Map<String, String> groupingKey;
+ private final boolean deleteOnShutdown;
+ private final Duration pushInterval;
+
+ public PrometheusPushGatewayReporter(
+ URL hostUrl,
+ String jobName,
+ Map<String, String> groupingKey,
+ final boolean deleteOnShutdown,
+ Duration pushInterval) {
+ this.pushGateway = new PushGateway(hostUrl);
+ this.jobName = jobName;
+ this.groupingKey = groupingKey;
+ this.deleteOnShutdown = deleteOnShutdown;
+ this.pushInterval = pushInterval;
+ }
+
+ @Override
+ public void close() {
+ if (deleteOnShutdown) {
+ try {
+ pushGateway.delete(jobName, groupingKey);
+ LOG.info("Deleted metrics from PushGateway.");
+ } catch (IOException e) {
+ LOG.warn("Could not delete metrics from PushGateway.", e);
+ }
+ }
+ super.close();
+ }
+
+ @Override
+ public Duration scheduleInterval() {
+ return pushInterval;
+ }
+
+ @Override
+ public void report() {
+ try {
+ pushGateway.push(registry, jobName, groupingKey);
+ } catch (IOException e) {
+ LOG.warn("Could not push metrics to PushGateway.", e);
+ }
+ }
+}
diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java
new file mode 100644
index 000000000..5786a002c
--- /dev/null
+++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPlugin.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metrics.prometheus;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metrics.reporter.MetricReporter;
+import org.apache.fluss.metrics.reporter.MetricReporterPlugin;
+import org.apache.fluss.utils.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN;
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY;
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL;
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME;
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL;
+import static
org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX;
+
+/** {@link MetricReporterPlugin} for {@link PrometheusPushGatewayReporter}. */
+public class PrometheusPushGatewayReporterPlugin implements
MetricReporterPlugin {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PrometheusPushGatewayReporterPlugin.class);
+
+ private static final String PLUGIN_NAME = "prometheus-push";
+
+ @Override
+ public MetricReporter createMetricReporter(Configuration config) {
+ String hostUrl =
config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL);
+ String configuredJobName =
config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME);
+ boolean deleteOnShutdown =
+
config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN);
+ boolean randomSuffix =
+
config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX);
+ Duration pushInterval =
config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL);
+ String jobName = configuredJobName;
+ if (randomSuffix) {
+ jobName = configuredJobName + new Random().nextLong();
+ }
+ Map<String, String> groupingKey =
+
parseGroupingKey(config.get(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_GROUPING_KEY));
+ LOG.info(
+ "Configured PrometheusPushGatewayReporter with {hostUrl:{},
jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{},
pushInterval:{}}",
+ hostUrl,
+ jobName,
+ randomSuffix,
+ deleteOnShutdown,
+ groupingKey,
+ pushInterval);
+ try {
+ return new PrometheusPushGatewayReporter(
+ new URL(hostUrl), jobName, groupingKey, deleteOnShutdown,
pushInterval);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String identifier() {
+ return PLUGIN_NAME;
+ }
+
+ @VisibleForTesting
+ static Map<String, String> parseGroupingKey(final String
groupingKeyConfig) {
+ if (!groupingKeyConfig.isEmpty()) {
+ Map<String, String> groupingKey = new HashMap<>();
+ String[] kvs = groupingKeyConfig.split(";");
+ for (String kv : kvs) {
+ int idx = kv.indexOf("=");
+ if (idx < 0) {
+ LOG.warn("Invalid prometheusPushGateway groupingKey:{},
will be ignored", kv);
+ continue;
+ }
+
+ String labelKey = kv.substring(0, idx);
+ String labelValue = kv.substring(idx + 1);
+ if (StringUtils.isNullOrWhitespaceOnly(labelKey)
+ || StringUtils.isNullOrWhitespaceOnly(labelValue)) {
+ LOG.warn(
+ "Invalid groupingKey {labelKey:{}, labelValue:{}}
must not be empty",
+ labelKey,
+ labelValue);
+ continue;
+ }
+ groupingKey.put(labelKey, labelValue);
+ }
+
+ return groupingKey;
+ }
+
+ return Collections.emptyMap();
+ }
+}
diff --git a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java
index 1f0925aea..358330b6e 100644
--- a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java
+++ b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporter.java
@@ -17,59 +17,23 @@
package org.apache.fluss.metrics.prometheus;
-import org.apache.fluss.annotation.VisibleForTesting;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metrics.CharacterFilter;
-import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.Gauge;
-import org.apache.fluss.metrics.Histogram;
-import org.apache.fluss.metrics.HistogramStatistics;
-import org.apache.fluss.metrics.Meter;
import org.apache.fluss.metrics.Metric;
-import org.apache.fluss.metrics.groups.MetricGroup;
-import org.apache.fluss.metrics.reporter.MetricReporter;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
import static org.apache.fluss.utils.Preconditions.checkState;
-/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
-/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
*/
-public class PrometheusReporter implements MetricReporter {
+/** {@link PrometheusReporter} that exports {@link Metric} via Prometheus HTTP
server. */
+public class PrometheusReporter extends AbstractPrometheusReporter {
private static final Logger LOG =
LoggerFactory.getLogger(PrometheusReporter.class);
- private static final Pattern UNALLOWED_CHAR_PATTERN =
Pattern.compile("[^a-zA-Z0-9:_]");
- private static final CharacterFilter CHARACTER_FILTER =
PrometheusReporter::replaceInvalidChars;
-
- @VisibleForTesting static final char SCOPE_SEPARATOR = '_';
- @VisibleForTesting static final String SCOPE_PREFIX = "fluss" +
SCOPE_SEPARATOR;
-
- private final Map<String, AbstractMap.SimpleImmutableEntry<Collector,
Integer>>
- collectorsWithCountByMetricName = new HashMap<>();
-
- @VisibleForTesting final CollectorRegistry registry = new
CollectorRegistry(true);
-
private HTTPServer httpServer;
private int port;
@@ -97,310 +61,11 @@ public class PrometheusReporter implements MetricReporter {
}
}
- static String replaceInvalidChars(final String input) {
- // https://prometheus.io/docs/instrumenting/writing_exporters/
- // Only [a-zA-Z0-9:_] are valid in metric names, any other characters
should be sanitized to
- // an underscore.
- return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
- }
-
@Override
- public void open(Configuration config) {
- // do nothing now;
- }
-
public void close() {
if (httpServer != null) {
httpServer.stop();
}
- registry.clear();
- }
-
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName,
MetricGroup group) {
- List<String> dimensionKeys = new LinkedList<>();
- List<String> dimensionValues = new LinkedList<>();
- for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
- final String key = dimension.getKey();
- dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key));
-
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
- }
-
- final String scopedMetricName = getScopedName(metricName, group);
- final String helpString = metricName + " (scope: " +
getLogicalScope(group) + ")";
-
- final Collector collector;
- Integer count = 0;
-
- synchronized (this) {
- if (collectorsWithCountByMetricName.containsKey(scopedMetricName))
{
- final AbstractMap.SimpleImmutableEntry<Collector, Integer>
collectorWithCount =
- collectorsWithCountByMetricName.get(scopedMetricName);
- collector = collectorWithCount.getKey();
- count = collectorWithCount.getValue();
- } else {
- collector =
- createCollector(
- metric,
- dimensionKeys,
- dimensionValues,
- scopedMetricName,
- helpString);
- try {
- collector.register(registry);
- } catch (Exception e) {
- LOG.warn("There was a problem registering metric {}.",
metricName, e);
- }
- }
- addMetric(metric, dimensionValues, collector);
- collectorsWithCountByMetricName.put(
- scopedMetricName, new
AbstractMap.SimpleImmutableEntry<>(collector, count + 1));
- }
- }
-
- private static String getScopedName(String metricName, MetricGroup group) {
- return SCOPE_PREFIX
- + getLogicalScope(group)
- + SCOPE_SEPARATOR
- + CHARACTER_FILTER.filterCharacters(metricName);
- }
-
- private Collector createCollector(
- Metric metric,
- List<String> dimensionKeys,
- List<String> dimensionValues,
- String scopedMetricName,
- String helpString) {
- Collector collector;
- switch (metric.getMetricType()) {
- case GAUGE:
- case COUNTER:
- case METER:
- collector =
- io.prometheus.client.Gauge.build()
- .name(scopedMetricName)
- .help(helpString)
- .labelNames(toArray(dimensionKeys))
- .create();
- break;
- case HISTOGRAM:
- collector =
- new HistogramSummaryProxy(
- (Histogram) metric,
- scopedMetricName,
- helpString,
- dimensionKeys,
- dimensionValues);
- break;
- default:
- LOG.warn(
- "Cannot create collector for unknown metric type: {}.
This indicates that the metric type is not supported by this reporter.",
- metric.getClass().getName());
- collector = null;
- }
- return collector;
- }
-
- private void addMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
- switch (metric.getMetricType()) {
- case GAUGE:
- ((io.prometheus.client.Gauge) collector)
- .setChild(gaugeFrom((Gauge<?>) metric),
toArray(dimensionValues));
- break;
- case COUNTER:
- ((io.prometheus.client.Gauge) collector)
- .setChild(gaugeFrom((Counter) metric),
toArray(dimensionValues));
- break;
- case METER:
- ((io.prometheus.client.Gauge) collector)
- .setChild(gaugeFrom((Meter) metric),
toArray(dimensionValues));
- break;
- case HISTOGRAM:
- ((HistogramSummaryProxy) collector).addChild((Histogram)
metric, dimensionValues);
- break;
- default:
- LOG.warn(
- "Cannot add unknown metric type: {}. This indicates
that the metric type is not supported by this reporter.",
- metric.getClass().getName());
- }
- }
-
- private void removeMetric(Metric metric, List<String> dimensionValues,
Collector collector) {
- switch (metric.getMetricType()) {
- case GAUGE:
- case COUNTER:
- case METER:
- ((io.prometheus.client.Gauge)
collector).remove(toArray(dimensionValues));
- break;
- case HISTOGRAM:
- ((HistogramSummaryProxy) collector).remove(dimensionValues);
- break;
- default:
- LOG.warn(
- "Cannot remove unknown metric type: {}. This indicates
that the metric type is not supported by this reporter.",
- metric.getClass().getName());
- }
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName,
MetricGroup group) {
-
- List<String> dimensionValues = new LinkedList<>();
- for (final Map.Entry<String, String> dimension :
group.getAllVariables().entrySet()) {
-
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
- }
-
- final String scopedMetricName = getScopedName(metricName, group);
- synchronized (this) {
- final AbstractMap.SimpleImmutableEntry<Collector, Integer>
collectorWithCount =
- collectorsWithCountByMetricName.get(scopedMetricName);
- final Integer count = collectorWithCount.getValue();
- final Collector collector = collectorWithCount.getKey();
-
- removeMetric(metric, dimensionValues, collector);
-
- if (count == 1) {
- try {
- registry.unregister(collector);
- } catch (Exception e) {
- LOG.warn("There was a problem unregistering metric {}.",
scopedMetricName, e);
- }
- collectorsWithCountByMetricName.remove(scopedMetricName);
- } else {
- collectorsWithCountByMetricName.put(
- scopedMetricName,
- new AbstractMap.SimpleImmutableEntry<>(collector,
count - 1));
- }
- }
- }
-
- private static String getLogicalScope(MetricGroup group) {
- return group.getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
- }
-
- @VisibleForTesting
- io.prometheus.client.Gauge.Child gaugeFrom(Gauge<?> gauge) {
- return new io.prometheus.client.Gauge.Child() {
- @Override
- public double get() {
- final Object value = gauge.getValue();
- if (value == null) {
- LOG.debug("Gauge {} is null-valued, defaulting to 0.",
gauge);
- return 0;
- }
- if (value instanceof Double) {
- return (double) value;
- }
- if (value instanceof Number) {
- return ((Number) value).doubleValue();
- }
- if (value instanceof Boolean) {
- return ((Boolean) value) ? 1 : 0;
- }
- LOG.debug(
- "Invalid type for Gauge {}: {}, only number types and
booleans are supported by this reporter.",
- gauge,
- value.getClass().getName());
- return 0;
- }
- };
- }
-
- private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter)
{
- return new io.prometheus.client.Gauge.Child() {
- @Override
- public double get() {
- return (double) counter.getCount();
- }
- };
- }
-
- private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
- return new io.prometheus.client.Gauge.Child() {
- @Override
- public double get() {
- return meter.getRate();
- }
- };
- }
-
- @VisibleForTesting
- static class HistogramSummaryProxy extends Collector {
- static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98,
.99, .999);
-
- private final String metricName;
- private final String helpString;
- private final List<String> labelNamesWithQuantile;
-
- private final Map<List<String>, Histogram> histogramsByLabelValues =
new HashMap<>();
-
- HistogramSummaryProxy(
- final Histogram histogram,
- final String metricName,
- final String helpString,
- final List<String> labelNames,
- final List<String> labelValues) {
- this.metricName = metricName;
- this.helpString = helpString;
- this.labelNamesWithQuantile = addToList(labelNames, "quantile");
- histogramsByLabelValues.put(labelValues, histogram);
- }
-
- @Override
- public List<MetricFamilySamples> collect() {
- // We cannot use SummaryMetricFamily because it is impossible to
get a sum of all values
- // (at least for Dropwizard histograms,
- // whose snapshot's values array only holds a sample of recent
values).
-
- List<MetricFamilySamples.Sample> samples = new LinkedList<>();
- for (Map.Entry<List<String>, Histogram> labelValuesToHistogram :
- histogramsByLabelValues.entrySet()) {
- addSamples(
- labelValuesToHistogram.getKey(),
- labelValuesToHistogram.getValue(),
- samples);
- }
- return Collections.singletonList(
- new MetricFamilySamples(metricName, Type.SUMMARY,
helpString, samples));
- }
-
- void addChild(final Histogram histogram, final List<String>
labelValues) {
- histogramsByLabelValues.put(labelValues, histogram);
- }
-
- void remove(final List<String> labelValues) {
- histogramsByLabelValues.remove(labelValues);
- }
-
- private void addSamples(
- final List<String> labelValues,
- final Histogram histogram,
- final List<MetricFamilySamples.Sample> samples) {
- samples.add(
- new MetricFamilySamples.Sample(
- metricName + "_count",
- labelNamesWithQuantile.subList(0,
labelNamesWithQuantile.size() - 1),
- labelValues,
- histogram.getCount()));
- final HistogramStatistics statistics = histogram.getStatistics();
- for (final Double quantile : QUANTILES) {
- samples.add(
- new MetricFamilySamples.Sample(
- metricName,
- labelNamesWithQuantile,
- addToList(labelValues, quantile.toString()),
- statistics.getQuantile(quantile)));
- }
- }
- }
-
- private static List<String> addToList(List<String> list, String element) {
- final List<String> result = new ArrayList<>(list);
- result.add(element);
- return result;
- }
-
- private static String[] toArray(List<String> list) {
- return list.toArray(new String[0]);
+ super.close();
}
}
diff --git
a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
index 217444948..7e133e1aa 100644
---
a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
+++
b/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
@@ -25,10 +25,6 @@ import org.apache.fluss.utils.NetUtils;
import java.util.Iterator;
-/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
/** {@link MetricReporterPlugin} for {@link PrometheusReporter}. */
public class PrometheusReporterPlugin implements MetricReporterPlugin {
diff --git
a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE
b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE
index 31394e5f2..4900c25d1 100644
--- a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE
+++ b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/NOTICE
@@ -8,4 +8,5 @@ This project bundles the following dependencies under the
Apache Software Licens
- io.prometheus:simpleclient:0.8.1
- io.prometheus:simpleclient_common:0.8.1
-- io.prometheus:simpleclient_httpserver:0.8.1
\ No newline at end of file
+- io.prometheus:simpleclient_httpserver:0.8.1
+- io.prometheus:simpleclient_pushgateway:0.8.1
\ No newline at end of file
diff --git
a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin
b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin
index 806179107..af91cddb7 100644
---
a/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin
+++
b/fluss-metrics/fluss-metrics-prometheus/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin
@@ -18,3 +18,4 @@
org.apache.fluss.metrics.prometheus.PrometheusReporterPlugin
+org.apache.fluss.metrics.prometheus.PrometheusPushGatewayReporterPlugin
diff --git
a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
b/fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java
similarity index 53%
copy from
fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
copy to
fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java
index 217444948..7dba3718e 100644
---
a/fluss-metrics/fluss-metrics-prometheus/src/main/java/org/apache/fluss/metrics/prometheus/PrometheusReporterPlugin.java
+++
b/fluss-metrics/fluss-metrics-prometheus/src/test/java/org/apache/fluss/metrics/prometheus/PrometheusPushGatewayReporterPluginTest.java
@@ -17,33 +17,37 @@
package org.apache.fluss.metrics.prometheus;
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metrics.reporter.MetricReporter;
-import org.apache.fluss.metrics.reporter.MetricReporterPlugin;
-import org.apache.fluss.utils.NetUtils;
+import org.junit.jupiter.api.Test;
-import java.util.Iterator;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
-/** {@link MetricReporterPlugin} for {@link PrometheusReporter}. */
-public class PrometheusReporterPlugin implements MetricReporterPlugin {
-
- private static final String PLUGIN_NAME = "prometheus";
+/** Test for {@link PrometheusPushGatewayReporterPlugin}. */
+public class PrometheusPushGatewayReporterPluginTest {
- @Override
- public MetricReporter createMetricReporter(Configuration configuration) {
- String portsConfig =
-
configuration.getString(ConfigOptions.METRICS_REPORTER_PROMETHEUS_PORT);
- Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);
- return new PrometheusReporter(ports);
+ @Test
+ void testParseGroupingKey() {
+ Map<String, String> groupingKey =
+
PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1=v1;k2=v2");
+ assertThat(groupingKey).containsEntry("k1", "v1");
+ assertThat(groupingKey).containsEntry("k2", "v2");
}
- @Override
- public String identifier() {
- return PLUGIN_NAME;
+ @Test
+ void testParseIncompleteGroupingKey() {
+ Map<String, String> groupingKey =
+ PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1=");
+ assertThat(groupingKey).isEmpty();
+
+ groupingKey =
PrometheusPushGatewayReporterPlugin.parseGroupingKey("=v1");
+ assertThat(groupingKey).isEmpty();
+
+ groupingKey =
PrometheusPushGatewayReporterPlugin.parseGroupingKey("k1");
+ assertThat(groupingKey).isEmpty();
}
}
diff --git a/website/docs/maintenance/configuration.md
b/website/docs/maintenance/configuration.md
index 93630a969..57f7017c8 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -168,12 +168,19 @@ during the Fluss cluster working.
## Metrics
-| Option | Type | Default | Description
|
-|----------------------------------|--------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| metrics.reporters | List | (None) | An optional list of
reporter names. If configured, only reporters whose name matches in the list
will be started
|
-| metrics.reporter.prometheus.port | String | 9249 | The port the
Prometheus reporter listens on. In order to be able to run several instances of
the reporter on one host (e.g. when one TabletServer is colocated with the
CoordinatorServer) it is advisable to use a port range like 9250-9260.
|
-| metrics.reporter.jmx.port | String | (None) | The port for the
JMXServer that JMX clients can connect to. If not set, the JMXServer won't
start. In order to be able to run several instances of the reporter on one host
(e.g. when one TabletServer is colocated with the CoordinatorServer) it is
advisable to use a port range like 9990-9999. |
-
+More metric reporter examples can be found in [Observability - Metric
Reporters](observability/metric-reporters.md).
+
+| Option | Type | Default
| Description
|
+|---------------------------------------------------------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| metrics.reporters | List | (None)
| An optional list of reporter names. If configured, only reporters whose
name matches in the list will be started
|
+| metrics.reporter.prometheus.port | String | 9249
| The port the Prometheus reporter listens on. In order to be able to run
several instances of the reporter on one host (e.g. when one TabletServer is
colocated with the CoordinatorServer) it is advisable to use a port range like
9250-9260. |
+| metrics.reporter.jmx.port | String | (None)
| The port for the JMXServer that JMX clients can connect to. If not set,
the JMXServer won't start. In order to be able to run several instances of the
reporter on one host (e.g. when one TabletServer is colocated with the
CoordinatorServer) it is advisable to use a port range like 9990-9999. |
+| metrics.reporter.prometheus-push.host-url | String | (None)
| The PushGateway server host URL including scheme, host name, and port
|
+| metrics.reporter.prometheus-push.job-name | String | (None)
| The job name under which metrics will be pushed
|
+| metrics.reporter.prometheus-push.push-interval | String | 10
seconds | The interval at which metrics are pushed to the Prometheus PushGateway, defaults
to 10 SECONDS.
|
+| metrics.reporter.prometheus-push.random-job-name-suffix | Boolean | true
| Specifies whether a random suffix should be appended to the job name,
defaults to true. This is useful when multiple instances of the reporter are
running on the same host.
|
+| metrics.reporter.prometheus-push.delete-on-shutdown | Boolean | true
| Specifies whether to delete metrics from the PushGateway on shutdown,
defaults to true. Fluss will try its best to delete the metrics but this is not
guaranteed.
|
+| metrics.reporter.prometheus-push.grouping-key | String | (None)
| Specifies the grouping key, i.e. the group and global labels attached to all pushed
metrics. The label name and value are separated by '=', and labels are
separated by ';', e.g., k1=v1;k2=v2.
|
## Lakehouse
| Option | Type | Default | Description
|
diff --git a/website/docs/maintenance/observability/metric-reporters.md
b/website/docs/maintenance/observability/metric-reporters.md
index 48f94770f..43ce9473b 100644
--- a/website/docs/maintenance/observability/metric-reporters.md
+++ b/website/docs/maintenance/observability/metric-reporters.md
@@ -79,4 +79,29 @@ Fluss metric types are mapped to Prometheus metric types as
follows:
| Counter | Gauge |Prometheus counters cannot be decremented.|
| Gauge | Gauge |Only numbers and booleans are supported. |
| Histogram | Summary |Quantiles .5, .75, .95, .98, .99 and .999 |
-| Meter | Gauge |The gauge exports the meter's rate. |
+| Meter | Gauge |The gauge exports the meter's rate. |
+
+### PrometheusPushGateway
+
+Type: push
+
+Parameters:
+
+- `metrics.reporter.prometheus-push.host-url` - The PushGateway server host
URL including scheme, host name, and port.
+- `metrics.reporter.prometheus-push.job-name` - The job name under which
metrics will be pushed.
+- `metrics.reporter.prometheus-push.push-interval` - (Optional) The interval
at which metrics are pushed to the Prometheus PushGateway, defaults to 10 SECONDS.
+- `metrics.reporter.prometheus-push.random-job-name-suffix` - (Optional)
Specifies whether a random suffix should be appended to the job name, defaults
to true. This is useful when multiple instances of the reporter are running on
the same host.
+- `metrics.reporter.prometheus-push.delete-on-shutdown` - (Optional) Specifies
whether to delete metrics from the PushGateway on shutdown, defaults to true.
Fluss will try its best to delete the metrics but this is not guaranteed.
+- `metrics.reporter.prometheus-push.grouping-key` - Specifies the grouping key,
i.e. the group and global labels attached to all pushed metrics. The label name and value
are separated by `=`, and labels are separated by `;`, e.g., `k1=v1;k2=v2`.
+
+Example configuration:
+
+```yaml
+metrics.reporters: prometheus-push
+metrics.reporter.prometheus-push.host-url: http://localhost:9091
+metrics.reporter.prometheus-push.job-name: fluss-tablet-server
+metrics.reporter.prometheus-push.push-interval: 10 SECONDS
+metrics.reporter.prometheus-push.random-job-name-suffix: true
+metrics.reporter.prometheus-push.delete-on-shutdown: true
+metrics.reporter.prometheus-push.grouping-key:
instance=instance01;cluster=clusterA
+```