This is an automated email from the ASF dual-hosted git repository.
dahn pushed a commit to branch 6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
new f743be4f1f5 extra logging
f743be4f1f5 is described below
commit f743be4f1f50d036de5edde3dcd84e7b1db9510c
Author: Daan Hoogland <[email protected]>
AuthorDate: Fri Sep 22 19:17:52 2023 +0200
extra logging
---
.../cloudstack/framework/events/EventDistributorImpl.java | 4 +++-
.../org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java | 9 ++++++++-
.../java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java | 9 ++++++++-
.../org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 10 ++++++++--
4 files changed, 27 insertions(+), 5 deletions(-)
diff --git
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
index eed60c45ddd..0ef9e09cff6 100644
---
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
+++
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -36,6 +36,9 @@ public class EventDistributorImpl extends ManagerBase
implements EventDistributo
@PostConstruct
public void init() {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(String.format("testing %d event busses",
eventBusses.size()));
+ }
for (EventBus bus : eventBusses) {
try {
bus.publish(new Event("server", "NONE","starting", "server",
"NONE"));
@@ -44,5 +47,4 @@ public class EventDistributorImpl extends ManagerBase
implements EventDistributo
}
}
}
-
}
diff --git
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
index eac54af9045..5538a50988d 100644
---
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
+++
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -62,6 +62,10 @@ public class InMemoryEventBus extends ManagerBase implements
EventBus {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic
object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type
\'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(),
topic.getEventSource()));
+ }
+
UUID subscriberId = UUID.randomUUID();
subscribers.put(subscriberId, new Pair<EventTopic,
EventSubscriber>(topic, subscriber));
@@ -70,6 +74,9 @@ public class InMemoryEventBus extends ManagerBase implements
EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber)
throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
if (subscriberId == null) {
throw new EventBusException("Cannot unregister a null
subscriberId.");
}
@@ -88,7 +95,7 @@ public class InMemoryEventBus extends ManagerBase implements
EventBus {
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
- s_logger.trace(String.format("publish %s", event));
+ s_logger.trace(String.format("publish \'%s\'",
event.getDescription()));
}
if (subscribers == null || subscribers.isEmpty()) {
s_logger.trace("no subscribers, no publish");
diff --git
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index 8ec943c0cee..f680ad29c24 100644
---
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -89,19 +89,26 @@ public class KafkaEventBus extends ManagerBase implements
EventBus {
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws
EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type
\'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(),
topic.getEventSource()));
+ }
+
/* NOOP */
return UUID.randomUUID();
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber)
throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
/* NOOP */
}
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
- s_logger.trace(String.format("publish %s", event));
+ s_logger.trace(String.format("publish \'%s\'",
event.getDescription()));
}
ProducerRecord<String, String> record = new ProducerRecord<>(_topic,
event.getResourceUUID(), event.getDescription());
_producer.send(record);
diff --git
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index 8c0d3f225c3..5e5589aca5c 100644
---
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -187,11 +187,14 @@ public class RabbitMQEventBus extends ManagerBase
implements EventBus {
*/
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws
EventBusException {
-
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic
object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type
\'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(),
topic.getEventSource()));
+ }
+
// create a UUID, that will be used for managing subscriptions and
also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
@@ -252,6 +255,9 @@ public class RabbitMQEventBus extends ManagerBase
implements EventBus {
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber)
throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
try {
String classname = subscriber.getClass().getName();
String queueName =
UUID.nameUUIDFromBytes(classname.getBytes()).toString();
@@ -268,7 +274,7 @@ public class RabbitMQEventBus extends ManagerBase
implements EventBus {
@Override
public void publish(Event event) throws EventBusException {
if (s_logger.isTraceEnabled()) {
- s_logger.trace(String.format("publish %s", event));
+ s_logger.trace(String.format("publish \'%s\'",
event.getDescription()));
}
String routingKey = createRoutingKey(event);