This is an automated email from the ASF dual-hosted git repository.
dahn pushed a commit to branch
6778-if-kafka-is-turned-on-internal-subs-dont-work
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to
refs/heads/6778-if-kafka-is-turned-on-internal-subs-dont-work by this push:
new 29208135620 events via the distributor
29208135620 is described below
commit 2920813562035a71b47e719a8d42fcb3198447f4
Author: Daan Hoogland <[email protected]>
AuthorDate: Tue Sep 26 13:17:27 2023 +0200
events via the distributor
---
.../com/cloud/network/NetworkStateListener.java | 24 +++-----------
.../framework/events/EventDistributorImpl.java | 1 +
.../network/contrail/management/EventUtils.java | 5 +++
.../storage/listener/SnapshotStateListener.java | 19 +++--------
.../storage/listener/VolumeStateListener.java | 38 +++++++++-------------
.../java/com/cloud/vm/UserVmStateListener.java | 32 +++++-------------
6 files changed, 38 insertions(+), 81 deletions(-)
diff --git
a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
index 1e1251d8cdc..c506bd66cc9 100644
---
a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
+++
b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
@@ -25,15 +25,11 @@ import java.util.Map;
import javax.inject.Inject;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import com.cloud.event.EventCategory;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
-import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
@@ -41,10 +37,8 @@ public class NetworkStateListener implements
StateListener<State, Event, Network
@Inject
private ConfigurationDao _configDao;
-
- private static EventBus s_eventBus = null;
-
- private static final Logger s_logger =
Logger.getLogger(NetworkStateListener.class);
+ @Inject
+ private EventDistributor eventDistributor;
public NetworkStateListener(ConfigurationDao configDao) {
_configDao = configDao;
@@ -72,11 +66,6 @@ public class NetworkStateListener implements
StateListener<State, Event, Network
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so
just return
- }
String resourceName = getEntityFromClassName(Network.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
@@ -91,11 +80,8 @@ public class NetworkStateListener implements
StateListener<State, Event, Network
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event
bus.");
- }
+
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
index 1b151eb363c..cf8e09a8370 100644
---
a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
+++
b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -45,6 +45,7 @@ public class EventDistributorImpl extends ManagerBase
implements EventDistributo
@Override
public List<EventBusException> publish(Event event) {
+ LOGGER.info(String.format("publishing %s to %d event busses", (event
== null ? "<none>" : event.getDescription()), eventBusses.size()));
List<EventBusException> exceptions = new ArrayList<>();
for (EventBus bus : eventBusses) {
try {
diff --git
a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
index 78ec01344ca..d2f2fd2182a 100644
---
a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
+++
b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
@@ -42,10 +43,14 @@ import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentMethodInterceptor;
+import javax.inject.Inject;
+
@Component
public class EventUtils {
private static final Logger s_logger = Logger.getLogger(EventUtils.class);
+ @Inject
+ private EventDistributor eventDistributor;
protected static EventBus s_eventBus = null;
public EventUtils() {
diff --git
a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
index c68b05c5062..7a651526002 100644
--- a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
@@ -26,10 +26,8 @@ import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import com.cloud.configuration.Config;
@@ -39,18 +37,18 @@ import com.cloud.storage.Snapshot;
import com.cloud.storage.Snapshot.Event;
import com.cloud.storage.Snapshot.State;
import com.cloud.storage.SnapshotVO;
-import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
@Component
public class SnapshotStateListener implements StateListener<State, Event,
SnapshotVO> {
- protected static EventBus s_eventBus = null;
protected static ConfigurationDao s_configDao;
@Inject
private ConfigurationDao configDao;
+ @Inject
+ private EventDistributor eventDistributor;
private static final Logger s_logger =
Logger.getLogger(SnapshotStateListener.class);
@@ -83,11 +81,6 @@ public class SnapshotStateListener implements
StateListener<State, Event, Snapsh
if(!configValue) {
return;
}
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so
just return
- }
String resourceName = getEntityFromClassName(Snapshot.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
@@ -103,11 +96,7 @@ public class SnapshotStateListener implements
StateListener<State, Event, Snapsh
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event
bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git
a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
index d2a4dc93b2f..9f063fe8913 100644
--- a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
@@ -22,34 +22,35 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import javax.inject.Inject;
+
+import com.cloud.configuration.Config;
+import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventUtils;
+import com.cloud.server.ManagementService;
+import com.cloud.storage.Volume;
+import com.cloud.storage.Volume.Event;
+import com.cloud.storage.Volume.State;
+import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VMInstanceVO;
import com.cloud.vm.VirtualMachine;
import com.cloud.vm.dao.VMInstanceDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
-import com.cloud.configuration.Config;
-import com.cloud.event.EventCategory;
-import com.cloud.server.ManagementService;
-import com.cloud.storage.Volume;
-import com.cloud.storage.Volume.Event;
-import com.cloud.storage.Volume.State;
-import com.cloud.utils.component.ComponentContext;
-import com.cloud.utils.fsm.StateListener;
+import org.apache.log4j.Logger;
public class VolumeStateListener implements StateListener<State, Event,
Volume> {
- protected static EventBus s_eventBus = null;
protected ConfigurationDao _configDao;
protected VMInstanceDao _vmInstanceDao;
+ @Inject
+ private EventDistributor eventDistributor;
+
private static final Logger s_logger =
Logger.getLogger(VolumeStateListener.class);
public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao
vmInstanceDao) {
@@ -99,11 +100,6 @@ public class VolumeStateListener implements
StateListener<State, Event, Volume>
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so
just return
- }
String resourceName = getEntityFromClassName(Volume.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
@@ -119,11 +115,7 @@ public class VolumeStateListener implements
StateListener<State, Event, Volume>
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to state change event on the event bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmStateListener.java
b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
index e9f7e7c5c72..3935f49c4c0 100644
--- a/server/src/main/java/com/cloud/vm/UserVmStateListener.java
+++ b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
@@ -24,15 +24,6 @@ import java.util.Map;
import javax.inject.Inject;
-import com.cloud.server.ManagementService;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.dao.UserVmDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
@@ -41,11 +32,16 @@ import com.cloud.event.dao.UsageEventDao;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.service.dao.ServiceOfferingDao;
-import com.cloud.utils.component.ComponentContext;
+import com.cloud.server.ManagementService;
import com.cloud.utils.fsm.StateListener;
+import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.UserVmDao;
+
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventDistributor;
public class UserVmStateListener implements StateListener<State,
VirtualMachine.Event, VirtualMachine> {
@@ -56,9 +52,7 @@ public class UserVmStateListener implements
StateListener<State, VirtualMachine.
@Inject protected UserVmDao _userVmDao;
@Inject protected UserVmManager _userVmMgr;
@Inject protected ConfigurationDao _configDao;
- private static final Logger s_logger =
Logger.getLogger(UserVmStateListener.class);
-
- protected static EventBus s_eventBus = null;
+ @Inject private EventDistributor eventDistributor;
public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao
networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao,
UserVmManager userVmMgr,
ConfigurationDao configDao) {
@@ -128,11 +122,6 @@ public class UserVmStateListener implements
StateListener<State, VirtualMachine.
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so
just return
- }
String resourceName =
getEntityFromClassName(VirtualMachine.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
@@ -149,12 +138,7 @@ public class UserVmStateListener implements
StateListener<State, VirtualMachine.
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (org.apache.cloudstack.framework.events.EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event
bus.");
- }
-
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {