madhavan-narayanan opened a new issue #12858:
URL: https://github.com/apache/pulsar/issues/12858


   
   ## Motivation
   
   The central messaging platform at Intuit runs on Apache Pulsar. The platform team operates multiple clusters that are used by hundreds of teams across Intuit. For complete visibility and to better serve customers, the platform needs the ability to intercept all key broker and ledger events. It also needs the ability to transparently control the format of messages that are persisted to the disk store.
   
   
   ## Goal
   
   The scope of this proposal is limited to broker events and operations: it addresses traceability and interceptability at the broker level only. To achieve end-to-end tracing, it is also desirable to intercept events at the Pulsar proxy level (for topic lookup, the ownership assignment flow, etc.), but that can be handled in a separate PIP.
   
   ## API Changes
   We propose the following changes:
   
   - Extend the existing interface org.apache.pulsar.broker.intercept.BrokerInterceptor to support the following granular events, each with all relevant context information:
   `void onConnectionCreated(ServerCnx cnx);`
   `void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);`
   `void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata);`
   `void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId, long entryId, Rate rateIn, Topic.PublishContext publishContext);`
   `void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId, long entryId, ByteBuf headersAndPayload);`
   `void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd);`
   
   - Add a new interface, org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor, that allows write and read operations of a managed ledger to be intercepted and the payload to be modified. The interface details are given in the next section.
   
   - Support dynamic loading of managed ledger interceptor implementations through a broker configuration parameter, 'brokerEntryPayloadProcessors', in class org.apache.pulsar.broker.ServiceConfiguration:
       `Set<String> brokerEntryPayloadProcessors;`
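
To make the proposed broker callbacks more concrete, here is a minimal, self-contained sketch of what an implementation might do with them. It is not the real `BrokerInterceptor`: the `ServerCnx` and `Producer` classes below are hypothetical stand-ins for the broker types, `BrokerEvents` is an illustrative subset of the proposed methods, and `messageProduced` omits the `Rate` and `Topic.PublishContext` parameters for brevity.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class InterceptorSketch {

    // Hypothetical stand-ins for the broker classes named in the proposal.
    static final class ServerCnx {
        final String remoteAddress;
        ServerCnx(String remoteAddress) { this.remoteAddress = remoteAddress; }
    }

    static final class Producer {
        final String topic;
        Producer(String topic) { this.topic = topic; }
    }

    // Illustrative subset of the proposed callbacks (assumption, not the real interface).
    interface BrokerEvents {
        void onConnectionCreated(ServerCnx cnx);
        void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata);
        void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs,
                             long ledgerId, long entryId);
    }

    // Example implementation: counts connections and published entries per topic.
    static final class CountingInterceptor implements BrokerEvents {
        final AtomicLong connections = new AtomicLong();
        final Map<String, AtomicLong> producedPerTopic = new ConcurrentHashMap<>();

        @Override
        public void onConnectionCreated(ServerCnx cnx) {
            connections.incrementAndGet();
        }

        @Override
        public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) {
            // Register the topic so it shows up with a zero count before any publish.
            producedPerTopic.putIfAbsent(producer.topic, new AtomicLong());
        }

        @Override
        public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs,
                                    long ledgerId, long entryId) {
            producedPerTopic.computeIfAbsent(producer.topic, t -> new AtomicLong()).incrementAndGet();
        }

        // Number of messageProduced events seen for a topic (0 if unknown).
        long produced(String topic) {
            AtomicLong n = producedPerTopic.get(topic);
            return n == null ? 0 : n.get();
        }
    }

    public static void main(String[] args) {
        CountingInterceptor interceptor = new CountingInterceptor();
        ServerCnx cnx = new ServerCnx("10.0.0.1:50000");
        Producer producer = new Producer("persistent://tenant/ns/orders");

        // In the broker these calls would come from ServerCnx and Producer.
        interceptor.onConnectionCreated(cnx);
        interceptor.producerCreated(cnx, producer, Collections.emptyMap());
        interceptor.messageProduced(cnx, producer, System.nanoTime(), 1L, 0L);
        interceptor.messageProduced(cnx, producer, System.nanoTime(), 1L, 1L);

        System.out.println(interceptor.connections.get() + " connection(s), "
                + interceptor.produced(producer.topic) + " message(s)");
    }
}
```

Passing the connection and producer objects to every callback, as the proposed signatures do, lets an implementation attribute each event to a client and topic without keeping its own lookup tables.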
   
   ## Implementation
   
   - The BrokerInterceptor interface should be extended to include the additional callback methods specified in the section above.
   - The new callback methods need to be invoked at the appropriate places in the pulsar-broker module (in classes ServerCnx, Producer, and Consumer).
   - A new interface, ManagedLedgerPayloadProcessor, should be added with the following content:
   
![image](https://user-images.githubusercontent.com/92708902/142239049-205e6e31-4806-4369-98b8-1b458efdf215.png)
   
   - A new configuration parameter, 'brokerEntryPayloadProcessors', should be supported in broker.conf. Its value can be a list of processors.
   - The existing class org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl should be extended to support two additional operations:
   `processPayloadBeforeLedgerWrite(OpAddEntry op, ByteBuf ledgerData)`
   `processPayloadBeforeEntryCache(ByteBuf ledgerData)`
      ManagedLedgerInterceptorImpl should internally use ManagedLedgerPayloadProcessor instance(s) to handle these payload processing operations.
   - OpAddEntry should use the method ManagedLedgerInterceptor::processPayloadBeforeLedgerWrite to process the payload before it is written to the ledger.
   - EntryCacheManager (and EntryCacheImpl) should use the method ManagedLedgerInterceptor::processPayloadBeforeEntryCache to process the payload immediately after it is read from the ledger.
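
The write and read hooks described above could be wired together roughly as follows. This is a sketch under stated assumptions, not the proposed implementation: the `PayloadProcessor` interface here is hypothetical (the actual ManagedLedgerPayloadProcessor content is given only in the image above), `byte[]` stands in for `ByteBuf`, and both example processors are invented for illustration. It shows processors being instantiated from configured class names, applied in order before a ledger write, and undone in reverse order before the entry cache. The reverse-order read path is a design assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PayloadProcessorSketch {

    // Hypothetical, simplified processor contract; byte[] stands in for ByteBuf.
    public interface PayloadProcessor {
        byte[] beforeLedgerWrite(byte[] payload);
        byte[] beforeEntryCache(byte[] payload);
    }

    // Illustrative processor: XOR with a fixed key (its own inverse).
    public static final class XorProcessor implements PayloadProcessor {
        private static byte[] xor(byte[] in) {
            byte[] out = new byte[in.length];
            for (int i = 0; i < in.length; i++) {
                out[i] = (byte) (in[i] ^ 0x5A);
            }
            return out;
        }
        @Override public byte[] beforeLedgerWrite(byte[] payload) { return xor(payload); }
        @Override public byte[] beforeEntryCache(byte[] payload) { return xor(payload); }
    }

    // Illustrative processor: prepends a one-byte format marker on write, strips it on read.
    public static final class HeaderProcessor implements PayloadProcessor {
        private static final byte FORMAT_V1 = 1;
        @Override public byte[] beforeLedgerWrite(byte[] payload) {
            byte[] out = new byte[payload.length + 1];
            out[0] = FORMAT_V1;
            System.arraycopy(payload, 0, out, 1, payload.length);
            return out;
        }
        @Override public byte[] beforeEntryCache(byte[] payload) {
            if (payload.length == 0 || payload[0] != FORMAT_V1) {
                throw new IllegalArgumentException("unknown payload format");
            }
            return Arrays.copyOfRange(payload, 1, payload.length);
        }
    }

    // Instantiates processors from class names, as a brokerEntryPayloadProcessors-style
    // setting might list them. Each class needs a public no-argument constructor.
    static List<PayloadProcessor> load(Set<String> classNames) {
        List<PayloadProcessor> processors = new ArrayList<>();
        for (String name : classNames) {
            try {
                processors.add(Class.forName(name).asSubclass(PayloadProcessor.class)
                        .getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot load processor " + name, e);
            }
        }
        return processors;
    }

    // Write path: apply every processor in configured order.
    static byte[] processBeforeLedgerWrite(List<PayloadProcessor> processors, byte[] payload) {
        byte[] data = payload;
        for (PayloadProcessor p : processors) {
            data = p.beforeLedgerWrite(data);
        }
        return data;
    }

    // Read path: undo the processors in reverse order (assumption of this sketch).
    static byte[] processBeforeEntryCache(List<PayloadProcessor> processors, byte[] payload) {
        byte[] data = payload;
        for (int i = processors.size() - 1; i >= 0; i--) {
            data = processors.get(i).beforeEntryCache(data);
        }
        return data;
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps the configured order, which matters for chained processors.
        Set<String> configured = new LinkedHashSet<>(Arrays.asList(
                XorProcessor.class.getName(), HeaderProcessor.class.getName()));
        List<PayloadProcessor> processors = load(configured);

        byte[] original = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] stored = processBeforeLedgerWrite(processors, original);
        byte[] restored = processBeforeEntryCache(processors, stored);
        System.out.println("round trip ok: " + Arrays.equals(original, restored));
    }
}
```

Because the configuration type is a `Set<String>`, an implementation that chains several processors would need an order-preserving set (or a documented ordering rule) for the read path to invert the write path reliably.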
   
   ## Rejected Alternatives
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
