Repository: stratos Updated Branches: refs/heads/master 418ed02e8 -> c90eb9a72
making TopologyEventReceiver and ApplicationSignupEventReceiver a singleton Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/78db9f1e Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/78db9f1e Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/78db9f1e Branch: refs/heads/master Commit: 78db9f1ec301581d2deccd824bd4e4de43e9b21d Parents: 418ed02 Author: Isuru Haththotuwa <isu...@apache.org> Authored: Wed Dec 2 15:33:21 2015 +0530 Committer: Isuru Haththotuwa <isu...@apache.org> Committed: Thu Dec 24 20:03:42 2015 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 18 +++---- .../stratos/cartridge/agent/CartridgeAgent.java | 54 ++++++++++---------- .../agent/CartridgeAgentEventListeners.java | 43 ++++++++-------- .../agent/test/JavaCartridgeAgentTest.java | 6 +-- ...cerCommonApplicationSignUpEventReceiver.java | 11 ++-- ...LoadBalancerCommonTopologyEventReceiver.java | 37 +++++++------- .../extension/api/LoadBalancerExtension.java | 2 - .../internal/LoadBalancerServiceComponent.java | 18 +++---- .../StratosManagerServiceComponent.java | 4 +- .../StratosManagerTopologyEventReceiver.java | 18 +++---- .../message/receiver/StratosEventReceiver.java | 30 +++++++++++ .../signup/ApplicationSignUpEventReceiver.java | 37 ++++++++++---- .../topology/TopologyEventReceiver.java | 39 ++++++++++---- .../mock/iaas/services/impl/MockInstance.java | 6 +-- 14 files changed, 193 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 8336f86..6fd64a7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -60,18 +60,18 @@ public class AutoscalerTopologyEventReceiver { private ExecutorService executorService; public AutoscalerTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } - public void execute() { - //FIXME this activated before autoscaler deployer activated. - topologyEventReceiver.setExecutorService(getExecutorService()); - topologyEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread started"); - } - } +// public void execute() { +// //FIXME this activated before autoscaler deployer activated. +// // topologyEventReceiver.setExecutorService(getExecutorService()); +// //topologyEventReceiver.execute(); +// if (log.isInfoEnabled()) { +// log.info("Autoscaler topology receiver thread started"); +// } +// } private void addEventListeners() { // Listen to topology events that affect clusters http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java index 91f596e..18e6e0a 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java @@ -103,10 +103,10 @@ public class CartridgeAgent implements Runnable { } */ // Start application event receiver thread - registerApplicationEventListeners(); - if (log.isInfoEnabled()) { - log.info("Cartridge agent registering all event listeners ... done"); - } + //registerApplicationEventListeners(); +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent registering all event listeners ... done"); +// } // Execute instance started shell script extensionHandler.onInstanceStartedEvent(); @@ -197,29 +197,29 @@ public class CartridgeAgent implements Runnable { } } - protected void registerTenantEventListeners() { - if (log.isDebugEnabled()) { - log.debug("registerTenantEventListeners before"); - } - - eventListenerns.startTenantEventReceiver(); - - if (log.isDebugEnabled()) { - log.debug("registerTenantEventListeners after"); - } - } - - protected void registerApplicationEventListeners() { - if (log.isDebugEnabled()) { - log.debug("registerApplicationListeners before"); - } - - eventListenerns.startApplicationsEventReceiver(); - - if (log.isDebugEnabled()) { - log.debug("registerApplicationEventListeners after"); - } - } +// protected void registerTenantEventListeners() { +// if (log.isDebugEnabled()) { +// log.debug("registerTenantEventListeners before"); +// } +// +// eventListenerns.startTenantEventReceiver(); +// +// if (log.isDebugEnabled()) { +// log.debug("registerTenantEventListeners after"); +// } +// } + +// protected void registerApplicationEventListeners() { +// if (log.isDebugEnabled()) { +// log.debug("registerApplicationListeners before"); +// } +// +// eventListenerns.startApplicationsEventReceiver(); +// +// if (log.isDebugEnabled()) { +// log.debug("registerApplicationEventListeners after"); +// } +// } protected void validateRequiredSystemProperties() { String jndiPropertiesDir = System.getProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR); http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java index e6bb41b..103d2c7 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java @@ -72,11 +72,10 @@ public class CartridgeAgentEventListeners { if (log.isDebugEnabled()) { log.debug("Creating cartridge agent event listeners..."); } - this.applicationsEventReceiver = new ApplicationSignUpEventReceiver(); - this.applicationsEventReceiver.setExecutorService(eventListenerExecutorService); + this.applicationsEventReceiver = ApplicationSignUpEventReceiver.getInstance(); - this.topologyEventReceiver = new TopologyEventReceiver(); - this.topologyEventReceiver.setExecutorService(eventListenerExecutorService); + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); + //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService); this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver(); @@ -151,24 +150,24 @@ public class CartridgeAgentEventListeners { } - public void startApplicationsEventReceiver() { - - if (log.isDebugEnabled()) { - log.debug("Starting cartridge agent application event message receiver"); - } - - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - applicationsEventReceiver.execute(); - } - }); - - if (log.isInfoEnabled()) { - log.info("Cartridge agent application receiver thread started, waiting for event messages ..."); - } - - } +// public void startApplicationsEventReceiver() { +// +// if (log.isDebugEnabled()) { +// log.debug("Starting cartridge agent application event message receiver"); +// } +// +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// applicationsEventReceiver.execute(); +// } +// }); +// +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent application receiver thread started, waiting for event messages ..."); +// } +// +// } private void addInstanceNotifierEventListeners() { http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java index 26b8728..430e0b8 100644 --- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java +++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java @@ -107,9 +107,9 @@ public class JavaCartridgeAgentTest { String agentHome = setupJavaAgent(); ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5); - topologyEventReceiver = new TopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); + topologyEventReceiver = TopologyEventReceiver.getInstance(); + //topologyEventReceiver.setExecutorService(executorService); + //topologyEventReceiver.execute(); instanceStatusEventReceiver = new InstanceStatusEventReceiver(); instanceStatusEventReceiver.setExecutorService(executorService); http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java index d5819dc..95c4867 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java @@ -41,19 +41,20 @@ import org.apache.stratos.messaging.message.receiver.application.signup.Applicat * Load balancer common application signup event receiver updates the topology in the given topology provider * with the hostnames found in application signup events. */ -public class LoadBalancerCommonApplicationSignUpEventReceiver extends ApplicationSignUpEventReceiver { +public class LoadBalancerCommonApplicationSignUpEventReceiver { private static final Log log = LogFactory.getLog(LoadBalancerCommonApplicationSignUpEventReceiver.class); - + private ApplicationSignUpEventReceiver applicationSignUpEventReceiver; private TopologyProvider topologyProvider; public LoadBalancerCommonApplicationSignUpEventReceiver(TopologyProvider topologyProvider) { + this.applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance(); this.topologyProvider = topologyProvider; addEventListeners(); } private void addEventListeners() { - addEventListener(new CompleteApplicationSignUpsEventListener() { + applicationSignUpEventReceiver.addEventListener(new CompleteApplicationSignUpsEventListener() { private boolean initialized = false; @Override @@ -96,7 +97,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio } }); - addEventListener(new ApplicationSignUpAddedEventListener() { + applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpAddedEventListener() { @Override protected void onEvent(Event event) { try { @@ -110,7 +111,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio } }); - addEventListener(new ApplicationSignUpRemovedEventListener() { + applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpRemovedEventListener() { @Override protected void onEvent(Event event) { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java index 4fb45a9..85142e3 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java @@ -38,15 +38,17 @@ import java.util.Properties; * Load balancer common topology receiver updates the topology in the given topology provider * according to topology events. */ -public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiver { +public class LoadBalancerCommonTopologyEventReceiver { private static final Log log = LogFactory.getLog(LoadBalancerCommonTopologyEventReceiver.class); private TopologyProvider topologyProvider; private boolean initialized; + private TopologyEventReceiver topologyEventReceiver; public LoadBalancerCommonTopologyEventReceiver(TopologyProvider topologyProvider) { this.topologyProvider = topologyProvider; + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } @@ -57,12 +59,12 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } } - public void execute() { - super.execute(); - if (log.isInfoEnabled()) { - log.info("Load balancer topology receiver thread started"); - } - } +// public void execute() { +// super.execute(); +// if (log.isInfoEnabled()) { +// log.info("Load balancer topology receiver thread started"); +// } +// } public void initializeTopology() { if (initialized) { @@ -115,7 +117,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv * Add default event listeners for updating the topology on topology events */ public void addEventListeners() { - addEventListener(new CompleteTopologyEventListener() { + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { if (!initialized) { @@ -124,7 +126,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new MemberActivatedEventListener() { + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -142,11 +144,10 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv if (networkPartitionIdFilter != null && !networkPartitionIdFilter.equals("")) { if (memberActivatedEvent.getNetworkPartitionId().equals(networkPartitionIdFilter)) { addMember(serviceName, clusterId, memberId); - } - else{ + } else { log.debug(String.format("Member exists in a different network partition." + - "[member id] %s [member network partition] %s [filter network partition] %s ", - memberId,memberActivatedEvent.getNetworkPartitionId(),networkPartitionIdFilter)); + "[member id] %s [member network partition] %s [filter network partition] %s ", + memberId, memberActivatedEvent.getNetworkPartitionId(), networkPartitionIdFilter)); } } else { addMember(serviceName, clusterId, memberId); @@ -159,7 +160,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new MemberMaintenanceListener() { + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { @Override protected void onEvent(Event event) { @@ -181,7 +182,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new MemberSuspendedEventListener() { + topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { @Override protected void onEvent(Event event) { @@ -203,7 +204,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new MemberTerminatedEventListener() { + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { @@ -224,7 +225,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new ClusterRemovedEventListener() { + topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { @@ -253,7 +254,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } }); - addEventListener(new ServiceRemovedEventListener() { + topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { @Override protected void onEvent(Event event) { http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index ae2b6dd..e7a2071 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -201,8 +201,6 @@ public class LoadBalancerExtension { */ private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider); - applicationSignUpEventReceiver.setExecutorService(executorService); - applicationSignUpEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Application signup event receiver thread started"); } http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index a7761cd..cb74984 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -186,8 +186,8 @@ public class LoadBalancerServiceComponent { } applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider); - applicationSignUpEventReceiver.setExecutorService(executorService); - applicationSignUpEventReceiver.execute(); +// applicationSignUpEventReceiver.setExecutorService(executorService); +// applicationSignUpEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Application signup event receiver thread started"); } @@ -266,13 +266,13 @@ public class LoadBalancerServiceComponent { } // Terminate application signup event receiver - if (applicationSignUpEventReceiver != null) { - try { - applicationSignUpEventReceiver.terminate(); - } catch (Exception e) { - log.warn("An error occurred while terminating application signup event receiver", e); - } - } +// if (applicationSignUpEventReceiver != null) { +// try { +// applicationSignUpEventReceiver.terminate(); +// } catch (Exception e) { +// log.warn("An error occurred while terminating application signup event receiver", e); +// } +// } // Terminate domain mapping event receiver if (domainMappingEventReceiver != null) { http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index 47f401a..76d39a7 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -211,8 +211,8 @@ public class StratosManagerServiceComponent { */ private void initializeTopologyEventReceiver() { topologyEventReceiver = new StratosManagerTopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); +// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.execute(); } /** http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java index 08ca3d6..51b21ac 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java @@ -23,19 +23,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; -public class StratosManagerTopologyEventReceiver extends TopologyEventReceiver { +public class StratosManagerTopologyEventReceiver { private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class); public StratosManagerTopologyEventReceiver() { } - @Override - public void execute() { - super.execute(); - - if (log.isInfoEnabled()) { - log.info("Stratos manager topology event receiver thread started"); - } - } +// @Override +// public void execute() { +// super.execute(); +// +// if (log.isInfoEnabled()) { +// log.info("Stratos manager topology event receiver thread started"); +// } +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java new file mode 100644 index 0000000..5ac89e6 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver; + +import java.util.concurrent.ExecutorService; + +public class StratosEventReceiver { + + protected ExecutorService executorService; + + public StratosEventReceiver () { + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index 55e3fd1..dde214d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.application.signup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; import java.util.concurrent.ExecutorService; @@ -33,26 +35,41 @@ import java.util.concurrent.ExecutorService; /** * Application signup event receiver. */ -public class ApplicationSignUpEventReceiver { +public class ApplicationSignUpEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(ApplicationSignUpEventReceiver.class); private ApplicationSignUpEventMessageDelegator messageDelegator; private ApplicationSignUpEventMessageListener messageListener; private EventSubscriber eventSubscriber; - private ExecutorService executorService; + private static volatile ApplicationSignUpEventReceiver instance; - public ApplicationSignUpEventReceiver() { + private ApplicationSignUpEventReceiver() { + // TODO: make pool size configurable + this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100); ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue(); this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue); this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue); + execute(); + } + + public static ApplicationSignUpEventReceiver getInstance () { + if (instance == null) { + synchronized (ApplicationSignUpEventReceiver.class) { + if (instance == null) { + instance = new ApplicationSignUpEventReceiver(); + } + } + } + + return instance; } public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } - public void execute() { + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), @@ -103,11 +120,11 @@ public class ApplicationSignUpEventReceiver { messageDelegator.terminate(); } - public ExecutorService getExecutorService() { - return executorService; - } +// public ExecutorService getExecutorService() { +// return executorService; +// } - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index b841d0a..50e078a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; import java.util.concurrent.ExecutorService; @@ -34,26 +36,41 @@ import java.util.concurrent.ExecutorService; * A thread for receiving topology information from message broker and * build topology in topology manager. */ -public class TopologyEventReceiver { +public class TopologyEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(TopologyEventReceiver.class); private TopologyEventMessageDelegator messageDelegator; private TopologyEventMessageListener messageListener; private EventSubscriber eventSubscriber; - private ExecutorService executorService; + private static volatile TopologyEventReceiver instance; - public TopologyEventReceiver() { + private TopologyEventReceiver() { + // TODO: make pool size configurable + this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); this.messageListener = new TopologyEventMessageListener(messageQueue); + execute(); + } + + public static TopologyEventReceiver getInstance () { + if (instance == null) { + synchronized (TopologyEventReceiver.class) { + if (instance == null) { + instance = new TopologyEventReceiver(); + } + } + } + + return instance; } public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } - public void execute() { + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener); @@ -101,11 +118,11 @@ public class TopologyEventReceiver { }); } - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/78db9f1e/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java index 7b31861..c752f9e 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java @@ -108,7 +108,7 @@ public class MockInstance implements Serializable { } private void startTopologyEventReceiver() { - topologyEventReceiver = new TopologyEventReceiver(); + topologyEventReceiver = TopologyEventReceiver.getInstance(); topologyEventReceiver.addEventListener(new MemberInitializedEventListener() { @Override protected void onEvent(Event event) { @@ -151,8 +151,8 @@ public class MockInstance implements Serializable { } } }); - topologyEventReceiver.setExecutorService(eventListenerExecutorService); - topologyEventReceiver.execute(); +// topologyEventReceiver.setExecutorService(eventListenerExecutorService); +// topologyEventReceiver.execute(); if (log.isDebugEnabled()) { log.debug(String.format( "Mock instance topology event message receiver started for mock member [member-id] %s",