This is an automated email from the ASF dual-hosted git repository.
dahn pushed a commit to branch
6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to
refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
new 424ab709f74 registry of message busses
424ab709f74 is described below
commit 424ab709f741f5eaeef31b0f322ddb5bf15d1580
Author: Daan Hoogland <[email protected]>
AuthorDate: Thu Sep 21 10:22:46 2023 +0200
registry of message busses
---
...-core-lifecycle-compute-context-inheritable.xml | 5 ++++
.../core/spring-core-registry-core-context.xml | 9 +++++--
.../apache/cloudstack/framework/events/Event.java | 12 ++++-----
.../framework/events/EventDistributor.java | 6 +++++
.../framework/events/EventDistributorImpl.java | 29 ++++++++++++++++++++++
.../cloudstack/mom/inmemory/InMemoryEventBus.java | 4 +++
.../apache/cloudstack/mom/kafka/KafkaEventBus.java | 5 +++-
.../cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 3 +++
server/src/main/java/com/cloud/api/ApiServer.java | 17 +++++++------
.../hypervisor/HypervisorGuruManagerImpl.java | 2 +-
.../core/spring-server-core-managers-context.xml | 4 +++
.../cloud/utils/component/ComponentContext.java | 7 ++++++
12 files changed, 86 insertions(+), 17 deletions(-)
diff --git
a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
index fb0e8780ecc..ef6adab9dd9 100644
---
a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
+++
b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
@@ -39,6 +39,11 @@
<property name="typeClass" value="com.cloud.ha.FenceBuilder" />
</bean>
+ <bean
class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
+ <property name="registry" ref="eventBussesRegistry" />
+ <property name="typeClass"
value="org.apache.cloudstack.framework.events.EventBus" />
+ </bean>
+
<bean
class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="hypervisorGurusRegistry" />
<property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru"
/>
diff --git
a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
index a7f384c76a9..a96e5159589 100644
---
a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
+++
b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
@@ -287,11 +287,16 @@
<property name="excludeKey" value="api.commands.exclude" />
</bean>
+ <bean id="eventBussesRegistry"
+
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+ <property name="excludeKey" value="event.busses.exclude" />
+ </bean>
+
<bean id="hypervisorGurusRegistry"
-
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="hypervisor.gurus.exclude" />
</bean>
-
+
<bean id="vpcProvidersRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="vpc.providers.exclude" />
diff --git
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
index 4a3eaf9e68c..bd1ea2aa5f9 100644
---
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
+++
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
@@ -31,11 +31,11 @@ public class Event {
String description;
public Event(String eventSource, String eventCategory, String eventType,
String resourceType, String resourceUUID) {
- this.eventCategory = eventCategory;
- this.eventType = eventType;
- this.eventSource = eventSource;
- this.resourceType = resourceType;
- this.resourceUUID = resourceUUID;
+ setEventCategory(eventCategory);
+ setEventType(eventType);
+ setEventSource(eventSource);
+ setResourceType(resourceType);
+ setResourceUUID(resourceUUID);
}
public String getEventCategory() {
@@ -68,7 +68,7 @@ public class Event {
public void setDescription(Object message) {
Gson gson = new Gson();
- this.description = gson.toJson(message).toString();
+ this.description = gson.toJson(message);
}
public void setDescription(String description) {
diff --git
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
new file mode 100644
index 00000000000..944d1f9377b
--- /dev/null
+++
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
@@ -0,0 +1,6 @@
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Manager;
+
+public interface EventDistributor extends Manager {
+}
diff --git
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
new file mode 100644
index 00000000000..fc9cca35dc9
--- /dev/null
+++
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -0,0 +1,29 @@
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.ManagerBase;
+import org.apache.log4j.Logger;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+
+public class EventDistributorImpl extends ManagerBase implements
EventDistributor {
+ private static final Logger LOGGER =
Logger.getLogger(EventDistributorImpl.class);
+
+ public void setEventBusses(List<EventBus> eventBusses) {
+ this.eventBusses = eventBusses;
+ }
+
+ List<EventBus> eventBusses;
+
+ @PostConstruct
+ public void init() {
+ for (EventBus bus : eventBusses) {
+ try {
+ bus.publish(new Event("server", "NONE","starting", "server",
"NONE"));
+ } catch (EventBusException e) {
+ LOGGER.debug(String.format("no publish for bus %s",
bus.getClass().getName()), e);
+ }
+ }
+ }
+
+}
diff --git
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
index b7d74df980f..eac54af9045 100644
---
a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
+++
b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -87,7 +87,11 @@ public class InMemoryEventBus extends ManagerBase implements
EventBus {
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish %s", event));
+ }
if (subscribers == null || subscribers.isEmpty()) {
+ s_logger.trace("no subscribers, no publish");
return; // no subscriber to publish to, so just return
}
diff --git
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index 17a58a5d232..8ec943c0cee 100644
---
a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++
b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -100,7 +100,10 @@ public class KafkaEventBus extends ManagerBase implements
EventBus {
@Override
public void publish(Event event) throws EventBusException {
- ProducerRecord<String, String> record = new
ProducerRecord<String,String>(_topic, event.getResourceUUID(),
event.getDescription());
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish %s", event));
+ }
+ ProducerRecord<String, String> record = new ProducerRecord<>(_topic,
event.getResourceUUID(), event.getDescription());
_producer.send(record);
}
diff --git
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index f54c769908d..8c0d3f225c3 100644
---
a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++
b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -267,6 +267,9 @@ public class RabbitMQEventBus extends ManagerBase
implements EventBus {
// publish event on to the exchange created on AMQP server
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish %s", event));
+ }
String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
diff --git a/server/src/main/java/com/cloud/api/ApiServer.java
b/server/src/main/java/com/cloud/api/ApiServer.java
index b602ed2edbc..cea50273a6e 100644
--- a/server/src/main/java/com/cloud/api/ApiServer.java
+++ b/server/src/main/java/com/cloud/api/ApiServer.java
@@ -97,6 +97,7 @@ import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@@ -197,26 +198,28 @@ public class ApiServer extends ManagerBase implements
HttpRequestHandler, ApiSer
*/
private static final String CONTROL_CHARACTERS =
"[\000-\011\013-\014\016-\037\177]";
+ @Inject
+ private AccountManager accountMgr;
+ @Inject
+ private APIAuthenticationManager authManager;
@Inject
private ApiDispatcher dispatcher;
@Inject
- private DispatchChainFactory dispatchChainFactory;
+ private AsyncJobManager asyncMgr;
@Inject
- private AccountManager accountMgr;
+ private DispatchChainFactory dispatchChainFactory;
@Inject
private DomainManager domainMgr;
@Inject
private DomainDao domainDao;
@Inject
- private UUIDManager uuidMgr;
- @Inject
- private AsyncJobManager asyncMgr;
- @Inject
private EntityManager entityMgr;
@Inject
- private APIAuthenticationManager authManager;
+ private EventDistributor eventDistributor;
@Inject
private ProjectDao projectDao;
+ @Inject
+ private UUIDManager uuidMgr;
private List<PluggableService> pluggableServices;
diff --git
a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
index a5f1f9fa5cb..03c2f485669 100644
--- a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
+++ b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
@@ -40,7 +40,7 @@ public class HypervisorGuruManagerImpl extends ManagerBase
implements Hypervisor
HostDao _hostDao;
List<HypervisorGuru> _hvGuruList;
- Map<HypervisorType, HypervisorGuru> _hvGurus = new
ConcurrentHashMap<HypervisorType, HypervisorGuru>();
+ Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
diff --git
a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index ee676aabbfa..fdcf2e20d4e 100644
---
a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++
b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -144,6 +144,10 @@
<property name="staticNatElements"
value="#{staticNatServiceProvidersRegistry.registered}" />
</bean>
+ <bean id="eventDistributor"
class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
+ <property name="eventBusses" value="#{eventBussesRegistry.registered}"
/>
+ </bean>
+
<bean id="hypervisorGuruManagerImpl"
class="com.cloud.hypervisor.HypervisorGuruManagerImpl" >
<property name="hvGuruList"
value="#{hypervisorGurusRegistry.registered}" />
</bean>
diff --git
a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
index 8486dbf4bd4..decaa34cf95 100644
--- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
+++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
@@ -178,6 +178,13 @@ public class ComponentContext implements
ApplicationContextAware {
return (T)s_appContext.getBean(name);
}
+ /**
+ * only ever used to get the event bus
+ *
+ * @param beanType the component type to return
+ * @return one of the component registered for the requested type
+ * @param <T>
+ */
public static <T> T getComponent(Class<T> beanType) {
assert (s_appContext != null);
Map<String, T> matchedTypes = getComponentsOfType(beanType);