sijie commented on a change in pull request #5457: [transaction-coordinator]
Ownership change listeners
URL: https://github.com/apache/pulsar/pull/5457#discussion_r339350510
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -858,6 +867,89 @@ public void removeOwnedServiceUnits(NamespaceName nsName,
BundlesData bundleData
bundleFactory.invalidateBundleCache(nsName);
}
+ protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
+ for (NamespaceBundleOwnershipListener bundleOwnedListener :
bundleOwnershipListeners) {
+ try {
+ if (bundleOwnedListener.getFilter().test(bundle)) {
+ bundleOwnedListener.onLoad(bundle);
+ }
+ } catch (Throwable t) {
+ LOG.error("Call bundle {} ownership lister error", bundle, t);
+ }
+ }
+ if (!CollectionUtils.isEmpty(topicOwnershipListeners)) {
+ getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics,
ex) -> {
+ if (ex == null) {
+ for (String topic : topics) {
+ TopicName topicName = TopicName.get(topic);
+ for (TopicOwnershipListener topicOwnershipListener :
topicOwnershipListeners) {
+ if
(topicOwnershipListener.getFilter().test(topicName)) {
+ try {
+ topicOwnershipListener.onLoad(topicName);
+ } catch (Throwable t) {
+ LOG.error("Call topic {} ownership lister
error", topic, t);
+ }
+ }
+ }
+ }
+ } else {
+ LOG.error("Get owned topic list for namespace bundle {}
error.", bundle, ex);
+ }
+ });
+ }
+ }
+
+ protected void onNamespaceBundleUnload(NamespaceBundle bundle) {
+ for (NamespaceBundleOwnershipListener bundleOwnedListener :
bundleOwnershipListeners) {
+ try {
+ if (bundleOwnedListener.getFilter().test(bundle)) {
+ bundleOwnedListener.unLoad(bundle);
+ }
+ } catch (Throwable t) {
+ LOG.error("Call bundle {} ownership lister error", bundle, t);
+ }
+ }
+ if (!CollectionUtils.isEmpty(topicOwnershipListeners)) {
+ getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics,
ex) -> {
+ if (ex == null) {
+ for (String topic : topics) {
+ TopicName topicName = TopicName.get(topic);
+ for (TopicOwnershipListener topicOwnershipListener :
topicOwnershipListeners) {
+ if
(topicOwnershipListener.getFilter().test(topicName)) {
+ try {
+ topicOwnershipListener.unLoad(topicName);
+ } catch (Throwable t) {
+ LOG.error("Call topic {} ownership lister
error", topic, t);
+ }
+ }
+ }
+ }
+ } else {
+ LOG.error("Get owned topic list for namespace bundle {}
error.", bundle, ex);
+ }
+ });
+ }
+ }
+
+ public void
addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener...
listeners) {
+ checkNotNull(listeners);
+ for (NamespaceBundleOwnershipListener listener : listeners) {
+ if (listener != null) {
+ bundleOwnershipListeners.add(listener);
+ }
+ }
+ getOwnedServiceUnits().forEach(this::onNamespaceBundleOwned);
+ }
+
+ public void addTopicOwnershipListener(TopicOwnershipListener... listeners)
{
+ checkNotNull(listeners);
+ for (TopicOwnershipListener listener : listeners) {
+ if (listener != null) {
+ topicOwnershipListeners.add(listener);
+ }
+ }
Review comment:
for newly added listeners, we need to trigger `#onLoad` for already owned
topics, no?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services