hailin0 commented on code in PR #2888:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2888#discussion_r981967555


##########
seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.seatunnel.metrics.flink;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+import org.apache.seatunnel.metrics.core.SimpleCounter;
+import org.apache.seatunnel.metrics.core.SimpleGauge;
+import org.apache.seatunnel.metrics.core.SimpleHistogram;
+import org.apache.seatunnel.metrics.core.SimpleMeter;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+import org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * exports Flink metrics to Seatunnel
+ */
+public class SeatunnelMetricReporter extends AbstractSeatunnelReporter 
implements Scheduled {
+    private final Logger log = 
LoggerFactory.getLogger(SeatunnelMetricReporter.class);
+    private MetricReporter reporter;
+    private String host;
+    private int port;
+    private String jobName;
+    private static final int DEFAULT_PORT = 9091;
+
+    @Override
+    public void open(MetricConfig metricConfig) {
+        MetricConfig config = metricConfig;
+        config.isEmpty();
+        host = config.getString("host", "localhost");
+        port = config.getInteger("port", DEFAULT_PORT);
+        jobName = config.getString("jobName", "flinkJob");
+    }
+
+    @Override
+    public void close() {
+        log.info("StreamMetricReporter close");
+    }
+
+    @Override
+    public void report() {
+        log.info("reporter report");
+        HashMap<Counter, MetricInfo> countersIndex = new HashMap<>();
+        HashMap<Gauge, MetricInfo> gaugesIndex = new HashMap<>();
+        HashMap<Histogram, MetricInfo> histogramsIndex = new HashMap<>();
+        HashMap<Meter, MetricInfo> metersIndex = new HashMap<>();
+
+        HashSet<String> name = new HashSet<>();
+
+        //Convert flink metrics to seatunnel
+        for (Map.Entry<org.apache.flink.metrics.Counter, MetricInfo> metric : 
counters.entrySet()) {
+            //Skip processing on a repeat
+            if (name.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            name.add(metric.getValue().getMetricName());
+            countersIndex.put(new SimpleCounter(metric.getKey().getCount()), 
metric.getValue());
+        }
+
+        name.clear();
+        for (Map.Entry<org.apache.flink.metrics.Gauge<?>, MetricInfo> metric : 
gauges.entrySet()) {
+            //Skip processing on a repeat
+            if (name.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            name.add(metric.getValue().getMetricName());
+            Object num = metric.getKey().getValue();
+            if (num instanceof Number) {
+                gaugesIndex.put(new SimpleGauge((Number) num), 
metric.getValue());
+            }
+        }
+
+        name.clear();
+        for (Map.Entry<org.apache.flink.metrics.Meter, MetricInfo> metric : 
meters.entrySet()) {
+            //Skip processing on a repeat
+            if (name.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            name.add(metric.getValue().getMetricName());
+            metersIndex.put(new SimpleMeter(metric.getKey().getRate(), 
metric.getKey().getCount()), metric.getValue());
+
+        }
+        final double quantile05 = 0.5;
+        final double quantile75 = 0.75;
+        final double quantile95 = 0.95;
+        //todo histogram
+        for (Map.Entry<org.apache.flink.metrics.Histogram, MetricInfo> metric 
: histograms.entrySet()) {
+            org.apache.flink.metrics.Histogram key = metric.getKey();
+            HashMap<Double, Double> quantile = new HashMap<>();
+            quantile.put(quantile05, 
key.getStatistics().getQuantile(quantile05));
+            quantile.put(quantile75, 
key.getStatistics().getQuantile(quantile75));
+            quantile.put(quantile95, 
key.getStatistics().getQuantile(quantile95));
+            histogramsIndex.put(new SimpleHistogram(key.getCount(), 
key.getStatistics().getMin(), key.getStatistics().getMax(), 
key.getStatistics().getStdDev(), key.getStatistics().getMean(), quantile), 
metric.getValue());
+        }
+        //todo handle user config
+        reporter = new PrometheusPushGatewayReporter(jobName, host, port);

Review Comment:
   Could this instantiation be moved outside the `SeatunnelMetricReporter#report()` method, so that a new reporter is not created on every call?



##########
seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/seatunnel/metrics/spark/SeatunnelMetricSink.scala:
##########
@@ -0,0 +1,219 @@
+package org.apache.seatunnel.metrics.spark
+
+import java.util
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConversions._
+import com.codahale.metrics
+import com.codahale.metrics.{Counter, Histogram, Meter, _}
+import org.apache.seatunnel.metrics.core.{Gauge, _}
+import org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+import org.apache.spark.internal.Logging
+
+object SeatunnelMetricSink {
+  trait SinkConfig extends Serializable {
+    def metricsNamespace: Option[String]
+
+    def sparkAppId: Option[String]
+
+    def sparkAppName: Option[String]
+
+    def executorId: Option[String]
+  }
+}
+
+abstract class SeatunnelMetricSink(
+    property: Properties,
+    registry: MetricRegistry,
+    sinkConfig: SeatunnelMetricSink.SinkConfig) extends Logging {
+
+  import sinkConfig._
+
+  protected class SeatunnelMetricReporter(registry: MetricRegistry, 
metricFilter: MetricFilter)
+    extends ScheduledReporter(
+      registry,
+      "seatunnel-reporter",
+      metricFilter,
+      TimeUnit.SECONDS,
+      TimeUnit.MILLISECONDS) {
+
+    override def report(
+        gauges: util.SortedMap[String, metrics.Gauge[_]],
+        counters: util.SortedMap[String, Counter],
+        histograms: util.SortedMap[String, Histogram],
+        meters: util.SortedMap[String, Meter],
+        timers: util.SortedMap[String, Timer]): Unit = {
+      logInfo(
+        s"metricsNamespace=$metricsNamespace, sparkAppName=$sparkAppName, 
sparkAppId=$sparkAppId, " +
+          s"executorId=$executorId")
+
+      val role: String = (sparkAppId, executorId) match {
+        case (Some(_), Some("driver")) | (Some(_), Some("<driver>")) => 
"driver"
+        case (Some(_), Some(_)) => "executor"
+        case _ => "unknown"
+      }
+
+      val job: String = role match {
+        case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+        case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+        case _ => metricsNamespace.getOrElse("unknown")
+      }
+
+      // val instance: String = "instance"
+      val appName: String = sparkAppName.getOrElse("")
+
+      logInfo(s"role=$role, job=$job")
+
+      val dimensionKeys = new util.LinkedList[String]()
+      val dimensionValues = new util.LinkedList[String]()
+      dimensionKeys.add("job_name")
+      dimensionValues.add(appName)
+      dimensionKeys.add("job_id")
+      dimensionValues.add(job)
+      dimensionKeys.add("role")
+      dimensionValues.add(role)
+
+      val countersIndex = new 
util.HashMap[org.apache.seatunnel.metrics.core.Counter, MetricInfo]
+      val gaugesIndex = new 
util.HashMap[org.apache.seatunnel.metrics.core.Gauge[_], MetricInfo]
+      val histogramsIndex =
+        new util.HashMap[org.apache.seatunnel.metrics.core.Histogram, 
MetricInfo]
+      val metersIndex = new 
util.HashMap[org.apache.seatunnel.metrics.core.Meter, MetricInfo]
+
+      for (metricName <- gauges.keySet()) {
+        val metric = gauges.get(metricName)
+        val num = numeric(metric.getValue)
+        if (num.toString != Long.MaxValue.toString) {
+          gaugesIndex.put(
+            new SimpleGauge(num),
+            newMetricInfo(metricName, dimensionKeys, dimensionValues))
+        } else {
+          logError(metricName + " is not a number ")
+        }
+      }
+
+      for (metricName <- counters.keySet()) {
+        val metric = counters.get(metricName)
+        countersIndex.put(
+          new SimpleCounter(metric.getCount),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+
+      for (metricName <- meters.keySet()) {
+        val metric = meters.get(metricName)
+        metersIndex.put(
+          new SimpleMeter(metric.getMeanRate, metric.getCount),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+
+      for (metricName <- histograms.keySet()) {
+        val metric = histograms.get(metricName)
+        histogramsIndex.put(
+          new SimpleHistogram(
+            metric.getCount,
+            metric.getSnapshot.getMin,
+            metric.getSnapshot.getMax,
+            metric.getSnapshot.getStdDev,
+            metric.getSnapshot.getMean,
+            new util.HashMap[java.lang.Double, java.lang.Double]() {
+              0.75 -> metric.getSnapshot.get75thPercentile();
+              0.95 -> metric.getSnapshot.get95thPercentile();
+              0.99 -> metric.getSnapshot.get99thPercentile()
+            }),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+      val reporter = new PrometheusPushGatewayReporter(pollJobName, pollHost, 
pollPort)

Review Comment:
   As above: could this instantiation be moved outside the `report()` method?



##########
seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/reporter/MetricReporter.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.seatunnel.metrics.core.reporter;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.Metric;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+
+import java.util.Map;
+
+/** Reporters are used to export seatunnel {@link Metric Metrics} to an 
external backend. */
+public interface MetricReporter {
+    MetricReporter open();

Review Comment:
   Why does this return a `MetricReporter` object? This is an instance method.



##########
seatunnel-metrics/seatunnel-metrics-prometheus/src/main/java/org/apache/seatunnel/metrics/prometheus/PrometheusPushGatewayReporter.java:
##########
@@ -0,0 +1,235 @@
+package org.apache.seatunnel.metrics.prometheus;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.Metric;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A reporter which outputs measurements to PrometheusPushGateway
+ */
+public class PrometheusPushGatewayReporter implements MetricReporter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+    final URL hostUrl;
+    private final PushGateway pushGateway;
+    private final String jobName;
+    private static final int DEFAULT_PORT = 9091;
+
+    public PrometheusPushGatewayReporter(String jobName, String host, int 
port) {
+        String url = "";
+        if (isNullOrWhitespaceOnly(host) || port < 1) {
+            throw new IllegalArgumentException(
+                    "Invalid host/port configuration. Host: " + host + " Port: 
" + port);
+        } else {
+            url = "http://" + host + ":" + port;
+        }
+
+        this.jobName = jobName;
+        try {
+            this.hostUrl = new URL(url);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+        this.pushGateway = new PushGateway(hostUrl);
+    }
+
+    @Override
+    public PrometheusPushGatewayReporter open() {
+        //todo Handle user config
+        return new PrometheusPushGatewayReporter("flink_prometheus_job", 
"localhost", DEFAULT_PORT);

Review Comment:
   Why does this create a `PrometheusPushGatewayReporter` object again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to