This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch 6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
commit 167c91f997ad14e82d3716451e210c3d122021c1 Author: Daan Hoogland <[email protected]> AuthorDate: Fri Sep 22 19:17:52 2023 +0200 extra logging --- .../cloudstack/framework/events/EventDistributorImpl.java | 4 +++- .../org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java | 9 ++++++++- .../java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java | 9 ++++++++- .../org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 10 ++++++++-- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java index eed60c45ddd..0ef9e09cff6 100644 --- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java +++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java @@ -36,6 +36,9 @@ public class EventDistributorImpl extends ManagerBase implements EventDistributo @PostConstruct public void init() { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("testing %d event busses", eventBusses.size())); + } for (EventBus bus : eventBusses) { try { bus.publish(new Event("server", "NONE","starting", "server", "NONE")); @@ -44,5 +47,4 @@ public class EventDistributorImpl extends ManagerBase implements EventDistributo } } } - } diff --git a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java index eac54af9045..5538a50988d 100644 --- a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java +++ b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java @@ -62,6 +62,10 @@ public class InMemoryEventBus extends ManagerBase implements EventBus { if (subscriber == null || topic == 
null) { throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); } + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource())); + } + UUID subscriberId = UUID.randomUUID(); subscribers.put(subscriberId, new Pair<EventTopic, EventSubscriber>(topic, subscriber)); @@ -70,6 +74,9 @@ public class InMemoryEventBus extends ManagerBase implements EventBus { @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId)); + } if (subscriberId == null) { throw new EventBusException("Cannot unregister a null subscriberId."); } @@ -88,7 +95,7 @@ public class InMemoryEventBus extends ManagerBase implements EventBus { @Override public void publish(Event event) throws EventBusException { if (s_logger.isTraceEnabled()) { - s_logger.trace(String.format("publish %s", event)); + s_logger.trace(String.format("publish \'%s\'", event.getDescription())); } if (subscribers == null || subscribers.isEmpty()) { s_logger.trace("no subscribers, no publish"); diff --git a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java index 8ec943c0cee..f680ad29c24 100644 --- a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java +++ b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java @@ -89,19 +89,26 @@ public class KafkaEventBus extends ManagerBase implements EventBus { @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from 
\'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource())); + } + /* NOOP */ return UUID.randomUUID(); } @Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId)); + } /* NOOP */ } @Override public void publish(Event event) throws EventBusException { if (s_logger.isTraceEnabled()) { - s_logger.trace(String.format("publish %s", event)); + s_logger.trace(String.format("publish \'%s\'", event.getDescription())); } ProducerRecord<String, String> record = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription()); _producer.send(record); diff --git a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index 8c0d3f225c3..5e5589aca5c 100644 --- a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -187,11 +187,14 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { */ @Override public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { - if (subscriber == null || topic == null) { throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); } + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource())); + } + // create a UUID, that will be used for managing subscriptions and also used as queue name // for on the queue used for the subscriber on the AMQP broker UUID queueId = UUID.randomUUID(); @@ -252,6 +255,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { 
@Override public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId)); + } try { String classname = subscriber.getClass().getName(); String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString(); @@ -268,7 +274,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { @Override public void publish(Event event) throws EventBusException { if (s_logger.isTraceEnabled()) { - s_logger.trace(String.format("publish %s", event)); + s_logger.trace(String.format("publish \'%s\'", event.getDescription())); } String routingKey = createRoutingKey(event);
