dlg99 opened a new issue, #19224:
URL: https://github.com/apache/pulsar/issues/19224
### Motivation
Some Protocol Handlers may need to know about the topic-specific events to
update internal caches and/or state.
These mechanisms will be useful also for core Pulsar components (like the
Transactions subsystem) and probably we would be able to simplify the
interaction between the internal components in the broker by using an unified
mechanism to handle the lifecycle of topics.
Specific use cases:
KOP keeps some state for the topic and needs to handle such cases as:
- Topic Unloaded: release resources dedicated to the topic
- Topic Loaded: trigger loading of components tied to the partition
(GroupCoordinator, TransactionManager)
- Topic Deleted: remove any persistent state associated to the topic that is
stored in additional side system topics
- Topic Created: the same as “deleted” (ensure that there is no state on
system topics related to the new topic)
### Goal
This PIP defines a set of events needed for the protocol handlers (and for
internal broker components) to get notifications about topic-specific events as
seen by BrokerService. PIP outlines changes needed for protocol handlers to
keep/cache state consistent with BrokerService’s.
The changes should not affect Pulsar running without protocol handlers or
with protocol handlers that do not rely on the new events.
### API Changes
```java
/**
* Listener for the Topic events.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface TopicEventsListener {
/**
* Types of events currently supported.
* create/load/unload/delete
*/
enum TopicEvent {
// create events included into load events
CREATE,
LOAD,
UNLOAD,
DELETE,
}
/**
* Stages of events currently supported.
* before starting the event/successful completion/failed completion
*/
enum EventStage {
BEFORE,
SUCCESS,
FAILURE
}
/**
* Outcome of the listener.
* Ignored for events at final stages (SUCCESS/FAILURE),
*
*/
enum EventProcessingOutcome {
OK,
FAILURE,
NOT_ALLOWED
}
/**
* POJO for event processing result (outcome, message)
*/
@ToString(includeFieldNames=true)
@Data(staticConstructor="of")
class EventProcessingResult {
private final EventProcessingOutcome outcome;
private final String message;
}
/**
* Handle topic event.
* Choice of the thread / maintenance of the thread pool is up to the
event handlers.
* @param topicName - name of the topic
* @param event - TopicEvent
* @param stage - EventStage
* @param t - exception in case of FAILURE, if present/known
* @return - EventProcessingResult.
* EventProcessingResult.EventProcessingOutcome != OK indicates
request to cancel
* event at BEFORE stage.
*/
EventProcessingResult handleEvent(String topicName, TopicEvent event,
EventStage stage, Throwable t);
}
```
BrokerService:
```java
public void addTopicEventListener(TopicEventsListener... listeners)
public void removeTopicEventListener(TopicEventsListener... listeners)
```
### Implementation
See PR for the proposed implementation.
https://github.com/apache/pulsar/pull/19153
### Alternatives
Add new methods to the BrokerInterceptor API
### Anything else?
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]