omzmarlon opened a new pull request, #15163:
URL: https://github.com/apache/pulsar/pull/15163
### Motivation
Currently, Pulsar Functions have a very limited interface for tracking
function metrics. The `BaseContext` interface only provides the
`recordMetric(String metricName, double value)` method. Under the hood,
`ContextImpl` saves all metric values to a Prometheus Summary. Users cannot
track other types of metrics such as `Counter`, `Gauge`, or `Histogram`, and
they have no way to control the labels attached to their metrics.
(Additionally, since a Prometheus Summary computes all quantiles on the client
side and those quantiles cannot be aggregated across instances, Summary is
often a less preferable choice than Histogram.)
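The aggregation point can be illustrated with a small, self-contained sketch. It is not Prometheus or Pulsar code; `BucketHistogram` and its methods are hypothetical names used only to show that fixed-bucket counts from several instances can be summed and a quantile estimated afterwards, which precomputed per-instance quantiles do not allow.

```java
import java.util.Arrays;

/** Illustrative fixed-bucket histogram (hypothetical, not a Prometheus class). */
final class BucketHistogram {
    private final double[] upperBounds; // ascending, inclusive upper bounds
    private final long[] counts;        // one slot per bucket, plus an overflow slot

    BucketHistogram(double... upperBounds) {
        this.upperBounds = upperBounds.clone();
        this.counts = new long[upperBounds.length + 1];
    }

    void observe(double value) {
        int i = 0;
        while (i < upperBounds.length && value > upperBounds[i]) {
            i++;
        }
        counts[i]++;
    }

    /** Merging two instances is plain per-bucket addition. */
    BucketHistogram merge(BucketHistogram other) {
        if (!Arrays.equals(upperBounds, other.upperBounds)) {
            throw new IllegalArgumentException("bucket layouts differ");
        }
        BucketHistogram merged = new BucketHistogram(upperBounds);
        for (int i = 0; i < counts.length; i++) {
            merged.counts[i] = counts[i] + other.counts[i];
        }
        return merged;
    }

    long count() {
        return Arrays.stream(counts).sum();
    }

    /** Coarse estimate: upper bound of the bucket that contains the q-quantile. */
    double quantileUpperBound(double q) {
        long total = count();
        if (total == 0) {
            return Double.NaN;
        }
        long rank = Math.max(1, (long) Math.ceil(q * total));
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= rank) {
                return i < upperBounds.length ? upperBounds[i] : Double.POSITIVE_INFINITY;
            }
        }
        return Double.POSITIVE_INFINITY;
    }
}
```

Two function instances could each keep such a histogram, and a monitoring backend could merge them before estimating a quantile. Two independently computed 0.99 quantiles cannot be combined into a correct global 0.99 quantile.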
This MR introduces a new way to record metrics for Pulsar functions so that a
user can decide what type of metric to track and which name and labels the
metric should use.
At the API level, a `Metric` type is introduced, which all supported metric
types extend (`Counter`, `Gauge`, `Histogram`, and `Summary` are implemented).
Custom user metrics in Pulsar functions will be supplied and returned using
this interface. In `BaseContext`, a new `MetricProvider metricProvider();`
method is defined. The `MetricProvider` returns builders for each type of
metric. Each metric builder allows the user to set useful data such as the
name, labels, and help message for the metric. Metric builders also allow
users to either supply their own metric (defined through the `Metric`
interface) or use the internal default implementation. To explain this design,
let's look at an example. Inside a Pulsar source, a user could define custom
metrics with the new metrics API like this:
```java
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.metrics.Counter;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

public class DemoSource implements Source<Data> {
    Counter counter;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // Register a counter backed by the framework's default implementation.
        this.counter = sourceContext.metricProvider()
                .registerCounter()
                .name("demo_counter")
                .help("helper message for counter")
                .labelNames("label1", "label2")
                .labels("foo", "bar")
                .register();
    }

    @Override
    public Record<Data> read() throws Exception {
        counter.inc();
        // ... other logic ...
    }
}
```
In this example, the user directly uses the internal `Counter` implementation
provided by the framework to track a counter metric. In certain situations,
however, a user may need to proxy a metric defined in an external system or
library. For example, imagine a Pulsar source implementation that uses an
HTTP client library which internally keeps its own counter of requests sent,
and the user of this library still wants to report that metric through the
Pulsar framework. In this case, the user can register the metric through the
Pulsar framework this way:
```java
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.metrics.Counter;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

public class DemoSource implements Source<Data> {
    Counter counter;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // ClientLibrary stands in for the external HTTP client library.
        CounterProxy counterProxy = new CounterProxy(new ClientLibrary());
        this.counter = sourceContext.metricProvider()
                .registerCounter()
                .name("demo_counter")
                .help("helper message for counter")
                .labelNames("label1", "label2")
                .labels("foo", "bar")
                // A custom counter is provided; its value will be reported
                // through the Pulsar framework.
                .counter(counterProxy)
                .register();
    }

    @Override
    public Record<Data> read() throws Exception {
        counter.inc();
        // ... other logic ...
    }

    static class CounterProxy implements Counter {
        ClientLibrary client;

        CounterProxy(ClientLibrary client) {
            this.client = client;
        }

        @Override
        public double get() {
            // Proxy the counter value from a separate system.
            return this.client.getCounter().get();
        }

        // ... override other methods ...
    }
}
```
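To make the proxy idea concrete outside of Pulsar, here is a minimal standalone sketch. Everything in it is hypothetical: `SketchCounter` merely stands in for the proposed `Counter` interface (whose full method set is not shown in this description), and `ClientLibraryStub` stands in for an external library that keeps its own request count. Treating `inc()` as a no-op on the proxy is an assumption made for the sketch, not behavior defined by this MR.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the proposed Counter interface. */
interface SketchCounter {
    void inc();
    double get();
}

/** Hypothetical external library that tracks its own request count. */
final class ClientLibraryStub {
    private final AtomicLong requestsSent = new AtomicLong();

    void sendRequest() {
        // A real client would perform I/O here; the stub only counts.
        requestsSent.incrementAndGet();
    }

    long requestsSent() {
        return requestsSent.get();
    }
}

/** Read-only view that reports the library's count through the counter interface. */
final class CounterProxySketch implements SketchCounter {
    private final ClientLibraryStub client;

    CounterProxySketch(ClientLibraryStub client) {
        this.client = client;
    }

    @Override
    public void inc() {
        // Assumption: the external library owns the count, so increments
        // made through the proxy are ignored.
    }

    @Override
    public double get() {
        // The value is read from the external library at collection time.
        return client.requestsSent();
    }
}
```

The point of the design is that the reported value is pulled from the external system whenever the metric is collected, so the two counts cannot drift apart.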
With this new metrics API, registered custom user metrics would be reported
at [`http://localhost:8080/metrics/`](http://localhost:8080/metrics/) like this:
```
# HELP pulsar_source_user_metric_demo_counter helper message for counter
# TYPE pulsar_source_user_metric_demo_counter counter
pulsar_source_user_metric_demo_counter{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",} 259.0
```
Internally, since Pulsar already uses Prometheus as its underlying metrics
system, registered custom user metrics are reported through Prometheus.
Metrics collection is achieved either by using an existing Prometheus
`io.prometheus.client.Collector` implementation (`Counter`, `Gauge`) or by
providing our own collector implementation (`Histogram`, `Summary`).
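As a rough illustration of what ends up on the metrics endpoint, the sketch below builds one sample line in the Prometheus text format from a metric name, a set of framework labels, and a set of user labels. It is not the MR's implementation: `UserMetricLine` and its methods are hypothetical, and the `pulsar_source_user_metric_` prefix is simply taken from the sample output in this description.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders one Prometheus text-format sample line. */
final class UserMetricLine {
    // Prefix as seen in the sample output for sources (assumed fixed here).
    static final String PREFIX = "pulsar_source_user_metric_";

    static String format(String name,
                         Map<String, String> frameworkLabels,
                         Map<String, String> userLabels,
                         double value) {
        // Framework labels come first, followed by the user's labels.
        Map<String, String> all = new LinkedHashMap<>(frameworkLabels);
        all.putAll(userLabels);
        String labels = all.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + escape(e.getValue()) + "\"")
                .collect(Collectors.joining(","));
        return PREFIX + name + "{" + labels + "} " + value;
    }

    /** Label values escape backslash, double quote, and newline in the text format. */
    static String escape(String value) {
        return value.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n");
    }
}
```

Unlike the sample output in this description, the sketch does not emit a trailing comma after the last label.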
### Modifications
This MR contains lots of changes in the `pulsar-functions` module, but the
idea behind it is pretty simple as described above.
### Verifying this change
New metric APIs and implementations are covered by unit tests.
Also manually verified the system works end to end by:
1. Adding and registering an example `Counter` and `Summary` in
`DataGeneratorSource`
2. Starting up a local standalone Pulsar cluster by: `bin/pulsar standalone`
3. Creating a `DataGeneratorSource` by: `bin/pulsar-admin sources create`
4. After the source is successfully running for a while, hit
[`http://localhost:8080/metrics/`](http://localhost:8080/metrics/) to verify
the registered metrics have been reported correctly:
```
# TYPE pulsar_source_system_exception gauge
# HELP pulsar_source_user_metric_records_counter counter for records read
# TYPE pulsar_source_user_metric_records_counter counter
pulsar_source_user_metric_records_counter{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",} 1557.0
# HELP pulsar_source_user_metric_runtime runtime latency
# TYPE pulsar_source_user_metric_runtime summary
pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.5",} 0.513479111265846
pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.9",} 0.8995500765220129
pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.99",} 0.9922919051399153
pulsar_source_user_metric_runtime{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",quantile="0.999",} 0.9998848111911606
pulsar_source_user_metric_runtime_count{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",} 1557.0
pulsar_source_user_metric_runtime_sum{tenant="public",namespace="public/default",name="pulsar-io-datagen",instance_id="0",cluster="standalone",fqfn="public/default/pulsar-io-datagen",label1="foo",label2="bar",} 792.5118673696929
```
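The manual check in step 4 could also be scripted. The following is a minimal sketch, assuming Java 11+ and a standalone cluster serving metrics at the URL above; `UserMetricCheck` and its methods are hypothetical names, and the filter simply keeps sample lines whose name starts with the `pulsar_source_user_metric_` prefix seen in the output.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for checking that user metrics appear in a scrape. */
final class UserMetricCheck {
    /** Keeps only sample lines for user metrics; HELP/TYPE comment lines are dropped. */
    static List<String> userMetricSamples(String scrapeBody) {
        return scrapeBody.lines()
                .filter(line -> line.startsWith("pulsar_source_user_metric_"))
                .collect(Collectors.toList());
    }

    /** Fetches the raw metrics text from the given URL. */
    static String fetch(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // Requires a running standalone cluster with the source deployed.
        userMetricSamples(fetch("http://localhost:8080/metrics/"))
                .forEach(System.out::println);
    }
}
```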
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): no
- The public API: yes
- The schema: no
- The default values of configurations: no
- The wire protocol: no
- The rest endpoints: no
- The admin cli options: no
- Anything that affects deployment: no
For the public API, in the `BaseContext` public interface, a new method
`MetricProvider metricProvider()` is added, and the old
`void recordMetric(String metricName, double value)` is marked as `@Deprecated`.
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [x] `doc-required`
(Your PR needs to update docs and you will update later)
- [ ] `no-need-doc`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-added`
(Docs have been already added)