madhavan-narayanan opened a new issue #12858:
URL: https://github.com/apache/pulsar/issues/12858
## Motivation
The central messaging platform at Intuit uses Apache Pulsar. The platform
team operates multiple clusters that are used by hundreds of teams across
Intuit. For complete visibility and to better serve customers, the platform
needs the ability to intercept all key broker and ledger events. It also needs
the ability to transparently control the format of messages that are persisted
to the disk store.
## Goal
The scope of this proposal is limited to broker events and operations. This
PIP addresses only traceability and interceptability at the broker level. To
achieve end-to-end tracing, it is also desirable to intercept events at the
Pulsar proxy level (for example, in the topic lookup and ownership assignment
flows), but that can be handled in a separate PIP.
## API Changes
We propose the following changes:
- Extend the existing interface `org.apache.pulsar.broker.intercept.BrokerInterceptor` to support the following granular events, each with all relevant context information:
  - `void onConnectionCreated(ServerCnx cnx);`
  - `void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);`
  - `void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata);`
  - `void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, Rate rateIn, Topic.PublishContext publishContext);`
  - `void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload);`
  - `void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd);`
- Add a new interface, `org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor`, that intercepts write and read operations on a managed ledger and can modify the payload. The interface details are given in the next section.
- Support dynamically loading managed ledger payload processor implementations through a new broker configuration parameter, `brokerEntryPayloadProcessors`, in the class `org.apache.pulsar.broker.ServiceConfiguration`:
  - `Set<String> brokerEntryPayloadProcessors;`
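
To illustrate how an implementation might consume the new events, here is a minimal, self-contained Java sketch. It assumes the new callbacks are added as `default` no-op methods, so that existing `BrokerInterceptor` implementations keep compiling and each implementation overrides only the events it cares about. `Cnx`, `Endpoint`, `GranularInterceptor`, and `TopicTrafficInterceptor` are hypothetical stand-ins, not Pulsar classes, and the parameter lists are simplified (for example, `ByteBuf` becomes `byte[]`, `CommandAck` is reduced to a ledger/entry position, and `Rate`/`PublishContext` are dropped).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for ServerCnx: only the remote address is kept.
final class Cnx {
    final String remoteAddress;

    Cnx(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
}

// Hypothetical stand-in for Producer/Consumer: a topic plus a producer or subscription name.
final class Endpoint {
    final String topic;
    final String name;

    Endpoint(String topic, String name) {
        this.topic = topic;
        this.name = name;
    }
}

// Simplified shape of the proposed callbacks. Every method is a default no-op,
// so an implementation only overrides the events it is interested in.
interface GranularInterceptor {
    default void onConnectionCreated(Cnx cnx) {}

    default void producerCreated(Cnx cnx, Endpoint producer, Map<String, String> metadata) {}

    default void consumerCreated(Cnx cnx, Endpoint consumer, Map<String, String> metadata) {}

    default void messageProduced(Cnx cnx, Endpoint producer, long startTimeNs, long ledgerId, long entryId) {}

    default void messageDispatched(Cnx cnx, Endpoint consumer, long ledgerId, long entryId, byte[] headersAndPayload) {}

    default void messageAcked(Cnx cnx, Endpoint consumer, long ledgerId, long entryId) {}
}

// Example implementation: per-topic counters for produced, dispatched and acked messages.
// ConcurrentHashMap and LongAdder are used because callbacks may run on many threads.
class TopicTrafficInterceptor implements GranularInterceptor {
    private final Map<String, LongAdder> produced = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> dispatched = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> acked = new ConcurrentHashMap<>();

    private static void increment(Map<String, LongAdder> counters, String topic) {
        counters.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    private static long count(Map<String, LongAdder> counters, String topic) {
        LongAdder counter = counters.get(topic);
        return counter == null ? 0L : counter.sum();
    }

    @Override
    public void messageProduced(Cnx cnx, Endpoint producer, long startTimeNs, long ledgerId, long entryId) {
        increment(produced, producer.topic);
    }

    @Override
    public void messageDispatched(Cnx cnx, Endpoint consumer, long ledgerId, long entryId, byte[] headersAndPayload) {
        increment(dispatched, consumer.topic);
    }

    @Override
    public void messageAcked(Cnx cnx, Endpoint consumer, long ledgerId, long entryId) {
        increment(acked, consumer.topic);
    }

    long producedCount(String topic) { return count(produced, topic); }

    long dispatchedCount(String topic) { return count(dispatched, topic); }

    long ackedCount(String topic) { return count(acked, topic); }
}
```

In this sketch the interceptor only observes events and never changes broker behavior; callbacks it does not override (such as `onConnectionCreated`) fall through to the default no-op.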
## Implementation
- The `BrokerInterceptor` interface should be extended to include the additional
callback methods specified in the API Changes section.
- The new callback methods need to be invoked at the appropriate places in the
`pulsar-broker` module (in the classes `ServerCnx`, `Producer`, and `Consumer`).
- A new interface, `ManagedLedgerPayloadProcessor`, should be added with the
following content:

- A new configuration parameter, `brokerEntryPayloadProcessors`, should be
supported in `broker.conf`. Its value can be a list of processors.
- The existing class
`org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl` should be
extended to support two additional operations, which it should handle internally
using one or more `ManagedLedgerPayloadProcessor` instances:
  - `processPayloadBeforeLedgerWrite(OpAddEntry op, ByteBuf ledgerData)`
  - `processPayloadBeforeEntryCache(ByteBuf ledgerData)`
- `OpAddEntry` should call
`ManagedLedgerInterceptor::processPayloadBeforeLedgerWrite` so that the payload
can be processed before it is written to the ledger.
- `EntryCacheManager` (and `EntryCacheImpl`) should call
`ManagedLedgerInterceptor::processPayloadBeforeEntryCache` to process the
payload immediately after it is read from the ledger.
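
As a hypothetical sketch of the payload-processing flow described above (not the proposed `ManagedLedgerPayloadProcessor` interface itself), the Java below assumes that processors run in their configured order before a ledger write and in reverse order after a ledger read, so that each read-side step undoes the matching write-side step. It uses `byte[]` instead of Netty's `ByteBuf` to stay self-contained; `PayloadProcessor`, `XorMaskProcessor`, `VersionHeaderProcessor`, and `PayloadProcessorChain` are illustrative names, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical byte[]-based stand-in for a payload processor (the proposal uses Netty ByteBuf).
interface PayloadProcessor {
    // Write path: called before the entry is written to the ledger.
    byte[] beforeLedgerWrite(byte[] payload);

    // Read path: called after the entry is read from the ledger, before it is cached.
    byte[] beforeEntryCache(byte[] payload);
}

// Example: XOR-masks every byte (obfuscation only, not encryption). Masking twice restores the input.
final class XorMaskProcessor implements PayloadProcessor {
    private final byte mask;

    XorMaskProcessor(byte mask) {
        this.mask = mask;
    }

    private byte[] apply(byte[] in) {
        byte[] out = new byte[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = (byte) (in[i] ^ mask);
        }
        return out;
    }

    @Override
    public byte[] beforeLedgerWrite(byte[] payload) {
        return apply(payload);
    }

    @Override
    public byte[] beforeEntryCache(byte[] payload) {
        return apply(payload);
    }
}

// Example: prepends a one-byte format version on write, then verifies and strips it on read.
final class VersionHeaderProcessor implements PayloadProcessor {
    private final byte version;

    VersionHeaderProcessor(byte version) {
        this.version = version;
    }

    @Override
    public byte[] beforeLedgerWrite(byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = version;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    @Override
    public byte[] beforeEntryCache(byte[] payload) {
        if (payload.length == 0 || payload[0] != version) {
            throw new IllegalStateException("unexpected payload format version");
        }
        return Arrays.copyOfRange(payload, 1, payload.length);
    }
}

// Hypothetical chain playing the role the proposal assigns to ManagedLedgerInterceptorImpl:
// configured order on the write path, reverse order on the read path.
final class PayloadProcessorChain {
    private final List<PayloadProcessor> processors;

    PayloadProcessorChain(List<PayloadProcessor> processors) {
        this.processors = new ArrayList<>(processors);
    }

    byte[] processPayloadBeforeLedgerWrite(byte[] payload) {
        byte[] current = payload;
        for (PayloadProcessor processor : processors) {
            current = processor.beforeLedgerWrite(current);
        }
        return current;
    }

    byte[] processPayloadBeforeEntryCache(byte[] payload) {
        byte[] current = payload;
        for (int i = processors.size() - 1; i >= 0; i--) {
            current = processors.get(i).beforeEntryCache(current);
        }
        return current;
    }
}
```

Reverse-order application on the read path matters whenever processors do not commute: in this example, checking the version header before removing the XOR mask would reject every stored entry.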
## Rejected Alternatives