This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e2500c8790 [HUDI-7083] Adding support for multiple tables with Prometheus Reporter (#10068)
9e2500c8790 is described below

commit 9e2500c87903fd77276f4189f68e3cabba220c1c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Nov 21 09:55:23 2023 -0500

    [HUDI-7083] Adding support for multiple tables with Prometheus Reporter (#10068)
    
    * Adding support for multiple tables with Prometheus Reporter
    
    * Fixing closure of http server
    
    * Remove entry from port-collector registry map after stopping http server
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../metrics/prometheus/PrometheusReporter.java     | 77 +++++++++++++++++++---
 1 file changed, 67 insertions(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java
index 1394e662626..34fd7a07f65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PrometheusReporter.java
@@ -18,42 +18,76 @@
 
 package org.apache.hudi.metrics.prometheus;
 
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.MetricsReporter;
 
 import com.codahale.metrics.MetricRegistry;
+import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.dropwizard.samplebuilder.DefaultSampleBuilder;
+import io.prometheus.client.dropwizard.samplebuilder.SampleBuilder;
 import io.prometheus.client.exporter.HTTPServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * Implementation of Prometheus reporter, which connects to the Http server, 
and get metrics
  * from that server.
  */
 public class PrometheusReporter extends MetricsReporter {
+  private static final Pattern LABEL_PATTERN = Pattern.compile("\\s*,\\s*");
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusReporter.class);
+  private static final Map<Integer, CollectorRegistry> 
PORT_TO_COLLECTOR_REGISTRY = new HashMap<>();
+  private static final Map<Integer, HTTPServer> PORT_TO_SERVER = new 
HashMap<>();
 
-  private HTTPServer httpServer;
   private final DropwizardExports metricExports;
   private final CollectorRegistry collectorRegistry;
+  private final int serverPort;
 
   public PrometheusReporter(HoodieWriteConfig config, MetricRegistry registry) 
{
-    int serverPort = config.getPrometheusPort();
-    collectorRegistry = new CollectorRegistry();
-    metricExports = new DropwizardExports(registry);
+    this.serverPort = config.getPrometheusPort();
+    if (!PORT_TO_SERVER.containsKey(serverPort) || 
!PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
+      startHttpServer(serverPort);
+    }
+    List<String> labelNames = new ArrayList<>();
+    List<String> labelValues = new ArrayList<>();
+    if (StringUtils.nonEmpty(config.getPushGatewayLabels())) {
+      LABEL_PATTERN.splitAsStream(config.getPushGatewayLabels().trim()).map(s 
-> s.split(":", 2))
+          .forEach(parts -> {
+            labelNames.add(parts[0]);
+            labelValues.add(parts[1]);
+          });
+    }
+    metricExports = new DropwizardExports(registry, new 
LabeledSampleBuilder(labelNames, labelValues));
+    this.collectorRegistry = PORT_TO_COLLECTOR_REGISTRY.get(serverPort);
     metricExports.register(collectorRegistry);
-    try {
-      httpServer = new HTTPServer(new InetSocketAddress(serverPort), 
collectorRegistry);
-    } catch (Exception e) {
-      String msg = "Could not start PrometheusReporter HTTP server on port " + 
serverPort;
-      LOG.error(msg, e);
-      throw new HoodieException(msg, e);
+  }
+
+  private static synchronized void startHttpServer(int serverPort) {
+    if (!PORT_TO_COLLECTOR_REGISTRY.containsKey(serverPort)) {
+      PORT_TO_COLLECTOR_REGISTRY.put(serverPort, new CollectorRegistry());
+    }
+    if (!PORT_TO_SERVER.containsKey(serverPort)) {
+      try {
+        HTTPServer server = new HTTPServer(new InetSocketAddress(serverPort), 
PORT_TO_COLLECTOR_REGISTRY.get(serverPort));
+        PORT_TO_SERVER.put(serverPort, server);
+        Runtime.getRuntime().addShutdownHook(new Thread(server::stop));
+      } catch (Exception e) {
+        String msg = "Could not start PrometheusReporter HTTP server on port " 
+ serverPort;
+        LOG.error(msg, e);
+        throw new HoodieException(msg, e);
+      }
     }
   }
 
@@ -68,8 +102,31 @@ public class PrometheusReporter extends MetricsReporter {
   @Override
   public void stop() {
     collectorRegistry.unregister(metricExports);
+    HTTPServer httpServer = PORT_TO_SERVER.remove(serverPort);
     if (httpServer != null) {
       httpServer.stop();
     }
+    PORT_TO_COLLECTOR_REGISTRY.remove(serverPort);
+  }
+
+  private static class LabeledSampleBuilder implements SampleBuilder {
+    private final DefaultSampleBuilder defaultMetricSampleBuilder = new 
DefaultSampleBuilder();
+    private final List<String> labelNames;
+    private final List<String> labelValues;
+
+    public LabeledSampleBuilder(List<String> labelNames, List<String> 
labelValues) {
+      this.labelNames = labelNames;
+      this.labelValues = labelValues;
+    }
+
+    @Override
+    public Collector.MetricFamilySamples.Sample createSample(String 
dropwizardName, String nameSuffix, List<String> additionalLabelNames, 
List<String> additionalLabelValues, double value) {
+      return defaultMetricSampleBuilder.createSample(
+          dropwizardName,
+          nameSuffix,
+          labelNames,
+          labelValues,
+          value);
+    }
   }
 }

Reply via email to