omzmarlon opened a new pull request, #15163:
URL: https://github.com/apache/pulsar/pull/15163

   ### Motivation
   Currently, Pulsar Functions offer a very limited interface for tracking function metrics. The `BaseContext` interface only provides the `recordMetric(String metricName, double value)` method, and under the hood `ContextImpl` records every value into a Prometheus `Summary`. Users cannot track other metric types such as `Counter`, `Gauge`, or `Histogram`, and they have no way to control which labels are attached to their metrics. (Additionally, because a Prometheus `Summary` computes its quantiles on the client side and those quantiles cannot be aggregated across instances, a `Summary` is often a less suitable choice than a `Histogram`.)
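
   To make the aggregation point concrete: quantiles computed separately on each instance cannot be combined into the quantile of the combined data, whereas histogram bucket counts can simply be added. The sketch below uses only the JDK and made-up latency values (not measurements from this PR). It computes exact nearest-rank quantiles, while a real Prometheus `Summary` uses a streaming approximation; that difference does not change the conclusion.

   ```java
   import java.util.Arrays;
   
   public class QuantileAggregation {
   
       // Exact nearest-rank quantile of a sample (what a client-side Summary approximates).
       static double quantile(double[] values, double q) {
           double[] sorted = values.clone();
           Arrays.sort(sorted);
           int rank = (int) Math.ceil(q * sorted.length); // 1-based rank
           return sorted[Math.max(rank, 1) - 1];
       }
   
       // Cumulative histogram bucket counts, one per upper bound (Prometheus "le" semantics).
       static long[] buckets(double[] values, double[] upperBounds) {
           long[] counts = new long[upperBounds.length];
           for (double v : values) {
               for (int i = 0; i < upperBounds.length; i++) {
                   if (v <= upperBounds[i]) {
                       counts[i]++;
                   }
               }
           }
           return counts;
       }
   
       // Element-wise sum: all it takes to aggregate histograms across instances.
       static long[] sum(long[] a, long[] b) {
           long[] out = new long[a.length];
           for (int i = 0; i < a.length; i++) {
               out[i] = a[i] + b[i];
           }
           return out;
       }
   
       public static void main(String[] args) {
           // Illustrative latencies in seconds: instance A is fast and busy, instance B is slow and idle.
           double[] a = {0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1};
           double[] b = {0.9};
           double[] all = {0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.9};
   
           // Combining per-instance medians (here by averaging them) gives the wrong answer.
           double averagedMedians = (quantile(a, 0.5) + quantile(b, 0.5)) / 2;
           System.out.println("averaged medians: " + averagedMedians);
           System.out.println("true median:      " + quantile(all, 0.5));
   
           // Summing per-instance bucket counts reproduces the combined histogram exactly.
           double[] le = {0.25, 0.5, 1.0};
           long[] merged = sum(buckets(a, le), buckets(b, le));
           System.out.println("merged buckets:   " + Arrays.toString(merged));
           System.out.println("combined buckets: " + Arrays.toString(buckets(all, le)));
       }
   }
   ```

   With nine fast requests on one instance and one slow request on the other, averaging the two medians gives 0.5 while the true median is 0.1; the summed bucket counts match those of the combined data.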
   
   This PR introduces a new way to record metrics in Pulsar Functions: users decide which type of metric to track and which name and labels it should use.
   
   At the API level, a `Metric` type is introduced, which all supported metric types (the implemented `Counter`, `Gauge`, `Histogram`, and `Summary`) extend. Custom user metrics in Pulsar Functions are supplied and returned through this interface. `BaseContext` gains a new `MetricProvider metricProvider();` method, and the `MetricProvider` returns a builder for each metric type. Each builder lets the user set the metric's name, labels, and help message, and either supply their own metric (implemented against the `Metric` interface) or fall back to the internal default implementation. For example, inside a Pulsar source, a user could define a custom metric with the new API like this:
   
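   ```java
   import java.util.Map;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.functions.api.metrics.Counter;
   import org.apache.pulsar.io.core.Source;
   import org.apache.pulsar.io.core.SourceContext;
   
   // Person stands for whatever record type this source produces.
   public class DemoSource implements Source<Person> {
   
       private Counter counter;
   
       @Override
       public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
           // No custom counter is supplied, so the framework's default implementation is used.
           this.counter = sourceContext.metricProvider()
               .registerCounter()
               .name("demo_counter")
               .help("helper message for counter")
               .labelNames("label1", "label2")
               .labels("foo", "bar")
               .register();
       }
   
       @Override
       public Record<Person> read() throws Exception {
           counter.inc();
           // ... other logic that builds and returns the next record ...
       }
   
       @Override
       public void close() throws Exception {
           // nothing to release in this example
       }
   }
   ```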
   
   In this example, the user directly uses the internal `Counter` implementation provided by the framework. In some situations, however, a user may need to proxy a metric defined in an external system or library. For example, imagine a Pulsar source built on an HTTP client library that internally keeps its own counter of requests sent, and the source author wants to report that counter through the Pulsar framework. In this case, the user can register a proxy for the external counter:
   
   ```java
   import java.util.Map;
   import org.apache.pulsar.functions.api.Record;
   import org.apache.pulsar.functions.api.metrics.Counter;
   import org.apache.pulsar.io.core.Source;
   import org.apache.pulsar.io.core.SourceContext;
   
   public class DemoSource implements Source<Person> {
   
       private Counter counter;
   
       @Override
       public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
           // ClientLibrary is a placeholder for any third-party library that keeps its own counter.
           CounterProxy counterProxy = new CounterProxy(new ClientLibrary());
           this.counter = sourceContext.metricProvider()
               .registerCounter()
               .name("demo_counter")
               .help("helper message for counter")
               .labelNames("label1", "label2")
               .labels("foo", "bar")
               // A custom counter is provided; its value is reported through the Pulsar framework.
               .counter(counterProxy)
               .register();
       }
   
       @Override
       public Record<Person> read() throws Exception {
           counter.inc();
           // ... other logic that builds and returns the next record ...
       }
   
       @Override
       public void close() throws Exception {
           // nothing to release in this example
       }
   
       static class CounterProxy implements Counter {
           private final ClientLibrary client;
   
           CounterProxy(ClientLibrary client) {
               this.client = client;
           }
   
           @Override
           public double get() {
               // Proxy the counter value from the external library.
               return this.client.getCounter().get();
           }
   
           // ... override the other Counter methods ...
       }
   }
   ```
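
   Whether the built-in counter or a user-supplied one is exposed comes down to what the builder's `register()` returns. The following self-contained sketch illustrates that idea. `SimpleCounter`, `CounterBuilder`, `DefaultCounter`, `ExternalClient`, and `ExternalCounterProxy` are hypothetical stand-ins written for illustration, not the PR's actual types, and the validation in `register()` is an assumption about reasonable behavior.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.concurrent.atomic.DoubleAdder;
   
   public class CounterBuilderSketch {
   
       // Hypothetical minimal counter contract, standing in for the PR's Counter interface.
       interface SimpleCounter {
           void inc();
           double get();
       }
   
       // Default in-process implementation, used when the caller supplies no counter.
       static final class DefaultCounter implements SimpleCounter {
           private final DoubleAdder value = new DoubleAdder();
           public void inc() { value.add(1.0); }
           public double get() { return value.sum(); }
       }
   
       // Hypothetical third-party library that keeps its own request counter.
       static final class ExternalClient {
           final AtomicLong requestsSent = new AtomicLong();
           void send() { requestsSent.incrementAndGet(); }
       }
   
       // Proxy: holds no state of its own and delegates to the external counter.
       static final class ExternalCounterProxy implements SimpleCounter {
           private final ExternalClient client;
           ExternalCounterProxy(ExternalClient client) { this.client = client; }
           public void inc() { client.requestsSent.incrementAndGet(); }
           public double get() { return client.requestsSent.get(); }
       }
   
       // Hypothetical builder: records metadata and decides which implementation to expose.
       static final class CounterBuilder {
           String name;
           String help = "";
           List<String> labelNames = List.of();
           List<String> labelValues = List.of();
           SimpleCounter supplied;
   
           CounterBuilder name(String name) { this.name = name; return this; }
           CounterBuilder help(String help) { this.help = help; return this; }
           CounterBuilder labelNames(String... names) { this.labelNames = Arrays.asList(names); return this; }
           CounterBuilder labels(String... values) { this.labelValues = Arrays.asList(values); return this; }
           CounterBuilder counter(SimpleCounter counter) { this.supplied = counter; return this; }
   
           SimpleCounter register() {
               if (name == null || name.isEmpty()) {
                   throw new IllegalStateException("metric name is required");
               }
               if (labelNames.size() != labelValues.size()) {
                   throw new IllegalStateException("label names and values must have the same length");
               }
               // A real implementation would also register the metric with a collector here.
               return supplied != null ? supplied : new DefaultCounter();
           }
       }
   
       public static void main(String[] args) {
           SimpleCounter builtIn = new CounterBuilder().name("demo_counter").register();
           builtIn.inc();
           builtIn.inc();
           System.out.println("built-in counter: " + builtIn.get());
   
           ExternalClient client = new ExternalClient();
           SimpleCounter proxied = new CounterBuilder()
               .name("demo_counter")
               .labelNames("label1", "label2")
               .labels("foo", "bar")
               .counter(new ExternalCounterProxy(client))
               .register();
           client.send();
           client.send();
           client.send();
           System.out.println("proxied counter:  " + proxied.get());
       }
   }
   ```

   Because the proxy keeps no state, every call to `get()` reflects the external library's current value, which is the property the `CounterProxy` example relies on.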
   
   With this new metrics API, registered custom user metrics would be reported 
on [`http://localhost:8080/metrics/`](http://localhost:8080/metrics/) like this:
   
   ```
   # HELP pulsar_source_user_metric_demo_counter helper message for counter
   # TYPE pulsar_source_user_metric_demo_counter counter
   pulsar_source_user_metric_demo_counter{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",} 259.0
   ```
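
   Each sample line in that output combines the labels the framework adds (`tenant`, `namespace`, `name`, `instance_id`, `cluster`, `fqfn`) with the user's own labels, in the Prometheus text exposition format. The sketch below shows how such a line is laid out. It is a hypothetical illustration, not the Prometheus client library's code, and it does not special-case `NaN` or infinite values.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class ExpositionLine {
   
       // Escape a label value as the text format requires: backslash, double quote and newline.
       static String escape(String value) {
           return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
       }
   
       // Render one sample line: name{label="value",...} value
       static String render(String name, Map<String, String> labels, double value) {
           StringBuilder sb = new StringBuilder(name);
           if (!labels.isEmpty()) {
               sb.append('{');
               for (Map.Entry<String, String> e : labels.entrySet()) {
                   // A trailing comma after the last label is allowed by the format.
                   sb.append(e.getKey()).append("=\"").append(escape(e.getValue())).append("\",");
               }
               sb.append('}');
           }
           return sb.append(' ').append(value).toString();
       }
   
       public static void main(String[] args) {
           // System labels seen in the output above (LinkedHashMap keeps insertion order).
           Map<String, String> labels = new LinkedHashMap<>();
           labels.put("tenant", "public");
           labels.put("namespace", "public/default");
           labels.put("name", "pulsar-io-datagen");
           labels.put("instance_id", "0");
           labels.put("cluster", "standalone");
           labels.put("fqfn", "public/default/pulsar-io-datagen");
           // User-defined label, appended after the system labels.
           labels.put("label1", "foo");
   
           System.out.println(render("pulsar_source_user_metric_demo_counter", labels, 259.0));
       }
   }
   ```

   Running `main` prints the same sample line as the counter output above, including the trailing comma after `label1="foo"`.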
   
   Internally, since Pulsar already uses Prometheus as its underlying metrics system, registered custom user metrics are reported through Prometheus. Collection is done either with the existing Prometheus `io.prometheus.client.Collector` implementations (for `Counter` and `Gauge`) or with our own collector implementations (for `Histogram` and `Summary`).
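
   What a custom histogram collector has to emit can be sketched without any dependencies: cumulative `_bucket` samples labeled with `le` (including `+Inf`), plus `_count` and `_sum`. The class below is a hypothetical illustration, not the collector implemented in this PR, and the metric name `runtime_seconds` is made up.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class HistogramSketch {
   
       private final String name;
       private final double[] upperBounds; // sorted, finite bucket bounds
       private final long[] counts;        // non-cumulative count per finite bucket
       private long count;                 // total observations (the +Inf bucket)
       private double sum;
   
       HistogramSketch(String name, double... upperBounds) {
           this.name = name;
           this.upperBounds = upperBounds.clone();
           this.counts = new long[upperBounds.length];
       }
   
       // Record one observation in the first bucket whose bound is >= value.
       synchronized void observe(double value) {
           for (int i = 0; i < upperBounds.length; i++) {
               if (value <= upperBounds[i]) {
                   counts[i]++;
                   break;
               }
           }
           count++;
           sum += value;
       }
   
       // Produce the sample lines a collector would expose for this histogram.
       synchronized List<String> collect() {
           List<String> lines = new ArrayList<>();
           long cumulative = 0;
           for (int i = 0; i < upperBounds.length; i++) {
               cumulative += counts[i];
               lines.add(name + "_bucket{le=\"" + upperBounds[i] + "\",} " + cumulative);
           }
           lines.add(name + "_bucket{le=\"+Inf\",} " + count);
           lines.add(name + "_count " + count);
           lines.add(name + "_sum " + sum);
           return lines;
       }
   
       public static void main(String[] args) {
           HistogramSketch runtime = new HistogramSketch("runtime_seconds", 0.25, 0.5, 1.0);
           for (double v : new double[] {0.125, 0.375, 0.375, 0.75, 2.0}) {
               runtime.observe(v);
           }
           runtime.collect().forEach(System.out::println);
       }
   }
   ```

   Because bucket counts are cumulative, the `+Inf` bucket always equals `_count`, and buckets from several instances can be summed on the server before quantiles are estimated.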
   
   
   ### Modifications
   
   This PR makes many changes in the `pulsar-functions` module, but the idea behind them is the simple one described above.
   
   ### Verifying this change
   
   New metric APIs and implementations are covered by unit tests. 
   
   I also manually verified that the system works end to end by:
   
   1. Adding and registering an example `Counter` and `Summary` in `DataGeneratorSource`.
   2. Starting a local standalone Pulsar cluster with `bin/pulsar standalone`.
   3. Creating a `DataGeneratorSource` with `bin/pulsar-admin sources create`.
   4. After the source had been running for a while, querying [`http://localhost:8080/metrics/`](http://localhost:8080/metrics/) to verify that the registered metrics are reported correctly:
   
   ```
   # TYPE pulsar_source_system_exception gauge
   # HELP pulsar_source_user_metric_records_counter counter for records read
   # TYPE pulsar_source_user_metric_records_counter counter
   pulsar_source_user_metric_records_counter{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",} 1557.0
   
   # HELP pulsar_source_user_metric_runtime runtime latency
   # TYPE pulsar_source_user_metric_runtime summary
   pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.5",} 0.513479111265846
   pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.9",} 0.8995500765220129
   pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.99",} 0.9922919051399153
   pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.999",} 0.9998848111911606
   pulsar_source_user_metric_runtime_count{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",} 1557.0
   pulsar_source_user_metric_runtime_sum{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",} 792.5118673696929
   ```
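
   Scraped output like the above can also be checked in a test rather than by eye. The helper below is a hypothetical, minimal parser for a single sample line into its metric name, labels, and value. It only handles the simple form shown here: label values must not contain commas or escaped characters, and timestamps are not supported.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class SampleLineParser {
   
       // Parsed form of one exposition-format sample line.
       static final class Sample {
           final String name;
           final Map<String, String> labels;
           final double value;
   
           Sample(String name, Map<String, String> labels, double value) {
               this.name = name;
               this.labels = labels;
               this.value = value;
           }
       }
   
       // Parses lines such as: name{k="v",k2="v2",} 1.0
       static Sample parse(String line) {
           int space = line.lastIndexOf(' ');
           double value = Double.parseDouble(line.substring(space + 1));
           String series = line.substring(0, space);
           int brace = series.indexOf('{');
           if (brace < 0) {
               return new Sample(series, Map.of(), value);
           }
           Map<String, String> labels = new LinkedHashMap<>();
           String body = series.substring(brace + 1, series.lastIndexOf('}'));
           for (String pair : body.split(",")) {
               if (pair.isEmpty()) {
                   continue; // tolerate empty segments
               }
               int eq = pair.indexOf('=');
               String quoted = pair.substring(eq + 1);
               // Strip the surrounding double quotes from the value.
               labels.put(pair.substring(0, eq), quoted.substring(1, quoted.length() - 1));
           }
           return new Sample(series.substring(0, brace), labels, value);
       }
   
       public static void main(String[] args) {
           Sample s = parse("pulsar_source_user_metric_runtime_count{tenant=\"public\",label1=\"foo\",label2=\"bar\",} 1557.0");
           System.out.println(s.name + " " + s.labels + " " + s.value);
       }
   }
   ```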
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API: yes
   - The schema: no
   - The default values of configurations: no
   - The wire protocol: no
   - The rest endpoints: no
   - The admin cli options: no
   - Anything that affects deployment: no
   
   For the public API, a new method `MetricProvider metricProvider()` is added to the public `BaseContext` interface, and the old `void recordMetric(String metricName, double value)` method is marked `@Deprecated`.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)

