This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch 
6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to 
refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
     new 424ab709f74 registry of message busses
424ab709f74 is described below

commit 424ab709f741f5eaeef31b0f322ddb5bf15d1580
Author: Daan Hoogland <[email protected]>
AuthorDate: Thu Sep 21 10:22:46 2023 +0200

    registry of message busses
---
 ...-core-lifecycle-compute-context-inheritable.xml |  5 ++++
 .../core/spring-core-registry-core-context.xml     |  9 +++++--
 .../apache/cloudstack/framework/events/Event.java  | 12 ++++-----
 .../framework/events/EventDistributor.java         |  6 +++++
 .../framework/events/EventDistributorImpl.java     | 29 ++++++++++++++++++++++
 .../cloudstack/mom/inmemory/InMemoryEventBus.java  |  4 +++
 .../apache/cloudstack/mom/kafka/KafkaEventBus.java |  5 +++-
 .../cloudstack/mom/rabbitmq/RabbitMQEventBus.java  |  3 +++
 server/src/main/java/com/cloud/api/ApiServer.java  | 17 +++++++------
 .../hypervisor/HypervisorGuruManagerImpl.java      |  2 +-
 .../core/spring-server-core-managers-context.xml   |  4 +++
 .../cloud/utils/component/ComponentContext.java    |  7 ++++++
 12 files changed, 86 insertions(+), 17 deletions(-)

diff --git 
a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
 
b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
index fb0e8780ecc..ef6adab9dd9 100644
--- 
a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
+++ 
b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
@@ -39,6 +39,11 @@
         <property name="typeClass" value="com.cloud.ha.FenceBuilder" />
     </bean>
 
+    <bean 
class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
+        <property name="registry" ref="eventBussesRegistry" />
+        <property name="typeClass" 
value="org.apache.cloudstack.framework.events.EventBus" />
+    </bean>
+
     <bean 
class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
         <property name="registry" ref="hypervisorGurusRegistry" />
         <property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" 
/>
diff --git 
a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
 
b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
index a7f384c76a9..a96e5159589 100644
--- 
a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
+++ 
b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
@@ -287,11 +287,16 @@
         <property name="excludeKey" value="api.commands.exclude" />
     </bean>
 
+    <bean id="eventBussesRegistry"
+          
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+        <property name="excludeKey" value="event.busses.exclude" />
+    </bean>
+
     <bean id="hypervisorGurusRegistry"
-        
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+          
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
         <property name="excludeKey" value="hypervisor.gurus.exclude" />
     </bean>
-    
+
     <bean id="vpcProvidersRegistry"
         
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
         <property name="excludeKey" value="vpc.providers.exclude" />
diff --git 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
index 4a3eaf9e68c..bd1ea2aa5f9 100644
--- 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
+++ 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
@@ -31,11 +31,11 @@ public class Event {
     String description;
 
     public Event(String eventSource, String eventCategory, String eventType, 
String resourceType, String resourceUUID) {
-        this.eventCategory = eventCategory;
-        this.eventType = eventType;
-        this.eventSource = eventSource;
-        this.resourceType = resourceType;
-        this.resourceUUID = resourceUUID;
+        setEventCategory(eventCategory);
+        setEventType(eventType);
+        setEventSource(eventSource);
+        setResourceType(resourceType);
+        setResourceUUID(resourceUUID);
     }
 
     public String getEventCategory() {
@@ -68,7 +68,7 @@ public class Event {
 
     public void setDescription(Object message) {
         Gson gson = new Gson();
-        this.description = gson.toJson(message).toString();
+        this.description = gson.toJson(message);
     }
 
     public void setDescription(String description) {
diff --git 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
new file mode 100644
index 00000000000..944d1f9377b
--- /dev/null
+++ 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
@@ -0,0 +1,6 @@
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Manager;
+
+public interface EventDistributor extends Manager {
+}
diff --git 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
new file mode 100644
index 00000000000..fc9cca35dc9
--- /dev/null
+++ 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -0,0 +1,29 @@
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.ManagerBase;
+import org.apache.log4j.Logger;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+
+public class EventDistributorImpl extends ManagerBase implements 
EventDistributor {
+    private static final Logger LOGGER = 
Logger.getLogger(EventDistributorImpl.class);
+
+    public void setEventBusses(List<EventBus> eventBusses) {
+        this.eventBusses = eventBusses;
+    }
+
+    List<EventBus> eventBusses;
+
+    @PostConstruct
+    public void init() {
+        for (EventBus bus : eventBusses) {
+            try {
+                bus.publish(new Event("server", "NONE","starting", "server", 
"NONE"));
+            } catch (EventBusException e) {
+                LOGGER.debug(String.format("no publish for bus %s", 
bus.getClass().getName()), e);
+            }
+        }
+    }
+
+}
diff --git 
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
 
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
index b7d74df980f..eac54af9045 100644
--- 
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
+++ 
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -87,7 +87,11 @@ public class InMemoryEventBus extends ManagerBase implements 
EventBus {
 
     @Override
     public void publish(Event event) throws EventBusException {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace(String.format("publish %s", event));
+        }
         if (subscribers == null || subscribers.isEmpty()) {
+            s_logger.trace("no subscribers, no publish");
             return; // no subscriber to publish to, so just return
         }
 
diff --git 
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
 
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index 17a58a5d232..8ec943c0cee 100644
--- 
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++ 
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -100,7 +100,10 @@ public class KafkaEventBus extends ManagerBase implements 
EventBus {
 
     @Override
     public void publish(Event event) throws EventBusException {
-        ProducerRecord<String, String> record = new 
ProducerRecord<String,String>(_topic, event.getResourceUUID(), 
event.getDescription());
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace(String.format("publish %s", event));
+        }
+        ProducerRecord<String, String> record = new ProducerRecord<>(_topic, 
event.getResourceUUID(), event.getDescription());
         _producer.send(record);
     }
 
diff --git 
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
 
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index f54c769908d..8c0d3f225c3 100644
--- 
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++ 
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -267,6 +267,9 @@ public class RabbitMQEventBus extends ManagerBase 
implements EventBus {
     // publish event on to the exchange created on AMQP server
     @Override
     public void publish(Event event) throws EventBusException {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace(String.format("publish %s", event));
+        }
 
         String routingKey = createRoutingKey(event);
         String eventDescription = event.getDescription();
diff --git a/server/src/main/java/com/cloud/api/ApiServer.java 
b/server/src/main/java/com/cloud/api/ApiServer.java
index b602ed2edbc..cea50273a6e 100644
--- a/server/src/main/java/com/cloud/api/ApiServer.java
+++ b/server/src/main/java/com/cloud/api/ApiServer.java
@@ -97,6 +97,7 @@ import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.events.EventBus;
 import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@@ -197,26 +198,28 @@ public class ApiServer extends ManagerBase implements 
HttpRequestHandler, ApiSer
      */
     private static final String CONTROL_CHARACTERS = 
"[\000-\011\013-\014\016-\037\177]";
 
+    @Inject
+    private AccountManager accountMgr;
+    @Inject
+    private APIAuthenticationManager authManager;
     @Inject
     private ApiDispatcher dispatcher;
     @Inject
-    private DispatchChainFactory dispatchChainFactory;
+    private AsyncJobManager asyncMgr;
     @Inject
-    private AccountManager accountMgr;
+    private DispatchChainFactory dispatchChainFactory;
     @Inject
     private DomainManager domainMgr;
     @Inject
     private DomainDao domainDao;
     @Inject
-    private UUIDManager uuidMgr;
-    @Inject
-    private AsyncJobManager asyncMgr;
-    @Inject
     private EntityManager entityMgr;
     @Inject
-    private APIAuthenticationManager authManager;
+    private EventDistributor eventDistributor;
     @Inject
     private ProjectDao projectDao;
+    @Inject
+    private UUIDManager uuidMgr;
 
     private List<PluggableService> pluggableServices;
 
diff --git 
a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java 
b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
index a5f1f9fa5cb..03c2f485669 100644
--- a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
+++ b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
@@ -40,7 +40,7 @@ public class HypervisorGuruManagerImpl extends ManagerBase 
implements Hypervisor
     HostDao _hostDao;
 
     List<HypervisorGuru> _hvGuruList;
-    Map<HypervisorType, HypervisorGuru> _hvGurus = new 
ConcurrentHashMap<HypervisorType, HypervisorGuru>();
+    Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<>();
 
     @PostConstruct
     public void init() {
diff --git 
a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
 
b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index ee676aabbfa..fdcf2e20d4e 100644
--- 
a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++ 
b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -144,6 +144,10 @@
         <property name="staticNatElements" 
value="#{staticNatServiceProvidersRegistry.registered}" />
     </bean>
 
+    <bean id="eventDistributor" 
class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
+        <property name="eventBusses" value="#{eventBussesRegistry.registered}" 
/>
+    </bean>
+
     <bean id="hypervisorGuruManagerImpl" 
class="com.cloud.hypervisor.HypervisorGuruManagerImpl" >
         <property name="hvGuruList" 
value="#{hypervisorGurusRegistry.registered}" />
     </bean>
diff --git 
a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java 
b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
index 8486dbf4bd4..decaa34cf95 100644
--- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
+++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
@@ -178,6 +178,13 @@ public class ComponentContext implements 
ApplicationContextAware {
         return (T)s_appContext.getBean(name);
     }
 
+    /**
+     * only ever used to get the event bus
+     *
+     * @param beanType the component type to return
+     * @return one of the component registered for the requested type
+     * @param <T>
+     */
     public static <T> T getComponent(Class<T> beanType) {
         assert (s_appContext != null);
         Map<String, T> matchedTypes = getComponentsOfType(beanType);

Reply via email to