This is an automated email from the ASF dual-hosted git repository. dahn pushed a commit to branch 6778-if-kafka-is-turned-on-internal-subs-dont-work in repository https://gitbox.apache.org/repos/asf/cloudstack.git
commit da668f3d7c3e7a1a7ed2635fb40c1b42aede018e Author: Daan Hoogland <[email protected]> AuthorDate: Thu Sep 21 10:22:46 2023 +0200 registry of message busses --- ...-core-lifecycle-compute-context-inheritable.xml | 5 ++++ .../core/spring-core-registry-core-context.xml | 9 +++++-- .../apache/cloudstack/framework/events/Event.java | 12 ++++----- .../framework/events/EventDistributor.java | 6 +++++ .../framework/events/EventDistributorImpl.java | 29 ++++++++++++++++++++++ .../cloudstack/mom/inmemory/InMemoryEventBus.java | 4 +++ .../apache/cloudstack/mom/kafka/KafkaEventBus.java | 5 +++- .../cloudstack/mom/rabbitmq/RabbitMQEventBus.java | 3 +++ server/src/main/java/com/cloud/api/ApiServer.java | 17 +++++++------ .../hypervisor/HypervisorGuruManagerImpl.java | 2 +- .../core/spring-server-core-managers-context.xml | 4 +++ .../cloud/utils/component/ComponentContext.java | 7 ++++++ 12 files changed, 86 insertions(+), 17 deletions(-) diff --git a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml index fb0e8780ecc..ef6adab9dd9 100644 --- a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml +++ b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml @@ -39,6 +39,11 @@ <property name="typeClass" value="com.cloud.ha.FenceBuilder" /> </bean> + <bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle"> + <property name="registry" ref="eventBussesRegistry" /> + <property name="typeClass" value="org.apache.cloudstack.framework.events.EventBus" /> + </bean> + <bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle"> <property name="registry" ref="hypervisorGurusRegistry" /> <property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" /> diff --git a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml index a7f384c76a9..a96e5159589 100644 --- a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml +++ b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml @@ -287,11 +287,16 @@ <property name="excludeKey" value="api.commands.exclude" /> </bean> + <bean id="eventBussesRegistry" + class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry"> + <property name="excludeKey" value="event.busses.exclude" /> + </bean> + <bean id="hypervisorGurusRegistry" - class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry"> + class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry"> <property name="excludeKey" value="hypervisor.gurus.exclude" /> </bean> - + <bean id="vpcProvidersRegistry" class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry"> <property name="excludeKey" value="vpc.providers.exclude" /> diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java index 4a3eaf9e68c..bd1ea2aa5f9 100644 --- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java +++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java @@ -31,11 +31,11 @@ public class Event { String description; public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) { - this.eventCategory = eventCategory; - this.eventType = eventType; - this.eventSource = eventSource; - this.resourceType = resourceType; - this.resourceUUID = resourceUUID; + setEventCategory(eventCategory); + setEventType(eventType); + setEventSource(eventSource); + setResourceType(resourceType); + setResourceUUID(resourceUUID); } public String getEventCategory() { @@ -68,7 +68,7 @@ public class Event { public void setDescription(Object message) { Gson gson = new Gson(); - this.description = gson.toJson(message).toString(); + this.description = gson.toJson(message); } public void setDescription(String description) { diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java new file mode 100644 index 00000000000..944d1f9377b --- /dev/null +++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java @@ -0,0 +1,6 @@ +package org.apache.cloudstack.framework.events; + +import com.cloud.utils.component.Manager; + +public interface EventDistributor extends Manager { +} diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java new file mode 100644 index 00000000000..fc9cca35dc9 --- /dev/null +++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java @@ -0,0 +1,29 @@ +package org.apache.cloudstack.framework.events; + +import com.cloud.utils.component.ManagerBase; +import org.apache.log4j.Logger; + +import javax.annotation.PostConstruct; +import java.util.List; + +public class EventDistributorImpl extends ManagerBase implements EventDistributor { + private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class); + + public void setEventBusses(List<EventBus> eventBusses) { + this.eventBusses = eventBusses; + } + + List<EventBus> eventBusses; + + @PostConstruct + public void init() { + for (EventBus bus : eventBusses) { + try { + bus.publish(new Event("server", "NONE","starting", "server", "NONE")); + } catch (EventBusException e) { + LOGGER.debug(String.format("no publish for bus %s", bus.getClass().getName()), e); + } + } + } + +} diff --git a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java index b7d74df980f..eac54af9045 100644 --- a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java +++ b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java @@ -87,7 +87,11 @@ public class InMemoryEventBus extends ManagerBase implements EventBus { @Override public void publish(Event event) throws EventBusException { + if (s_logger.isTraceEnabled()) { + s_logger.trace(String.format("publish %s", event)); + } if (subscribers == null || subscribers.isEmpty()) { + s_logger.trace("no subscribers, no publish"); return; // no subscriber to publish to, so just return } diff --git a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java index 17a58a5d232..8ec943c0cee 100644 --- a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java +++ b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java @@ -100,7 +100,10 @@ public class KafkaEventBus extends ManagerBase implements EventBus { @Override public void publish(Event event) throws EventBusException { - ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription()); + if (s_logger.isTraceEnabled()) { + s_logger.trace(String.format("publish %s", event)); + } + ProducerRecord<String, String> record = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription()); _producer.send(record); } diff --git a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index f54c769908d..8c0d3f225c3 100644 --- a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -267,6 +267,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { // publish event on to the exchange created on AMQP server @Override public void publish(Event event) throws EventBusException { + if (s_logger.isTraceEnabled()) { + s_logger.trace(String.format("publish %s", event)); + } String routingKey = createRoutingKey(event); String eventDescription = event.getDescription(); diff --git a/server/src/main/java/com/cloud/api/ApiServer.java b/server/src/main/java/com/cloud/api/ApiServer.java index b602ed2edbc..cea50273a6e 100644 --- a/server/src/main/java/com/cloud/api/ApiServer.java +++ b/server/src/main/java/com/cloud/api/ApiServer.java @@ -97,6 +97,7 @@ import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.framework.events.EventDistributor; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; @@ -197,26 +198,28 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer */ private static final String CONTROL_CHARACTERS = "[\000-\011\013-\014\016-\037\177]"; + @Inject + private AccountManager accountMgr; + @Inject + private APIAuthenticationManager authManager; @Inject private ApiDispatcher dispatcher; @Inject - private DispatchChainFactory dispatchChainFactory; + private AsyncJobManager asyncMgr; @Inject - private AccountManager accountMgr; + private DispatchChainFactory dispatchChainFactory; @Inject private DomainManager domainMgr; @Inject private DomainDao domainDao; @Inject - private UUIDManager uuidMgr; - @Inject - private AsyncJobManager asyncMgr; - @Inject private EntityManager entityMgr; @Inject - private APIAuthenticationManager authManager; + private EventDistributor eventDistributor; @Inject private ProjectDao projectDao; + @Inject + private UUIDManager uuidMgr; private List<PluggableService> pluggableServices; diff --git a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java index a5f1f9fa5cb..03c2f485669 100644 --- a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java +++ b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java @@ -40,7 +40,7 @@ public class HypervisorGuruManagerImpl extends ManagerBase implements Hypervisor HostDao _hostDao; List<HypervisorGuru> _hvGuruList; - Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<HypervisorType, HypervisorGuru>(); + Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<>(); @PostConstruct public void init() { diff --git a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml index ee676aabbfa..fdcf2e20d4e 100644 --- a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml +++ b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml @@ -144,6 +144,10 @@ <property name="staticNatElements" value="#{staticNatServiceProvidersRegistry.registered}" /> </bean> + <bean id="eventDistributor" class="org.apache.cloudstack.framework.events.EventDistributorImpl" > + <property name="eventBusses" value="#{eventBussesRegistry.registered}" /> + </bean> + <bean id="hypervisorGuruManagerImpl" class="com.cloud.hypervisor.HypervisorGuruManagerImpl" > <property name="hvGuruList" value="#{hypervisorGurusRegistry.registered}" /> </bean> diff --git a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java index 8486dbf4bd4..decaa34cf95 100644 --- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java +++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java @@ -178,6 +178,13 @@ public class ComponentContext implements ApplicationContextAware { return (T)s_appContext.getBean(name); } + /** + * only ever used to get the event bus + * + * @param beanType the component type to return + * @return one of the component registered for the requested type + * @param <T> + */ public static <T> T getComponent(Class<T> beanType) { assert (s_appContext != null); Map<String, T> matchedTypes = getComponentsOfType(beanType);
