This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch 
6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to 
refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
     new 29208135620 events via the distributor
29208135620 is described below

commit 2920813562035a71b47e719a8d42fcb3198447f4
Author: Daan Hoogland <[email protected]>
AuthorDate: Tue Sep 26 13:17:27 2023 +0200

    events via the distributor
---
 .../com/cloud/network/NetworkStateListener.java    | 24 +++-----------
 .../framework/events/EventDistributorImpl.java     |  1 +
 .../network/contrail/management/EventUtils.java    |  5 +++
 .../storage/listener/SnapshotStateListener.java    | 19 +++--------
 .../storage/listener/VolumeStateListener.java      | 38 +++++++++-------------
 .../java/com/cloud/vm/UserVmStateListener.java     | 32 +++++-------------
 6 files changed, 38 insertions(+), 81 deletions(-)

diff --git 
a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
 
b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
index 1e1251d8cdc..c506bd66cc9 100644
--- 
a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
+++ 
b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
@@ -25,15 +25,11 @@ import java.util.Map;
 import javax.inject.Inject;
 
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.apache.cloudstack.framework.events.EventDistributor;
 
 import com.cloud.event.EventCategory;
 import com.cloud.network.Network.Event;
 import com.cloud.network.Network.State;
-import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.fsm.StateListener;
 import com.cloud.utils.fsm.StateMachine2;
 
@@ -41,10 +37,8 @@ public class NetworkStateListener implements 
StateListener<State, Event, Network
 
     @Inject
     private ConfigurationDao _configDao;
-
-    private static EventBus s_eventBus = null;
-
-    private static final Logger s_logger = 
Logger.getLogger(NetworkStateListener.class);
+    @Inject
+    private EventDistributor eventDistributor;
 
     public NetworkStateListener(ConfigurationDao configDao) {
         _configDao = configDao;
@@ -72,11 +66,6 @@ public class NetworkStateListener implements 
StateListener<State, Event, Network
         boolean configValue = Boolean.parseBoolean(value);
         if(!configValue)
             return;
-        try {
-            s_eventBus = ComponentContext.getComponent(EventBus.class);
-        } catch (NoSuchBeanDefinitionException nbe) {
-            return; // no provider is configured to provide events bus, so 
just return
-        }
 
         String resourceName = getEntityFromClassName(Network.class.getName());
         org.apache.cloudstack.framework.events.Event eventMsg =
@@ -91,11 +80,8 @@ public class NetworkStateListener implements 
StateListener<State, Event, Network
         eventDescription.put("eventDateTime", eventDate);
 
         eventMsg.setDescription(eventDescription);
-        try {
-            s_eventBus.publish(eventMsg);
-        } catch (EventBusException e) {
-            s_logger.warn("Failed to publish state change event on the event 
bus.");
-        }
+
+        eventDistributor.publish(eventMsg);
     }
 
     private String getEntityFromClassName(String entityClassName) {
diff --git 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
index 1b151eb363c..cf8e09a8370 100644
--- 
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
+++ 
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -45,6 +45,7 @@ public class EventDistributorImpl extends ManagerBase 
implements EventDistributo
 
     @Override
     public List<EventBusException> publish(Event event) {
+        LOGGER.info(String.format("publishing %s to %d event busses", (event 
== null ? "<none>" : event.getDescription()), eventBusses.size()));
         List<EventBusException> exceptions = new ArrayList<>();
         for (EventBus bus : eventBusses) {
             try {
diff --git 
a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
 
b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
index 78ec01344ca..d2f2fd2182a 100644
--- 
a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
+++ 
b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cloudstack.framework.events.EventDistributor;
 import org.apache.log4j.Logger;
 import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.stereotype.Component;
@@ -42,10 +43,14 @@ import com.cloud.server.ManagementService;
 import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.component.ComponentMethodInterceptor;
 
+import javax.inject.Inject;
+
 @Component
 public class EventUtils {
     private static final Logger s_logger = Logger.getLogger(EventUtils.class);
 
+    @Inject
+    private EventDistributor eventDistributor;
     protected static  EventBus s_eventBus = null;
 
     public EventUtils() {
diff --git 
a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java 
b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
index c68b05c5062..7a651526002 100644
--- a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
@@ -26,10 +26,8 @@ import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
 import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.stereotype.Component;
 
 import com.cloud.configuration.Config;
@@ -39,18 +37,18 @@ import com.cloud.storage.Snapshot;
 import com.cloud.storage.Snapshot.Event;
 import com.cloud.storage.Snapshot.State;
 import com.cloud.storage.SnapshotVO;
-import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.fsm.StateListener;
 import com.cloud.utils.fsm.StateMachine2;
 
 @Component
 public class SnapshotStateListener implements StateListener<State, Event, 
SnapshotVO> {
 
-    protected static EventBus s_eventBus = null;
     protected static ConfigurationDao s_configDao;
 
     @Inject
     private ConfigurationDao configDao;
+    @Inject
+    private EventDistributor eventDistributor;
 
     private static final Logger s_logger = 
Logger.getLogger(SnapshotStateListener.class);
 
@@ -83,11 +81,6 @@ public class SnapshotStateListener implements 
StateListener<State, Event, Snapsh
         if(!configValue) {
             return;
         }
-        try {
-            s_eventBus = ComponentContext.getComponent(EventBus.class);
-        } catch (NoSuchBeanDefinitionException nbe) {
-            return; // no provider is configured to provide events bus, so 
just return
-        }
 
         String resourceName = getEntityFromClassName(Snapshot.class.getName());
         org.apache.cloudstack.framework.events.Event eventMsg =
@@ -103,11 +96,7 @@ public class SnapshotStateListener implements 
StateListener<State, Event, Snapsh
         eventDescription.put("eventDateTime", eventDate);
 
         eventMsg.setDescription(eventDescription);
-        try {
-            s_eventBus.publish(eventMsg);
-        } catch (EventBusException e) {
-            s_logger.warn("Failed to publish state change event on the event 
bus.");
-        }
+        eventDistributor.publish(eventMsg);
     }
 
     private String getEntityFromClassName(String entityClassName) {
diff --git 
a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java 
b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
index d2a4dc93b2f..9f063fe8913 100644
--- a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
@@ -22,34 +22,35 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.inject.Inject;
+
+import com.cloud.configuration.Config;
+import com.cloud.event.EventCategory;
 import com.cloud.event.EventTypes;
 import com.cloud.event.UsageEventUtils;
+import com.cloud.server.ManagementService;
+import com.cloud.storage.Volume;
+import com.cloud.storage.Volume.Event;
+import com.cloud.storage.Volume.State;
+import com.cloud.utils.fsm.StateListener;
 import com.cloud.utils.fsm.StateMachine2;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.dao.VMInstanceDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
 
-import com.cloud.configuration.Config;
-import com.cloud.event.EventCategory;
-import com.cloud.server.ManagementService;
-import com.cloud.storage.Volume;
-import com.cloud.storage.Volume.Event;
-import com.cloud.storage.Volume.State;
-import com.cloud.utils.component.ComponentContext;
-import com.cloud.utils.fsm.StateListener;
+import org.apache.log4j.Logger;
 
 public class VolumeStateListener implements StateListener<State, Event, 
Volume> {
 
-    protected static EventBus s_eventBus = null;
     protected ConfigurationDao _configDao;
     protected VMInstanceDao _vmInstanceDao;
 
+    @Inject
+    private EventDistributor eventDistributor;
+
     private static final Logger s_logger = 
Logger.getLogger(VolumeStateListener.class);
 
     public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao 
vmInstanceDao) {
@@ -99,11 +100,6 @@ public class VolumeStateListener implements 
StateListener<State, Event, Volume>
         boolean configValue = Boolean.parseBoolean(value);
         if(!configValue)
             return;
-        try {
-            s_eventBus = ComponentContext.getComponent(EventBus.class);
-        } catch (NoSuchBeanDefinitionException nbe) {
-            return; // no provider is configured to provide events bus, so 
just return
-        }
 
         String resourceName = getEntityFromClassName(Volume.class.getName());
         org.apache.cloudstack.framework.events.Event eventMsg =
@@ -119,11 +115,7 @@ public class VolumeStateListener implements 
StateListener<State, Event, Volume>
         eventDescription.put("eventDateTime", eventDate);
 
         eventMsg.setDescription(eventDescription);
-        try {
-            s_eventBus.publish(eventMsg);
-        } catch (EventBusException e) {
-            s_logger.warn("Failed to state change event on the event bus.");
-        }
+        eventDistributor.publish(eventMsg);
     }
 
     private String getEntityFromClassName(String entityClassName) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmStateListener.java 
b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
index e9f7e7c5c72..3935f49c4c0 100644
--- a/server/src/main/java/com/cloud/vm/UserVmStateListener.java
+++ b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
@@ -24,15 +24,6 @@ import java.util.Map;
 
 import javax.inject.Inject;
 
-import com.cloud.server.ManagementService;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.dao.UserVmDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-
 import com.cloud.configuration.Config;
 import com.cloud.event.EventCategory;
 import com.cloud.event.EventTypes;
@@ -41,11 +32,16 @@ import com.cloud.event.dao.UsageEventDao;
 import com.cloud.network.dao.NetworkDao;
 import com.cloud.network.dao.NetworkVO;
 import com.cloud.service.dao.ServiceOfferingDao;
-import com.cloud.utils.component.ComponentContext;
+import com.cloud.server.ManagementService;
 import com.cloud.utils.fsm.StateListener;
+import com.cloud.utils.fsm.StateMachine2;
 import com.cloud.vm.VirtualMachine.Event;
 import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.UserVmDao;
+
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventDistributor;
 
 public class UserVmStateListener implements StateListener<State, 
VirtualMachine.Event, VirtualMachine> {
 
@@ -56,9 +52,7 @@ public class UserVmStateListener implements 
StateListener<State, VirtualMachine.
     @Inject protected UserVmDao _userVmDao;
     @Inject protected UserVmManager _userVmMgr;
     @Inject protected ConfigurationDao _configDao;
-    private static final Logger s_logger = 
Logger.getLogger(UserVmStateListener.class);
-
-    protected static EventBus s_eventBus = null;
+    @Inject private EventDistributor eventDistributor;
 
     public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao 
networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao, 
UserVmManager userVmMgr,
             ConfigurationDao configDao) {
@@ -128,11 +122,6 @@ public class UserVmStateListener implements 
StateListener<State, VirtualMachine.
         boolean configValue = Boolean.parseBoolean(value);
         if(!configValue)
             return;
-        try {
-            s_eventBus = ComponentContext.getComponent(EventBus.class);
-        } catch (NoSuchBeanDefinitionException nbe) {
-            return; // no provider is configured to provide events bus, so 
just return
-        }
 
         String resourceName = 
getEntityFromClassName(VirtualMachine.class.getName());
         org.apache.cloudstack.framework.events.Event eventMsg =
@@ -149,12 +138,7 @@ public class UserVmStateListener implements 
StateListener<State, VirtualMachine.
         eventDescription.put("eventDateTime", eventDate);
 
         eventMsg.setDescription(eventDescription);
-        try {
-            s_eventBus.publish(eventMsg);
-        } catch (org.apache.cloudstack.framework.events.EventBusException e) {
-            s_logger.warn("Failed to publish state change event on the event 
bus.");
-        }
-
+        eventDistributor.publish(eventMsg);
     }
 
     private String getEntityFromClassName(String entityClassName) {

Reply via email to