This is an automated email from the ASF dual-hosted git repository. dahn pushed a commit to branch 6778-if-kafka-is-turned-on-internal-subs-dont-work in repository https://gitbox.apache.org/repos/asf/cloudstack.git
commit 9b78fe8ecf0db14ec2c2359a18ff26ffe3dee045 Author: Daan Hoogland <[email protected]> AuthorDate: Tue Sep 26 13:17:27 2023 +0200 events via the distributor --- .../com/cloud/network/NetworkStateListener.java | 24 +++----------- .../framework/events/EventDistributorImpl.java | 1 + .../network/contrail/management/EventUtils.java | 5 +++ .../storage/listener/SnapshotStateListener.java | 19 +++-------- .../storage/listener/VolumeStateListener.java | 38 +++++++++------------- .../java/com/cloud/vm/UserVmStateListener.java | 32 +++++------------- 6 files changed, 38 insertions(+), 81 deletions(-) diff --git a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java index 1e1251d8cdc..c506bd66cc9 100644 --- a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java +++ b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java @@ -25,15 +25,11 @@ import java.util.Map; import javax.inject.Inject; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; -import org.apache.cloudstack.framework.events.EventBus; -import org.apache.cloudstack.framework.events.EventBusException; -import org.apache.log4j.Logger; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.apache.cloudstack.framework.events.EventDistributor; import com.cloud.event.EventCategory; import com.cloud.network.Network.Event; import com.cloud.network.Network.State; -import com.cloud.utils.component.ComponentContext; import com.cloud.utils.fsm.StateListener; import com.cloud.utils.fsm.StateMachine2; @@ -41,10 +37,8 @@ public class NetworkStateListener implements StateListener<State, Event, Network @Inject private ConfigurationDao _configDao; - - private static EventBus s_eventBus = null; - - private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class); + @Inject + private EventDistributor eventDistributor; public NetworkStateListener(ConfigurationDao configDao) { _configDao = configDao; @@ -72,11 +66,6 @@ public class NetworkStateListener implements StateListener<State, Event, Network boolean configValue = Boolean.parseBoolean(value); if(!configValue) return; - try { - s_eventBus = ComponentContext.getComponent(EventBus.class); - } catch (NoSuchBeanDefinitionException nbe) { - return; // no provider is configured to provide events bus, so just return - } String resourceName = getEntityFromClassName(Network.class.getName()); org.apache.cloudstack.framework.events.Event eventMsg = @@ -91,11 +80,8 @@ public class NetworkStateListener implements StateListener<State, Event, Network eventDescription.put("eventDateTime", eventDate); eventMsg.setDescription(eventDescription); - try { - s_eventBus.publish(eventMsg); - } catch (EventBusException e) { - s_logger.warn("Failed to publish state change event on the event bus."); - } + + eventDistributor.publish(eventMsg); } private String getEntityFromClassName(String entityClassName) { diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java index 1b151eb363c..cf8e09a8370 100644 --- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java +++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java @@ -45,6 +45,7 @@ public class EventDistributorImpl extends ManagerBase implements EventDistributo @Override public List<EventBusException> publish(Event event) { + LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size())); List<EventBusException> exceptions = new ArrayList<>(); for (EventBus bus : eventBusses) { try { diff --git a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java index 78ec01344ca..d2f2fd2182a 100644 --- a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java +++ b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.ArrayList; import java.util.List; +import org.apache.cloudstack.framework.events.EventDistributor; import org.apache.log4j.Logger; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; @@ -42,10 +43,14 @@ import com.cloud.server.ManagementService; import com.cloud.utils.component.ComponentContext; import com.cloud.utils.component.ComponentMethodInterceptor; +import javax.inject.Inject; + @Component public class EventUtils { private static final Logger s_logger = Logger.getLogger(EventUtils.class); + @Inject + private EventDistributor eventDistributor; protected static EventBus s_eventBus = null; public EventUtils() { diff --git a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java index c68b05c5062..7a651526002 100644 --- a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java +++ b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java @@ -26,10 +26,8 @@ import javax.annotation.PostConstruct; import javax.inject.Inject; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; -import org.apache.cloudstack.framework.events.EventBus; -import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.framework.events.EventDistributor; import org.apache.log4j.Logger; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; import com.cloud.configuration.Config; @@ -39,18 +37,18 @@ import com.cloud.storage.Snapshot; import com.cloud.storage.Snapshot.Event; import com.cloud.storage.Snapshot.State; import com.cloud.storage.SnapshotVO; -import com.cloud.utils.component.ComponentContext; import com.cloud.utils.fsm.StateListener; import com.cloud.utils.fsm.StateMachine2; @Component public class SnapshotStateListener implements StateListener<State, Event, SnapshotVO> { - protected static EventBus s_eventBus = null; protected static ConfigurationDao s_configDao; @Inject private ConfigurationDao configDao; + @Inject + private EventDistributor eventDistributor; private static final Logger s_logger = Logger.getLogger(SnapshotStateListener.class); @@ -83,11 +81,6 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh if(!configValue) { return; } - try { - s_eventBus = ComponentContext.getComponent(EventBus.class); - } catch (NoSuchBeanDefinitionException nbe) { - return; // no provider is configured to provide events bus, so just return - } String resourceName = getEntityFromClassName(Snapshot.class.getName()); org.apache.cloudstack.framework.events.Event eventMsg = @@ -103,11 +96,7 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh eventDescription.put("eventDateTime", eventDate); eventMsg.setDescription(eventDescription); - try { - s_eventBus.publish(eventMsg); - } catch (EventBusException e) { - s_logger.warn("Failed to publish state change event on the event bus."); - } + eventDistributor.publish(eventMsg); } private String getEntityFromClassName(String entityClassName) { diff --git a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java index d2a4dc93b2f..9f063fe8913 100644 --- a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java +++ b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java @@ -22,34 +22,35 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; +import javax.inject.Inject; + +import com.cloud.configuration.Config; +import com.cloud.event.EventCategory; import com.cloud.event.EventTypes; import com.cloud.event.UsageEventUtils; +import com.cloud.server.ManagementService; +import com.cloud.storage.Volume; +import com.cloud.storage.Volume.Event; +import com.cloud.storage.Volume.State; +import com.cloud.utils.fsm.StateListener; import com.cloud.utils.fsm.StateMachine2; import com.cloud.vm.VMInstanceVO; import com.cloud.vm.VirtualMachine; import com.cloud.vm.dao.VMInstanceDao; -import org.apache.log4j.Logger; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; -import org.apache.cloudstack.framework.events.EventBus; -import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.framework.events.EventDistributor; -import com.cloud.configuration.Config; -import com.cloud.event.EventCategory; -import com.cloud.server.ManagementService; -import com.cloud.storage.Volume; -import com.cloud.storage.Volume.Event; -import com.cloud.storage.Volume.State; -import com.cloud.utils.component.ComponentContext; -import com.cloud.utils.fsm.StateListener; +import org.apache.log4j.Logger; public class VolumeStateListener implements StateListener<State, Event, Volume> { - protected static EventBus s_eventBus = null; protected ConfigurationDao _configDao; protected VMInstanceDao _vmInstanceDao; + @Inject + private EventDistributor eventDistributor; + private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class); public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao vmInstanceDao) { @@ -99,11 +100,6 @@ public class VolumeStateListener implements StateListener<State, Event, Volume> boolean configValue = Boolean.parseBoolean(value); if(!configValue) return; - try { - s_eventBus = ComponentContext.getComponent(EventBus.class); - } catch (NoSuchBeanDefinitionException nbe) { - return; // no provider is configured to provide events bus, so just return - } String resourceName = getEntityFromClassName(Volume.class.getName()); org.apache.cloudstack.framework.events.Event eventMsg = @@ -119,11 +115,7 @@ public class VolumeStateListener implements StateListener<State, Event, Volume> eventDescription.put("eventDateTime", eventDate); eventMsg.setDescription(eventDescription); - try { - s_eventBus.publish(eventMsg); - } catch (EventBusException e) { - s_logger.warn("Failed to state change event on the event bus."); - } + eventDistributor.publish(eventMsg); } private String getEntityFromClassName(String entityClassName) { diff --git a/server/src/main/java/com/cloud/vm/UserVmStateListener.java b/server/src/main/java/com/cloud/vm/UserVmStateListener.java index e9f7e7c5c72..3935f49c4c0 100644 --- a/server/src/main/java/com/cloud/vm/UserVmStateListener.java +++ b/server/src/main/java/com/cloud/vm/UserVmStateListener.java @@ -24,15 +24,6 @@ import java.util.Map; import javax.inject.Inject; -import com.cloud.server.ManagementService; -import com.cloud.utils.fsm.StateMachine2; -import com.cloud.vm.dao.UserVmDao; -import org.apache.log4j.Logger; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; - -import org.apache.cloudstack.framework.config.dao.ConfigurationDao; -import org.apache.cloudstack.framework.events.EventBus; - import com.cloud.configuration.Config; import com.cloud.event.EventCategory; import com.cloud.event.EventTypes; @@ -41,11 +32,16 @@ import com.cloud.event.dao.UsageEventDao; import com.cloud.network.dao.NetworkDao; import com.cloud.network.dao.NetworkVO; import com.cloud.service.dao.ServiceOfferingDao; -import com.cloud.utils.component.ComponentContext; +import com.cloud.server.ManagementService; import com.cloud.utils.fsm.StateListener; +import com.cloud.utils.fsm.StateMachine2; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.dao.NicDao; +import com.cloud.vm.dao.UserVmDao; + +import org.apache.cloudstack.framework.config.dao.ConfigurationDao; +import org.apache.cloudstack.framework.events.EventDistributor; public class UserVmStateListener implements StateListener<State, VirtualMachine.Event, VirtualMachine> { @@ -56,9 +52,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine. @Inject protected UserVmDao _userVmDao; @Inject protected UserVmManager _userVmMgr; @Inject protected ConfigurationDao _configDao; - private static final Logger s_logger = Logger.getLogger(UserVmStateListener.class); - - protected static EventBus s_eventBus = null; + @Inject private EventDistributor eventDistributor; public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao, UserVmManager userVmMgr, ConfigurationDao configDao) { @@ -128,11 +122,6 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine. boolean configValue = Boolean.parseBoolean(value); if(!configValue) return; - try { - s_eventBus = ComponentContext.getComponent(EventBus.class); - } catch (NoSuchBeanDefinitionException nbe) { - return; // no provider is configured to provide events bus, so just return - } String resourceName = getEntityFromClassName(VirtualMachine.class.getName()); org.apache.cloudstack.framework.events.Event eventMsg = @@ -149,12 +138,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine. eventDescription.put("eventDateTime", eventDate); eventMsg.setDescription(eventDescription); - try { - s_eventBus.publish(eventMsg); - } catch (org.apache.cloudstack.framework.events.EventBusException e) { - s_logger.warn("Failed to publish state change event on the event bus."); - } - + eventDistributor.publish(eventMsg); } private String getEntityFromClassName(String entityClassName) {
