Closing STRATOS-1544, STRATOS-1612, STRATOS-1611: optimize initialization of the topology, tenant, and application models
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/60b80114 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/60b80114 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/60b80114 Branch: refs/heads/stratos-4.1.x Commit: 60b801144dcb05e3664353c412bc1c3fffd7c55c Parents: 00f624b Author: Akila Perera <[email protected]> Authored: Wed Nov 11 15:15:12 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Wed Nov 11 15:15:12 2015 +0530 ---------------------------------------------------------------------- .../ApplicationEventSynchronizer.java | 9 +- .../applications/topic/ApplicationBuilder.java | 13 -- .../topic/ApplicationsEventPublisher.java | 11 +- .../AutoscalerInitializerTopicReceiver.java | 72 ++++++++++ .../AutoscalerTopologyEventReceiver.java | 5 +- .../internal/AutoscalerServiceComponent.java | 81 +++++------ .../CloudControllerServiceComponent.java | 51 +++---- .../publisher/TopologyEventPublisher.java | 15 +- .../publisher/TopologyEventSynchronizer.java | 8 +- .../initializer/InitializerTopicReceiver.java | 72 ++++++++++ .../messaging/topology/TopologyBuilder.java | 144 +++++++++---------- .../messaging/topology/TopologyHolder.java | 118 +++++++++++++++ .../messaging/topology/TopologyManager.java | 118 --------------- .../impl/CloudControllerServiceImpl.java | 12 +- .../StratosManagerServiceComponent.java | 95 ++++++------ .../synchronizer/TenantEventSynchronizer.java | 6 +- .../StratosManagerInitializerTopicReceiver.java | 91 ++++++++++++ .../CompleteApplicationSignUpsRequestEvent.java | 26 ++++ .../CompleteApplicationsRequestEvent.java | 26 ++++ .../initializer/CompleteTenantRequestEvent.java | 26 ++++ .../CompleteTopologyRequestEvent.java | 27 ++++ .../event/initializer/InitializerEvent.java | 26 ++++ ...eApplicationSignUpsRequestEventListener.java | 24 ++++ ...ompleteApplicationsRequestEventListener.java | 24 ++++ .../CompleteTenantRequestEventListener.java 
| 24 ++++ .../CompleteTopologyRequestEventListener.java | 26 ++++ .../CompleteApplicationsMessageProcessor.java | 3 + ...plicationSignUpsRequestMessageProcessor.java | 54 +++++++ ...leteApplicationsRequestMessageProcessor.java | 54 +++++++ .../CompleteTenantRequestMessageProcessor.java | 53 +++++++ ...CompleteTopologyRequestMessageProcessor.java | 54 +++++++ .../InitializerMessageProcessorChain.java | 70 +++++++++ .../application/ApplicationsEventReceiver.java | 32 ++++- .../signup/ApplicationSignUpEventReceiver.java | 30 +++- .../InitializerEventMessageDelegator.java | 88 ++++++++++++ .../InitializerEventMessageListener.java | 48 +++++++ .../InitializerEventMessageQueue.java | 26 ++++ .../initializer/InitializerEventReceiver.java | 78 ++++++++++ .../receiver/tenant/TenantEventReceiver.java | 24 +++- .../topology/TopologyEventReceiver.java | 27 +++- .../stratos/messaging/util/MessagingUtil.java | 3 +- .../cartridge.agent/cartridge.agent/agent.py | 19 +-- .../cartridge.agent/constants.py | 3 + .../modules/event/instance/status/events.py | 18 ++- .../cartridge.agent/publisher.py | 22 ++- .../integration/tests/AgentStartupTestCase.java | 83 +++++++---- .../tests/PythonAgentIntegrationTest.java | 56 ++++---- .../integration/common/TopologyHandler.java | 111 +++++++++++--- .../src/test/resources/common/log4j.properties | 2 +- 49 files changed, 1648 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java index fc7a528..562a6cb 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationEventSynchronizer.java @@ -21,7 +21,7 @@ package org.apache.stratos.autoscaler.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; +import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; public class ApplicationEventSynchronizer implements Runnable { @@ -30,11 +30,8 @@ public class ApplicationEventSynchronizer implements Runnable { @Override public void run() { if (log.isDebugEnabled()) { - log.debug("Executing topology synchronization task"); - } - // publish to the topic - if (ApplicationHolder.getApplications() != null) { - ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications()); + log.debug("Executing applications synchronization task"); } + ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications()); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java index 165c4f8..5fd4d5a 100644 --- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java @@ -57,19 +57,6 @@ import java.util.Set; public class ApplicationBuilder { private static final Log log = LogFactory.getLog(ApplicationBuilder.class); - public static synchronized void handleCompleteApplication(Applications applications) { - if (log.isDebugEnabled()) { - log.debug("Handling complete application event"); - } - - try { - ApplicationHolder.acquireReadLock(); - ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications); - } finally { - ApplicationHolder.releaseReadLock(); - } - } - /** * Create application clusters in cloud controller and send application created event. * http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java index f66e525..2ec6e78 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java @@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.applications.topic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; import 
org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.application.Application; @@ -40,7 +41,15 @@ public class ApplicationsEventPublisher { private static final Log log = LogFactory.getLog(ApplicationsEventPublisher.class); public static void sendCompleteApplicationsEvent(Applications completeApplications) { - publishEvent(new CompleteApplicationsEvent(completeApplications)); + ApplicationHolder.acquireReadLock(); + try{ + if (log.isDebugEnabled()) { + log.debug("Publishing complete applications event..."); + } + publishEvent(new CompleteApplicationsEvent(completeApplications)); + }finally { + ApplicationHolder.releaseReadLock(); + } } public static void sendApplicationCreatedEvent(Application application) { http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java new file mode 100644 index 0000000..da6b270 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.event.receiver.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; +import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener; +import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver; + +import java.util.concurrent.ExecutorService; + +public class AutoscalerInitializerTopicReceiver { + private static final Log log = LogFactory.getLog(AutoscalerInitializerTopicReceiver.class); + private InitializerEventReceiver initializerEventReceiver; + private ExecutorService executorService; + + public AutoscalerInitializerTopicReceiver() { + this.initializerEventReceiver = new InitializerEventReceiver(); + addEventListeners(); + } + + public void execute() { + initializerEventReceiver.setExecutorService(executorService); + initializerEventReceiver.execute(); + if (log.isInfoEnabled()) { + log.info("Cloud controller initializer topic receiver started"); + } + } + + private void addEventListeners() { + initializerEventReceiver.addEventListener(new CompleteApplicationsRequestEventListener() { + @Override + protected void onEvent(Event 
event) { + if (log.isDebugEnabled()) { + log.debug("Handling CompleteApplicationsRequestEvent"); + } + try { + ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications()); + } catch (Exception e) { + log.error("Failed to process CompleteApplicationsRequestEvent", e); + } + } + }); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index ef31b97..500b95a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -35,6 +35,8 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.application.Application; import 
org.apache.stratos.messaging.domain.application.Applications; import org.apache.stratos.messaging.domain.instance.ClusterInstance; @@ -42,10 +44,12 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent; import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.MessagingUtil; import java.util.concurrent.ExecutorService; @@ -75,7 +79,6 @@ public class AutoscalerTopologyEventReceiver { if (log.isInfoEnabled()) { log.info("Autoscaler topology receiver thread started"); } - } private void addEventListeners() { http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java index 48ee481..5011dd2 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java @@ -26,6 +26,7 @@ import org.apache.stratos.autoscaler.algorithms.networkpartition.NetworkPartitio import 
org.apache.stratos.autoscaler.applications.ApplicationEventSynchronizer; import org.apache.stratos.autoscaler.context.AutoscalerContext; import org.apache.stratos.autoscaler.event.receiver.health.AutoscalerHealthStatEventReceiver; +import org.apache.stratos.autoscaler.event.receiver.initializer.AutoscalerInitializerTopicReceiver; import org.apache.stratos.autoscaler.event.receiver.topology.AutoscalerTopologyEventReceiver; import org.apache.stratos.autoscaler.exception.AutoScalerException; import org.apache.stratos.autoscaler.exception.AutoScalingPolicyAlreadyExistException; @@ -44,7 +45,6 @@ import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.common.Component; -import org.apache.stratos.common.services.ComponentActivationEventListener; import org.apache.stratos.common.services.ComponentStartUpSynchronizer; import org.apache.stratos.common.services.DistributedObjectProvider; import org.apache.stratos.common.threading.StratosThreadPool; @@ -68,9 +68,11 @@ import java.util.concurrent.TimeUnit; * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService" * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance" * cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance" - * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider" + * @scr.reference name="distributedObjectProvider" + * interface="org.apache.stratos.common.services.DistributedObjectProvider" * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider" - * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" + * @scr.reference name="componentStartUpSynchronizer" + * 
interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer" * @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService" * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService" @@ -81,6 +83,7 @@ public class AutoscalerServiceComponent { private static final String AUTOSCALER_COORDINATOR_LOCK = "AUTOSCALER_COORDINATOR_LOCK"; private AutoscalerTopologyEventReceiver asTopologyReceiver; private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver; + private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver; private ExecutorService executorService; private ScheduledExecutorService scheduler; @@ -90,25 +93,25 @@ public class AutoscalerServiceComponent { } try { XMLConfiguration conf = ConfUtil.getInstance(AutoscalerConstants.COMPONENTS_CONFIG).getConfiguration(); - int threadPoolSize = conf.getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, - AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE); - executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, - threadPoolSize); + int threadPoolSize = conf + .getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE); + executorService = StratosThreadPool + .getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, threadPoolSize); int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE); - scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID, - schedulerThreadPoolSize); + scheduler = StratosThreadPool + .getScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID, schedulerThreadPoolSize); Runnable 
autoscalerActivator = new Runnable() { @Override public void run() { try { - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); + ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance() + .getComponentStartUpSynchronizer(); // Wait for cloud controller component to be activated - componentStartUpSynchronizer.waitForComponentActivation(Component.Autoscaler, - Component.CloudController); + componentStartUpSynchronizer + .waitForComponentActivation(Component.Autoscaler, Component.CloudController); ServiceReferenceHolder.getInstance().setExecutorService(executorService); @@ -119,8 +122,8 @@ public class AutoscalerServiceComponent { ServiceReferenceHolder.getInstance().getHazelcastInstance() .getLock(AUTOSCALER_COORDINATOR_LOCK).lock(); - log.info("Elected this member [" + ServiceReferenceHolder.getInstance().getHazelcastInstance() - .getCluster().getLocalMember().getUuid() + "] " + + log.info("Elected this member [" + ServiceReferenceHolder.getInstance() + .getHazelcastInstance().getCluster().getLocalMember().getUuid() + "] " + "as the autoscaler coordinator for the cluster"); AutoscalerContext.getInstance().setCoordinator(true); @@ -136,8 +139,8 @@ public class AutoscalerServiceComponent { } else { executeCoordinatorTasks(); } - componentStartUpSynchronizer.waitForAxisServiceActivation(Component.Autoscaler, - "AutoscalerService"); + componentStartUpSynchronizer + .waitForAxisServiceActivation(Component.Autoscaler, "AutoscalerService"); componentStartUpSynchronizer.setComponentStatus(Component.Autoscaler, true); if (log.isInfoEnabled()) { log.info("Autoscaler service component activated"); @@ -154,14 +157,14 @@ public class AutoscalerServiceComponent { } } - private void executeCoordinatorTasks() throws InvalidPolicyException, - InvalidDeploymentPolicyException, InvalidApplicationPolicyException, AutoScalingPolicyAlreadyExistException { + private 
void executeCoordinatorTasks() + throws InvalidPolicyException, InvalidDeploymentPolicyException, InvalidApplicationPolicyException, + AutoScalingPolicyAlreadyExistException { // Start topology receiver asTopologyReceiver = new AutoscalerTopologyEventReceiver(); asTopologyReceiver.setExecutorService(executorService); asTopologyReceiver.execute(); - if (log.isDebugEnabled()) { log.debug("Topology receiver executor service started"); } @@ -174,6 +177,14 @@ public class AutoscalerServiceComponent { log.debug("Health statistics receiver thread started"); } + // Start initializer receiver + autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver(); + autoscalerInitializerTopicReceiver.setExecutorService(executorService); + autoscalerInitializerTopicReceiver.execute(); + if (log.isDebugEnabled()) { + log.debug("Initializer receiver thread started"); + } + // Add AS policies to information model List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies(); Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator(); @@ -191,7 +202,6 @@ public class AutoscalerServiceComponent { PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy); } - // Add application policies to information model List<ApplicationPolicy> applicationPolicies = RegistryManager.getInstance(). 
retrieveApplicationPolicies(); @@ -202,9 +212,10 @@ public class AutoscalerServiceComponent { } // Add application policies to information model - List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts = - RegistryManager.getInstance().retrieveNetworkPartitionAlgorithmContexts(); - Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts.iterator(); + List<NetworkPartitionAlgorithmContext> networkPartitionAlgorithmContexts = RegistryManager.getInstance() + .retrieveNetworkPartitionAlgorithmContexts(); + Iterator<NetworkPartitionAlgorithmContext> networkPartitionAlgoCtxtIterator = networkPartitionAlgorithmContexts + .iterator(); while (networkPartitionAlgoCtxtIterator.hasNext()) { NetworkPartitionAlgorithmContext algorithmContext = networkPartitionAlgoCtxtIterator.next(); AutoscalerContext.getInstance().addNetworkPartitionAlgorithmContext(algorithmContext); @@ -223,24 +234,6 @@ public class AutoscalerServiceComponent { if (log.isInfoEnabled()) { log.info("Scheduling tasks to publish applications"); } - - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); - if (componentStartUpSynchronizer.isEnabled()) { - componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() { - @Override - public void activated(Component component) { - if (component == Component.StratosManager) { - scheduleEventSynchronizers(); - } - } - }); - } else { - scheduleEventSynchronizers(); - } - } - - private void scheduleEventSynchronizers() { Runnable applicationSynchronizer = new ApplicationEventSynchronizer(); scheduler.scheduleAtFixedRate(applicationSynchronizer, 0, 1, TimeUnit.MINUTES); } @@ -332,8 +325,8 @@ public class AutoscalerServiceComponent { } protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) { - ServiceReferenceHolder.getInstance().setAxisConfiguration( - 
cfgCtxService.getServerConfigContext().getAxisConfiguration()); + ServiceReferenceHolder.getInstance() + .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration()); } protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) { http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 2e21e8e..808ac5c 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -29,11 +29,11 @@ import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventSynchronizer; import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationEventReceiver; import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver; +import org.apache.stratos.cloud.controller.messaging.receiver.initializer.InitializerTopicReceiver; import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver; import org.apache.stratos.cloud.controller.services.CloudControllerService; import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl; import org.apache.stratos.common.Component; 
-import org.apache.stratos.common.services.ComponentActivationEventListener; import org.apache.stratos.common.services.ComponentStartUpSynchronizer; import org.apache.stratos.common.services.DistributedObjectProvider; import org.apache.stratos.common.threading.StratosThreadPool; @@ -57,9 +57,11 @@ import java.util.concurrent.TimeUnit; * @scr.component name="org.apache.stratos.cloud.controller" immediate="true" * @scr.reference name="hazelcast.instance.service" interface="com.hazelcast.core.HazelcastInstance" * cardinality="0..1"policy="dynamic" bind="setHazelcastInstance" unbind="unsetHazelcastInstance" - * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider" + * @scr.reference name="distributedObjectProvider" + * interface="org.apache.stratos.common.services.DistributedObjectProvider" * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider" - * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" + * @scr.reference name="componentStartUpSynchronizer" + * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer" * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService" @@ -79,6 +81,7 @@ public class CloudControllerServiceComponent { private ClusterStatusTopicReceiver clusterStatusTopicReceiver; private InstanceStatusTopicReceiver instanceStatusTopicReceiver; private ApplicationEventReceiver applicationEventReceiver; + private InitializerTopicReceiver initializerTopicReceiver; private ExecutorService executorService; private ScheduledExecutorService scheduler; @@ -88,15 +91,15 @@ public class 
CloudControllerServiceComponent { } try { executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE); - scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, - SCHEDULER_THREAD_POOL_SIZE); + scheduler = StratosThreadPool + .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE); Runnable cloudControllerActivator = new Runnable() { @Override public void run() { try { - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); + ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance() + .getComponentStartUpSynchronizer(); // Register cloud controller service BundleContext bundleContext = context.getBundleContext(); @@ -125,8 +128,8 @@ public class CloudControllerServiceComponent { executeCoordinatorTasks(); } - componentStartUpSynchronizer.waitForAxisServiceActivation(Component.CloudController, - "CloudControllerService"); + componentStartUpSynchronizer + .waitForAxisServiceActivation(Component.CloudController, "CloudControllerService"); componentStartUpSynchronizer.setComponentStatus(Component.CloudController, true); log.info("Cloud controller service component activated"); } catch (Exception e) { @@ -166,27 +169,17 @@ public class CloudControllerServiceComponent { log.info("Instance status event receiver thread started"); } + initializerTopicReceiver = new InitializerTopicReceiver(); + initializerTopicReceiver.setExecutorService(executorService); + initializerTopicReceiver.execute(); + if (log.isInfoEnabled()) { - log.info("Scheduling topology synchronizer task"); + log.info("Initializer event receiver thread started"); } - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); - if (componentStartUpSynchronizer.isEnabled()) { - componentStartUpSynchronizer.addEventListener(new 
ComponentActivationEventListener() { - @Override - public void activated(Component component) { - if (component == Component.StratosManager) { - scheduleEventSynchronizers(); - } - } - }); - } else { - scheduleEventSynchronizers(); + if (log.isInfoEnabled()) { + log.info("Scheduling topology synchronizer task"); } - } - - private void scheduleEventSynchronizers() { Runnable topologySynchronizer = new TopologyEventSynchronizer(); scheduler.scheduleAtFixedRate(topologySynchronizer, 0, 1, TimeUnit.MINUTES); } @@ -228,8 +221,8 @@ public class CloudControllerServiceComponent { } protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) { - ServiceReferenceHolder.getInstance().setAxisConfiguration( - cfgCtxService.getServerConfigContext().getAxisConfiguration()); + ServiceReferenceHolder.getInstance() + .setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration()); } protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) { @@ -296,4 +289,4 @@ public class CloudControllerServiceComponent { log.warn("An error occurred while shutting down executor service", e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java index b55d3a2..12f7685 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java +++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventPublisher.java @@ -25,6 +25,7 @@ import org.apache.stratos.cloud.controller.domain.Cartridge; import org.apache.stratos.cloud.controller.domain.ClusterContext; import org.apache.stratos.cloud.controller.domain.MemberContext; import org.apache.stratos.cloud.controller.domain.PortMapping; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; @@ -314,12 +315,16 @@ public class TopologyEventPublisher { } public static void sendCompleteTopologyEvent(Topology topology) { - CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); - - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing complete topology event")); + TopologyHolder.acquireReadLock(); + try { + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + if (log.isDebugEnabled()) { + log.debug("Publishing complete topology event..."); + } + publishEvent(completeTopologyEvent); + } finally { + TopologyHolder.releaseReadLock(); } - publishEvent(completeTopologyEvent); } public static void sendClusterTerminatingEvent(ClusterInstanceTerminatingEvent clusterTerminatingEvent) { http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java index 832cae9..fcfd965 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologyEventSynchronizer.java @@ -22,7 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.config.CloudControllerConfig; import org.apache.stratos.cloud.controller.context.CloudControllerContext; -import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder; /** * Topology event synchronizer publishes complete topology event periodically. @@ -53,10 +53,8 @@ public class TopologyEventSynchronizer implements Runnable { try { // Publish complete topology event - if (TopologyManager.getTopology() != null) { - CloudControllerContext.getInstance().setTopologySyncRunning(true); - TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology()); - } + CloudControllerContext.getInstance().setTopologySyncRunning(true); + TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology()); } finally { CloudControllerContext.getInstance().setTopologySyncRunning(false); } http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java new file mode 100644 index 0000000..0f8538c --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.cloud.controller.messaging.receiver.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener; +import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver; + +import java.util.concurrent.ExecutorService; + +public class InitializerTopicReceiver { + private static final Log log = LogFactory.getLog(InitializerTopicReceiver.class); + private InitializerEventReceiver initializerEventReceiver; + private ExecutorService executorService; + + public InitializerTopicReceiver() { + this.initializerEventReceiver = new InitializerEventReceiver(); + addEventListeners(); + } + + public void execute() { + initializerEventReceiver.setExecutorService(executorService); + initializerEventReceiver.execute(); + if (log.isInfoEnabled()) { + log.info("Cloud controller initializer topic receiver started"); + } + } + + private void addEventListeners() { + initializerEventReceiver.addEventListener(new CompleteTopologyRequestEventListener() { + @Override + protected void onEvent(Event event) { + if (log.isDebugEnabled()) { + log.debug("Handling CompleteTopologyRequestEvent"); + } + try { + TopologyEventPublisher.sendCompleteTopologyEvent(TopologyHolder.getTopology()); + } catch (Exception e) { + log.error("Failed to process CompleteTopologyRequestEvent", e); + } + } + }); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } +} 
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index 09670e0..da38337 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -60,12 +60,12 @@ public class TopologyBuilder { public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException { Service service; - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); if (cartridgeList == null) { throw new RuntimeException("Cartridge list is empty"); } try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { ServiceType serviceType = cartridge.isMultiTenant() ? 
@@ -104,17 +104,17 @@ public class TopologyBuilder { } } topology.addService(service); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } TopologyEventPublisher.sendServiceCreateEvent(cartridgeList); } public static void handleServiceRemoved(List<Cartridge> cartridgeList) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); for (Cartridge cartridge : cartridgeList) { Service service = topology.getService(cartridge.getType()); if (service == null) { @@ -122,11 +122,11 @@ public class TopologyBuilder { } if (service.getClusters().size() == 0) { try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); topology.removeService(cartridge.getType()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); } else { @@ -138,9 +138,9 @@ public class TopologyBuilder { public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); for (Cluster cluster : appClusters) { Service service = topology.getService(cluster.getServiceName()); if (service == null) { @@ -150,9 +150,9 @@ public class TopologyBuilder { service.addCluster(cluster); log.info("Cluster created: [cluster] " + cluster.getClusterId()); } - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } log.debug("Creating cluster port mappings: [application-id] " 
+ appId); @@ -184,10 +184,10 @@ public class TopologyBuilder { public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); CloudControllerContext context = CloudControllerContext.getInstance(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); if (clusterData != null) { // remove clusters from CC topology model and remove runtime information @@ -208,9 +208,9 @@ public class TopologyBuilder { } else { log.info("No cluster data found for application " + appId + " to remove"); } - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } // Remove cluster port mappings of application @@ -220,9 +220,9 @@ public class TopologyBuilder { } public static void handleClusterReset(ClusterStatusClusterResetEvent event) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(event.getServiceName()); if (service == null) { throw new RuntimeException("Service " + event.getServiceName() + @@ -246,7 +246,7 @@ public class TopologyBuilder { if (context.isStateTransitionValid(status)) { context.setStatus(status); log.info("Cluster Created adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //publishing data TopologyEventPublisher .sendClusterResetEvent(event.getAppId(), event.getServiceName(), event.getClusterId(), @@ -258,16 +258,16 @@ public class TopologyBuilder { } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } public static void 
handleClusterInstanceCreated(String serviceType, String clusterId, String alias, String instanceId, String partitionId, String networkPartitionId) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(serviceType); if (service == null) { throw new RuntimeException("Service " + serviceType + @@ -286,18 +286,18 @@ public class TopologyBuilder { clusterInstance.setNetworkPartitionId(networkPartitionId); clusterInstance.setPartitionId(partitionId); cluster.addInstanceContext(instanceId, clusterInstance); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = new ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance); clusterInstanceCreatedEvent.setPartitionId(partitionId); TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } public static void handleClusterRemoved(ClusterContext ctxt) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(ctxt.getCartridgeType()); String deploymentPolicy; if (service == null) { @@ -308,12 +308,12 @@ public class TopologyBuilder { ctxt.getCartridgeType())); } try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); Cluster cluster = service.removeCluster(ctxt.getClusterId()); deploymentPolicy = cluster.getDeploymentPolicyName(); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); } @@ -324,7 
+324,7 @@ public class TopologyBuilder { * @param memberContext */ public static void handleMemberCreatedEvent(MemberContext memberContext) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(memberContext.getCartridgeType()); String clusterId = memberContext.getClusterId(); Cluster cluster = service.getCluster(clusterId); @@ -340,14 +340,14 @@ public class TopologyBuilder { throw new RuntimeException(String.format("Member %s already exists", memberId)); } try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); member.setStatus(MemberStatus.Created); member.setLbClusterId(lbClusterId); member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); cluster.addMember(member); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //member created time Long timestamp = System.currentTimeMillis(); @@ -368,7 +368,7 @@ public class TopologyBuilder { } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } TopologyEventPublisher.sendMemberCreatedEvent(memberContext); } @@ -379,7 +379,7 @@ public class TopologyBuilder { * @param memberContext */ public static void handleMemberInitializedEvent(MemberContext memberContext) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(memberContext.getCartridgeType()); if (service == null) { throw new RuntimeException(String.format("Service %s does not exist", memberContext.getCartridgeType())); @@ -397,7 +397,7 @@ public class TopologyBuilder { throw new RuntimeException(String.format("Member %s does not 
exist", memberContext.getMemberId())); } try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); // Set instance id returned by the IaaS member.setInstanceId(memberContext.getInstanceId()); @@ -430,7 +430,7 @@ public class TopologyBuilder { member.setStatus(MemberStatus.Initialized); log.info("Member status updated to initialized"); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //member intialized time Long timestamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); @@ -465,7 +465,7 @@ public class TopologyBuilder { } } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } @@ -482,7 +482,7 @@ public class TopologyBuilder { public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(instanceStartedEvent.getServiceName()); if (service == null) { throw new RuntimeException( @@ -504,7 +504,7 @@ public class TopologyBuilder { } try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Starting)) { log.error("Invalid State Transition from " + member.getStatus() + " to " + @@ -513,7 +513,7 @@ public class TopologyBuilder { member.setStatus(MemberStatus.Starting); log.info("member started event adding status started"); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //member started time Long timestamp = System.currentTimeMillis(); //memberStartedEvent. 
@@ -538,7 +538,7 @@ public class TopologyBuilder { } } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } catch (Exception e) { String message = String.format("Could not handle member started event: [application-id] %s " @@ -549,7 +549,7 @@ public class TopologyBuilder { } public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(instanceActivatedEvent.getServiceName()); if (service == null) { throw new RuntimeException( @@ -579,7 +579,7 @@ public class TopologyBuilder { //TODO memberActivatedEvent.setApplicationId(null); try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Active)) { log.error("Invalid state transition from [" + member.getStatus() + "] to [" + @@ -624,7 +624,7 @@ public class TopologyBuilder { memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs()); memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //member activated time Long timestamp = System.currentTimeMillis(); @@ -648,13 +648,13 @@ public class TopologyBuilder { } } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); //update the status of the member @@ 
-684,7 +684,7 @@ public class TopologyBuilder { //member ReadyToShutDown state change time Long timestamp = null; try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { throw new RuntimeException("Invalid State Transition from " + member.getStatus() + " to " + @@ -693,10 +693,10 @@ public class TopologyBuilder { member.setStatus(MemberStatus.ReadyToShutDown); log.info("Member Ready to shut down event adding status started"); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); timestamp = System.currentTimeMillis(); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); //publishing member status to DAS. @@ -721,7 +721,7 @@ public class TopologyBuilder { public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) throws InvalidMemberException, InvalidCartridgeTypeException, RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); //update the status of the member if (service == null) { @@ -746,7 +746,7 @@ public class TopologyBuilder { instanceMaintenanceModeEvent.getClusterInstanceId(), instanceMaintenanceModeEvent.getMemberId(), instanceMaintenanceModeEvent.getNetworkPartitionId(), instanceMaintenanceModeEvent.getPartitionId()); try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { throw new RuntimeException( @@ -755,9 +755,9 @@ public class TopologyBuilder { member.setStatus(MemberStatus.In_Maintenance); log.info("member maintenance mode event adding status started"); - 
TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } //publishing data TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); @@ -775,7 +775,7 @@ public class TopologyBuilder { */ public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(serviceName); Properties properties; if (service == null) { @@ -799,12 +799,12 @@ public class TopologyBuilder { //member terminated time Long timestamp = null; try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); properties = member.getProperties(); cluster.removeMember(member); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); timestamp = System.currentTimeMillis(); } /* @TODO leftover from grouping_poc*/ @@ -831,7 +831,7 @@ public class TopologyBuilder { public static void handleClusterActivatedEvent( ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); //update the status of the cluster if (service == null) { @@ -857,7 +857,7 @@ public class TopologyBuilder { clusterStatusClusterActivatedEvent.getAppId(), clusterStatusClusterActivatedEvent.getServiceName(), clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId()); try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); 
Collection<KubernetesService> kubernetesServices = clusterContext .getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId()); @@ -932,7 +932,7 @@ public class TopologyBuilder { if (context.isStateTransitionValid(status)) { context.setStatus(status); log.info("Cluster activated adding status started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); // publish event TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); } else { @@ -942,7 +942,7 @@ public class TopologyBuilder { clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), status)); } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } @@ -961,7 +961,7 @@ public class TopologyBuilder { public static void handleClusterInactivateEvent(ClusterStatusClusterInactivateEvent clusterInactivateEvent) throws RegistryException { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(clusterInactivateEvent.getServiceName()); //update the status of the cluster if (service == null) { @@ -980,7 +980,7 @@ public class TopologyBuilder { clusterInactivateEvent.getAppId(), clusterInactivateEvent.getServiceName(), clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId()); try { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); if (context == null) { throw new RuntimeException("Cluster Instance Context is not found for [cluster] " + @@ -991,7 +991,7 @@ public class TopologyBuilder { if (context.isStateTransitionValid(status)) { context.setStatus(status); log.info("Cluster Inactive adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //publishing data 
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); } else { @@ -1001,15 +1001,15 @@ public class TopologyBuilder { context.getStatus(), status)); } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Service service = topology.getService(event.getServiceName()); //update the status of the cluster @@ -1036,7 +1036,7 @@ public class TopologyBuilder { log.info("Cluster Terminated adding status started for and removing the cluster instance" + cluster .getClusterId()); cluster.removeInstanceContext(event.getInstanceId()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //publishing data ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent( event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); @@ -1048,7 +1048,7 @@ public class TopologyBuilder { event.getInstanceId(), context.getStatus(), status)); } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } @@ -1056,10 +1056,10 @@ public class TopologyBuilder { public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) throws RegistryException { - TopologyManager.acquireWriteLock(); + TopologyHolder.acquireWriteLock(); try { - Topology topology = TopologyManager.getTopology(); + Topology topology = TopologyHolder.getTopology(); Cluster cluster = topology.getService(event.getServiceName()). 
getCluster(event.getClusterId()); @@ -1077,7 +1077,7 @@ public class TopologyBuilder { if (context.isStateTransitionValid(status)) { context.setStatus(status); log.info("Cluster Terminating started for " + cluster.getClusterId()); - TopologyManager.updateTopology(topology); + TopologyHolder.updateTopology(topology); //publishing data ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent( event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); @@ -1096,7 +1096,7 @@ public class TopologyBuilder { event.getInstanceId(), context.getStatus(), status)); } } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java new file mode 100644 index 0000000..d183ca0 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyHolder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.topology; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.registry.RegistryManager; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; +import org.apache.stratos.common.concurrent.locks.ReadWriteLock; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.wso2.carbon.registry.core.exceptions.RegistryException; + +/** + * Persistence and retrieval of Topology from Registry + */ +public class TopologyHolder { + private static final Log log = LogFactory.getLog(TopologyHolder.class); + + private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager"); + private static volatile Topology topology; + + private TopologyHolder() { + } + + public static void acquireReadLock() { + lock.acquireReadLock(); + if (log.isDebugEnabled()) { + log.debug("Read lock acquired"); + } + } + + public static void releaseReadLock() { + lock.releaseReadLock(); + if (log.isDebugEnabled()) { + log.debug("Read lock released"); + } + } + + public static void acquireWriteLock() { + lock.acquireWriteLock(); + if (log.isDebugEnabled()) { + log.debug("Write lock acquired"); + } + } + + public static void releaseWriteLock() { + lock.releaseWriteLock(); + if (log.isDebugEnabled()) { + log.debug("Write lock released"); + } + } + + public static Topology getTopology() { + if (topology 
== null) { + synchronized (TopologyHolder.class) { + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Trying to retrieve topology from registry"); + } + topology = CloudControllerUtil.retrieveTopology(); + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Topology not found in registry, creating new"); + } + topology = new Topology(); + } + if (log.isDebugEnabled()) { + log.debug("Topology initialized"); + } + } + } + } + return topology; + } + + /** + * Update in-memory topology and persist it in registry. + * + * @param updatedTopology + */ + public static void updateTopology(Topology updatedTopology) throws RegistryException { + synchronized (TopologyHolder.class) { + if (log.isDebugEnabled()) { + log.debug("Updating topology"); + } + topology = updatedTopology; + RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology); + if (log.isDebugEnabled()) { + log.debug(String.format("Topology updated: %s", toJson(topology))); + } + } + + } + + private static String toJson(Object object) { + Gson gson = new Gson(); + return gson.toJson(object); + } +} + http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java deleted file mode 100644 index f6f6036..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.messaging.topology; - -import com.google.gson.Gson; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.registry.RegistryManager; -import org.apache.stratos.cloud.controller.util.CloudControllerConstants; -import org.apache.stratos.cloud.controller.util.CloudControllerUtil; -import org.apache.stratos.common.concurrent.locks.ReadWriteLock; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.wso2.carbon.registry.core.exceptions.RegistryException; - -/** - * Persistence and retrieval of Topology from Registry - */ -public class TopologyManager { - private static final Log log = LogFactory.getLog(TopologyManager.class); - - private static volatile ReadWriteLock lock = new ReadWriteLock("topology-manager"); - private static volatile Topology topology; - - private TopologyManager() { - } - - public static void acquireReadLock() { - lock.acquireReadLock(); - if (log.isDebugEnabled()) { - log.debug("Read lock acquired"); - } - } - - public static void releaseReadLock() { - lock.releaseReadLock(); - if (log.isDebugEnabled()) { - log.debug("Read lock released"); - } - } - - public static void 
acquireWriteLock() { - lock.acquireWriteLock(); - if (log.isDebugEnabled()) { - log.debug("Write lock acquired"); - } - } - - public static void releaseWriteLock() { - lock.releaseWriteLock(); - if (log.isDebugEnabled()) { - log.debug("Write lock released"); - } - } - - public static Topology getTopology() { - if (topology == null) { - synchronized (TopologyManager.class) { - if (topology == null) { - if (log.isDebugEnabled()) { - log.debug("Trying to retrieve topology from registry"); - } - topology = CloudControllerUtil.retrieveTopology(); - if (topology == null) { - if (log.isDebugEnabled()) { - log.debug("Topology not found in registry, creating new"); - } - topology = new Topology(); - } - if (log.isDebugEnabled()) { - log.debug("Topology initialized"); - } - } - } - } - return topology; - } - - /** - * Update in-memory topology and persist it in registry. - * - * @param updatedTopology - */ - public static void updateTopology(Topology updatedTopology) throws RegistryException { - synchronized (TopologyManager.class) { - if (log.isDebugEnabled()) { - log.debug("Updating topology"); - } - topology = updatedTopology; - RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology); - if (log.isDebugEnabled()) { - log.debug(String.format("Topology updated: %s", toJson(topology))); - } - } - - } - - private static String toJson(Object object) { - Gson gson = new Gson(); - return gson.toJson(object); - } -} - http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 582e78f..d3fa92d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -30,7 +30,7 @@ import org.apache.stratos.cloud.controller.domain.kubernetes.KubernetesMaster; import org.apache.stratos.cloud.controller.exception.*; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.messaging.topology.TopologyManager; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyHolder; import org.apache.stratos.cloud.controller.services.CloudControllerService; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; @@ -640,8 +640,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { } // check if status == active, if true, then this is a termination on member faulty - TopologyManager.acquireWriteLock(); - Topology topology = TopologyManager.getTopology(); + TopologyHolder.acquireWriteLock(); + Topology topology = TopologyHolder.getTopology(); org.apache.stratos.messaging.domain.topology.Service service = topology .getService(memberContext.getCartridgeType()); @@ -679,7 +679,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { log.error(message, e); throw new CloudControllerException(message, e); } finally { - TopologyManager.releaseWriteLock(); + TopologyHolder.releaseWriteLock(); } return true; } @@ -826,7 +826,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { 
log.error(msg); return; } - Collection<Member> members = TopologyManager.getTopology(). + Collection<Member> members = TopologyHolder.getTopology(). getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); //finding the responding members from the existing members in the topology. int sizeOfRespondingMembers = 0; @@ -872,7 +872,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { log.error(msg); return; } - Collection<Member> members = TopologyManager.getTopology(). + Collection<Member> members = TopologyHolder.getTopology(). getService(ctxt.getCartridgeType()).getCluster(clusterId_).getMembers(); while (members.size() > 0) {
