http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index 5f7bd01..bd09d7e 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -22,7 +22,6 @@ import com.hazelcast.core.HazelcastInstance; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.Component; -import org.apache.stratos.common.services.ComponentActivationEventListener; import org.apache.stratos.common.services.ComponentStartUpSynchronizer; import org.apache.stratos.common.services.DistributedObjectProvider; import org.apache.stratos.common.threading.StratosThreadPool; @@ -31,6 +30,7 @@ import org.apache.stratos.manager.messaging.publisher.TenantEventPublisher; import org.apache.stratos.manager.messaging.publisher.synchronizer.ApplicationSignUpEventSynchronizer; import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer; import org.apache.stratos.manager.messaging.receiver.StratosManagerApplicationEventReceiver; +import org.apache.stratos.manager.messaging.receiver.StratosManagerInitializerTopicReceiver; import org.apache.stratos.manager.messaging.receiver.StratosManagerInstanceStatusEventReceiver; import org.apache.stratos.manager.messaging.receiver.StratosManagerTopologyEventReceiver; import org.apache.stratos.manager.user.management.TenantUserRoleManager; @@ -73,9 +73,11 @@ import java.util.concurrent.TimeUnit; * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" * cardinality="1..1" policy="dynamic" bind="setTaskService" * unbind="unsetTaskService" - * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.services.DistributedObjectProvider" + * @scr.reference name="distributedObjectProvider" + * interface="org.apache.stratos.common.services.DistributedObjectProvider" * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider" - * @scr.reference name="componentStartUpSynchronizer" interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" + * @scr.reference name="componentStartUpSynchronizer" + * interface="org.apache.stratos.common.services.ComponentStartUpSynchronizer" * cardinality="1..1" policy="dynamic" bind="setComponentStartUpSynchronizer" unbind="unsetComponentStartUpSynchronizer" */ public class StratosManagerServiceComponent { @@ -89,6 +91,7 @@ public class StratosManagerServiceComponent { private StratosManagerTopologyEventReceiver topologyEventReceiver; private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver; private StratosManagerApplicationEventReceiver applicationEventReceiver; + private StratosManagerInitializerTopicReceiver initializerTopicReceiver; private ExecutorService executorService; private ScheduledExecutorService scheduler; @@ -98,21 +101,21 @@ public class StratosManagerServiceComponent { } try { executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE); - scheduler = StratosThreadPool.getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, - SCHEDULER_THREAD_POOL_SIZE); + scheduler = StratosThreadPool + .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE); Runnable stratosManagerActivator = new Runnable() { @Override public void run() { try { - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); + ComponentStartUpSynchronizer componentStartUpSynchronizer = ServiceReferenceHolder.getInstance() + .getComponentStartUpSynchronizer(); // Wait for cloud controller and autoscaler components to be activated - componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager, - Component.CloudController); - componentStartUpSynchronizer.waitForComponentActivation(Component.StratosManager, - Component.Autoscaler); + componentStartUpSynchronizer + .waitForComponentActivation(Component.StratosManager, Component.CloudController); + componentStartUpSynchronizer + .waitForComponentActivation(Component.StratosManager, Component.Autoscaler); CartridgeConfigFileReader.readProperties(); if (StratosManagerContext.getInstance().isClustered()) { @@ -123,8 +126,8 @@ public class StratosManagerServiceComponent { ServiceReferenceHolder.getInstance().getHazelcastInstance() .getLock(STRATOS_MANAGER_COORDINATOR_LOCK).lock(); - String localMemberId = ServiceReferenceHolder.getInstance().getHazelcastInstance() - .getCluster().getLocalMember().getUuid(); + String localMemberId = ServiceReferenceHolder.getInstance() + .getHazelcastInstance().getCluster().getLocalMember().getUuid(); log.info("Elected this member [" + localMemberId + "] " + "as the stratos manager coordinator for the cluster"); @@ -149,8 +152,8 @@ public class StratosManagerServiceComponent { // Initialize application event receiver initializeApplicationEventReceiver(); - componentStartUpSynchronizer.waitForAxisServiceActivation(Component.StratosManager, - "StratosManagerService"); + componentStartUpSynchronizer + .waitForAxisServiceActivation(Component.StratosManager, "StratosManagerService"); componentStartUpSynchronizer.setComponentStatus(Component.StratosManager, true); if (log.isInfoEnabled()) { log.info("Stratos manager component is activated"); @@ -174,17 +177,25 @@ public class StratosManagerServiceComponent { * @throws UserStoreException * @throws UserManagerException */ - private void executeCoordinatorTasks(ComponentContext componentContext) throws UserStoreException, - UserManagerException { - + private void executeCoordinatorTasks(ComponentContext componentContext) + throws UserStoreException, UserManagerException { initializeTenantEventPublisher(componentContext); initializeInstanceStatusEventReceiver(); - registerComponentStartUpEventListeners(); - + initializeInitializerEventReceiver(); + Runnable tenantSynchronizer = new TenantEventSynchronizer(); + scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES); + Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer(); + scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES); // Create internal/user Role at server start-up createInternalUserRole(componentContext); } + private void initializeInitializerEventReceiver() { + initializerTopicReceiver = new StratosManagerInitializerTopicReceiver(); + initializerTopicReceiver.setExecutorService(executorService); + initializerTopicReceiver.execute(); + } + /** * Initialize instance status event receiver */ @@ -219,16 +230,17 @@ public class StratosManagerServiceComponent { * @throws UserStoreException * @throws UserManagerException */ - private void createInternalUserRole(ComponentContext componentContext) throws UserStoreException, UserManagerException { + private void createInternalUserRole(ComponentContext componentContext) + throws UserStoreException, UserManagerException { RealmService realmService = ServiceReferenceHolder.getRealmService(); UserRealm realm = realmService.getBootstrapRealm(); UserStoreManager userStoreManager = realm.getUserStoreManager(); UserRoleCreator.createInternalUserRole(userStoreManager); TenantUserRoleManager tenantUserRoleManager = new TenantUserRoleManager(); - componentContext.getBundleContext().registerService( - org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(), - tenantUserRoleManager, null); + componentContext.getBundleContext() + .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(), + tenantUserRoleManager, null); } /** @@ -242,44 +254,19 @@ public class StratosManagerServiceComponent { log.debug("Initializing tenant event publisher..."); } final TenantEventPublisher tenantEventPublisher = new TenantEventPublisher(); - componentContext.getBundleContext().registerService( - org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(), - tenantEventPublisher, null); + componentContext.getBundleContext() + .registerService(org.wso2.carbon.stratos.common.listeners.TenantMgtListener.class.getName(), + tenantEventPublisher, null); if (log.isInfoEnabled()) { log.info("Tenant event publisher initialized"); } } - private void registerComponentStartUpEventListeners() { - ComponentStartUpSynchronizer componentStartUpSynchronizer = - ServiceReferenceHolder.getInstance().getComponentStartUpSynchronizer(); - if (componentStartUpSynchronizer.isEnabled()) { - componentStartUpSynchronizer.addEventListener(new ComponentActivationEventListener() { - @Override - public void activated(Component component) { - if (component == Component.StratosManager) { - scheduleEventSynchronizers(); - } - } - }); - } else { - scheduleEventSynchronizers(); - } - } - - private void scheduleEventSynchronizers() { - Runnable tenantSynchronizer = new TenantEventSynchronizer(); - scheduler.scheduleAtFixedRate(tenantSynchronizer, 0, 1, TimeUnit.MINUTES); - - Runnable applicationSignUpSynchronizer = new ApplicationSignUpEventSynchronizer(); - scheduler.scheduleAtFixedRate(applicationSignUpSynchronizer, 0, 1, TimeUnit.MINUTES); - } - protected void setConfigurationContextService(ConfigurationContextService contextService) { ServiceReferenceHolder.setClientConfigContext(contextService.getClientConfigContext()); ServiceReferenceHolder.setServerConfigContext(contextService.getServerConfigContext()); - ServiceReferenceHolder.getInstance().setAxisConfiguration( - contextService.getServerConfigContext().getAxisConfiguration()); + ServiceReferenceHolder.getInstance() + .setAxisConfiguration(contextService.getServerConfigContext().getAxisConfiguration()); } protected void unsetConfigurationContextService(ConfigurationContextService contextService) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java index d7d8ef4..f4cfbd8 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/publisher/synchronizer/TenantEventSynchronizer.java @@ -42,9 +42,13 @@ public class TenantEventSynchronizer implements Runnable { @Override public void run() { + sendCompleteTenantEvent(); + } + + public static synchronized void sendCompleteTenantEvent(){ try { if (log.isDebugEnabled()) { - log.debug(String.format("Publishing complete tenant event")); + log.debug("Publishing complete tenant event"); } Tenant tenant; List<Tenant> tenants = new ArrayList<Tenant>(); http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java new file mode 100644 index 0000000..ed526a6 --- /dev/null +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.manager.messaging.receiver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.manager.components.ApplicationSignUpHandler; +import org.apache.stratos.manager.messaging.publisher.ApplicationSignUpEventPublisher; +import org.apache.stratos.manager.messaging.publisher.synchronizer.TenantEventSynchronizer; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener; +import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener; +import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver; + +import java.util.concurrent.ExecutorService; + +public class StratosManagerInitializerTopicReceiver { + private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class); + private InitializerEventReceiver initializerEventReceiver; + private ExecutorService executorService; + private ApplicationSignUpHandler applicationSignUpHandler; + + public StratosManagerInitializerTopicReceiver() { + this.initializerEventReceiver = new InitializerEventReceiver(); + applicationSignUpHandler = new ApplicationSignUpHandler(); + addEventListeners(); + } + + public void execute() { + initializerEventReceiver.setExecutorService(executorService); + initializerEventReceiver.execute(); + if (log.isInfoEnabled()) { + log.info("Cloud controller initializer topic receiver started"); + } + } + + private void addEventListeners() { + initializerEventReceiver.addEventListener(new CompleteTenantRequestEventListener() { + @Override + protected void onEvent(Event event) { + if (log.isDebugEnabled()) { + log.debug("Handling CompleteTenantRequestEvent"); + } + try { + TenantEventSynchronizer.sendCompleteTenantEvent(); + } catch (Exception e) { + log.error("Failed to process CompleteTenantRequestEvent", e); + } + } + }); + + initializerEventReceiver.addEventListener(new CompleteApplicationSignUpsRequestEventListener() { + @Override + protected void onEvent(Event event) { + if (log.isDebugEnabled()) { + log.debug("Handling CompleteApplicationSignUpsRequestEvent"); + } + try { + ApplicationSignUpEventPublisher + .publishCompleteApplicationSignUpsEvent(applicationSignUpHandler.getApplicationSignUps()); + } catch (Exception e) { + log.error("Failed to process CompleteApplicationSignUpsRequestEvent", e); + } + } + }); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java new file mode 100644 index 0000000..ba912fd --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationSignUpsRequestEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.initializer; + +import java.io.Serializable; + +public class CompleteApplicationSignUpsRequestEvent extends InitializerEvent implements Serializable { + public CompleteApplicationSignUpsRequestEvent() { + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java new file mode 100644 index 0000000..621b8a6 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteApplicationsRequestEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.initializer; + +import java.io.Serializable; + +public class CompleteApplicationsRequestEvent extends InitializerEvent implements Serializable { + public CompleteApplicationsRequestEvent() { + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java new file mode 100644 index 0000000..32416a6 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTenantRequestEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.initializer; + +import java.io.Serializable; + +public class CompleteTenantRequestEvent extends InitializerEvent implements Serializable { + public CompleteTenantRequestEvent() { + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java new file mode 100644 index 0000000..257bf44 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/CompleteTopologyRequestEvent.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.initializer; + +import java.io.Serializable; + +public class CompleteTopologyRequestEvent extends InitializerEvent implements Serializable { + public CompleteTopologyRequestEvent() { + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java new file mode 100644 index 0000000..fa54bb2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/initializer/InitializerEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.initializer; + +import org.apache.stratos.messaging.event.Event; + +import java.io.Serializable; + +public class InitializerEvent extends Event implements Serializable { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java new file mode 100644 index 0000000..08e4c85 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationSignUpsRequestEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.initializer; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class CompleteApplicationSignUpsRequestEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java new file mode 100644 index 0000000..f1da2da --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteApplicationsRequestEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.initializer; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class CompleteApplicationsRequestEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java new file mode 100644 index 0000000..20bdf13 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTenantRequestEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.initializer; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class CompleteTenantRequestEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java new file mode 100644 index 0000000..cbac1c9 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/initializer/CompleteTopologyRequestEventListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.listener.initializer; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class CompleteTopologyRequestEventListener extends EventListener { + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java index 8346d84..17922dd 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/CompleteApplicationsMessageProcessor.java @@ -85,6 +85,9 @@ public class CompleteApplicationsMessageProcessor extends MessageProcessor { log.debug("No Application information found in Complete Applications event"); } } + if (log.isInfoEnabled()) { + log.info("Application topology initialized"); + } // Set topology initialized applications.setInitialized(true); http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java new file mode 100644 index 0000000..557de79 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationSignUpsRequestMessageProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.initializer; + +import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.MessagingUtil; + +public class CompleteApplicationSignUpsRequestMessageProcessor extends MessageProcessor { + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (CompleteApplicationSignUpsRequestEvent.class.getName().equals(type)) { + // Parse complete message and build event + CompleteApplicationSignUpsRequestEvent event = (CompleteApplicationSignUpsRequestEvent) MessagingUtil + .jsonToObject(message, CompleteApplicationSignUpsRequestEvent.class); + + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format( + "Failed to process message using available message processors: [type] %s [body] %s", type, + message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java new file mode 100644 index 0000000..ae80a68 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteApplicationsRequestMessageProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.initializer; + +import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.MessagingUtil; + +public class CompleteApplicationsRequestMessageProcessor extends MessageProcessor { + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (CompleteApplicationsRequestEvent.class.getName().equals(type)) { + // Parse complete message and build event + CompleteApplicationsRequestEvent event = (CompleteApplicationsRequestEvent) MessagingUtil + .jsonToObject(message, CompleteApplicationsRequestEvent.class); + + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format( + "Failed to process message using available message processors: [type] %s [body] %s", type, + message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java new file mode 100644 index 0000000..a6af9fd --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTenantRequestMessageProcessor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.initializer; + +import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.MessagingUtil; + +public class CompleteTenantRequestMessageProcessor extends MessageProcessor { + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (CompleteTenantRequestEvent.class.getName().equals(type)) { + // Parse complete message and build event + CompleteTenantRequestEvent event = (CompleteTenantRequestEvent) MessagingUtil + .jsonToObject(message, CompleteTenantRequestEvent.class); + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format( + "Failed to process message using available message processors: [type] %s [body] %s", type, + message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java new file mode 100644 index 0000000..73e4bf7 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/CompleteTopologyRequestMessageProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.initializer; + +import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.MessagingUtil; + +public class CompleteTopologyRequestMessageProcessor extends MessageProcessor { + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (CompleteTopologyRequestEvent.class.getName().equals(type)) { + // Parse complete message and build event + CompleteTopologyRequestEvent event = (CompleteTopologyRequestEvent) MessagingUtil + .jsonToObject(message, CompleteTopologyRequestEvent.class); + + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format( + "Failed to process message using available message processors: [type] %s [body] %s", type, + message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java new file mode 100644 index 0000000..f3e292f --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/initializer/InitializerMessageProcessorChain.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.initializer.CompleteApplicationSignUpsRequestEventListener; +import org.apache.stratos.messaging.listener.initializer.CompleteApplicationsRequestEventListener; +import org.apache.stratos.messaging.listener.initializer.CompleteTenantRequestEventListener; +import org.apache.stratos.messaging.listener.initializer.CompleteTopologyRequestEventListener; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; + +public class InitializerMessageProcessorChain extends MessageProcessorChain { + private static final Log log = LogFactory.getLog(InitializerMessageProcessorChain.class); + private CompleteTopologyRequestMessageProcessor completeTopologyRequestMessageProcessor; + private CompleteApplicationsRequestMessageProcessor completeApplicationsRequestMessageProcessor; + private CompleteTenantRequestMessageProcessor completeTenantRequestMessageProcessor; + private CompleteApplicationSignUpsRequestMessageProcessor completeApplicationSignUpsRequestMessageProcessor; + + @Override + protected void initialize() { + completeTopologyRequestMessageProcessor = new CompleteTopologyRequestMessageProcessor(); + add(completeTopologyRequestMessageProcessor); + + completeApplicationsRequestMessageProcessor = new CompleteApplicationsRequestMessageProcessor(); + add(completeApplicationsRequestMessageProcessor); + + completeTenantRequestMessageProcessor = new CompleteTenantRequestMessageProcessor(); + add(completeTenantRequestMessageProcessor); + + completeApplicationSignUpsRequestMessageProcessor = new CompleteApplicationSignUpsRequestMessageProcessor(); + add(completeApplicationSignUpsRequestMessageProcessor); + + if (log.isDebugEnabled()) { + log.debug("Initializer message processor chain initialized"); + } + } + + @Override + public void addEventListener(EventListener eventListener) { + if (eventListener instanceof CompleteTopologyRequestEventListener) { + completeTopologyRequestMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof CompleteApplicationsRequestEventListener) { + completeApplicationsRequestMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof CompleteTenantRequestEventListener) { + completeTenantRequestMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof CompleteApplicationSignUpsRequestEventListener) { + completeApplicationSignUpsRequestMessageProcessor.addEventListener(eventListener); + } else { + throw new RuntimeException("Unknown event listener"); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index 6b9085f..d7e7196 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -20,7 +20,10 @@ package org.apache.stratos.messaging.message.receiver.application; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; +import org.apache.stratos.messaging.event.initializer.CompleteApplicationsRequestEvent; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.util.MessagingUtil; @@ -32,7 +35,6 @@ public class ApplicationsEventReceiver { private ApplicationsEventMessageDelegator messageDelegator; private ApplicationsEventMessageListener messageListener; private EventSubscriber eventSubscriber; - private boolean terminated; private ExecutorService executorService; public ApplicationsEventReceiver() { @@ -45,11 +47,11 @@ public class ApplicationsEventReceiver { messageDelegator.addEventListener(eventListener); } - public void execute() { try { // Start topic subscriber thread - eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), messageListener); + eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), + messageListener); executorService.execute(eventSubscriber); if (log.isDebugEnabled()) { @@ -62,8 +64,7 @@ public class ApplicationsEventReceiver { if (log.isDebugEnabled()) { log.debug("Application status event message delegator thread started"); } - - + initializeCompleteApplicationsModel(); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Application status failed", e); @@ -74,7 +75,26 @@ public class ApplicationsEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - terminated = true; + } + + public void initializeCompleteApplicationsModel() { + executorService.execute(new Runnable() { + @Override + public void run() { + while (!eventSubscriber.isSubscribed()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + + CompleteApplicationsRequestEvent completeApplicationsRequestEvent + = new CompleteApplicationsRequestEvent(); + String topic = MessagingUtil.getMessageTopicName(completeApplicationsRequestEvent); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(completeApplicationsRequestEvent); + } + }); } public ExecutorService getExecutorService() { http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index 7863ee4..55e3fd1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.application.signup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; +import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.util.MessagingUtil; @@ -49,15 +52,14 @@ public class ApplicationSignUpEventReceiver { messageDelegator.addEventListener(eventListener); } - public void execute() { try { // Start topic subscriber thread - eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), messageListener); + eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), + messageListener); // subscriber.setMessageListener(messageListener); executorService.execute(eventSubscriber); - if (log.isDebugEnabled()) { log.debug("Application signup event message receiver thread started"); } @@ -68,7 +70,7 @@ public class ApplicationSignUpEventReceiver { log.debug("Application signup event message delegator thread started"); } - + initializeCompleteApplicationSignUps(); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Application signup receiver failed", e); @@ -76,6 +78,26 @@ public class ApplicationSignUpEventReceiver { } } + public void initializeCompleteApplicationSignUps() { + executorService.execute(new Runnable() { + @Override + public void run() { + while (!eventSubscriber.isSubscribed()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + + CompleteApplicationSignUpsRequestEvent completeApplicationSignUpsRequestEvent + = new CompleteApplicationSignUpsRequestEvent(); + String topic = MessagingUtil.getMessageTopicName(completeApplicationSignUpsRequestEvent); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(completeApplicationSignUpsRequestEvent); + } + }); + } + public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java new file mode 100644 index 0000000..ffd2ae4 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.Message; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.initializer.InitializerMessageProcessorChain; + +public class InitializerEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(InitializerEventMessageDelegator.class); + + private MessageProcessorChain processorChain; + private InitializerEventMessageQueue messageQueue; + private boolean terminated; + + public InitializerEventMessageDelegator(InitializerEventMessageQueue initializerEventMessageQueue) { + this.messageQueue = initializerEventMessageQueue; + this.processorChain = new InitializerMessageProcessorChain(); + } + + public void addEventListener(EventListener eventListener) { + processorChain.addEventListener(eventListener); + } + + @Override + public void run() { + try { + if (log.isInfoEnabled()) { + log.info("Initializer event message delegator started"); + } + + while (!terminated) { + try { + Message message = messageQueue.take(); + String type = message.getEventClassName(); + + // Retrieve the actual message + String json = message.getText(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Initializer event message [%s] received from queue: %s", type, + messageQueue.getClass())); + } + + if (log.isDebugEnabled()) { + log.debug(String.format("Delegating initializer event message: %s", type)); + } + processorChain.process(type, json, null); + } catch (InterruptedException ignore) { + log.info("Shutting down initializer event message delegator..."); + terminate(); + } catch (Exception e) { + log.error("Failed to retrieve initializer event message", e); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Initializer event message delegator failed", e); + } + } + } + + /** + * Terminate initializer event message delegator thread. + */ + public void terminate() { + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java new file mode 100644 index 0000000..bdfe875 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.MessageListener; +import org.apache.stratos.messaging.domain.Message; + +public class InitializerEventMessageListener implements MessageListener { + private static final Log log = LogFactory.getLog(InitializerEventMessageListener.class); + + private final InitializerEventMessageQueue messageQueue; + + public InitializerEventMessageListener(InitializerEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void messageReceived(Message message) { + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Initializer event message received: %s", message.getText())); + } + // Add received message to the queue + messageQueue.add(message); + + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java new file mode 100644 index 0000000..326c5b8 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageQueue.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.initializer; + +import org.apache.stratos.messaging.domain.Message; + +import java.util.concurrent.LinkedBlockingQueue; + +public class InitializerEventMessageQueue extends LinkedBlockingQueue<Message> { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java new file mode 100644 index 0000000..90d358c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.initializer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.util.MessagingUtil; + +import java.util.concurrent.ExecutorService; + +public class InitializerEventReceiver { + private static final Log log = LogFactory.getLog(InitializerEventReceiver.class); + + private InitializerEventMessageDelegator messageDelegator; + private InitializerEventMessageListener messageListener; + private EventSubscriber eventSubscriber; + private ExecutorService executorService; + + public InitializerEventReceiver() { + InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue(); + this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue); + this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue); + } + + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } + + public void execute() { + try { + // Start topic subscriber thread + eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(), + messageListener); + executorService.execute(eventSubscriber); + + if (log.isDebugEnabled()) { + log.debug("Initializer event message delegator thread started"); + } + // Start initializer event message delegator thread + executorService.execute(messageDelegator); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Initializer receiver failed", e); + } + } + } + + public void terminate() { + eventSubscriber.terminate(); + messageDelegator.terminate(); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index 1d0529a..988a2ce 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.tenant; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; +import org.apache.stratos.messaging.event.initializer.CompleteTenantRequestEvent; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.util.MessagingUtil; @@ -68,7 +71,7 @@ public class TenantEventReceiver { log.debug("Tenant event message delegator thread started"); } - + initializeCompleteTenant(); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Tenant receiver failed", e); @@ -76,6 +79,25 @@ public class TenantEventReceiver { } } + public void initializeCompleteTenant() { + executorService.execute(new Runnable() { + @Override + public void run() { + while (!eventSubscriber.isSubscribed()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + + CompleteTenantRequestEvent completeTenantRequestEvent = new CompleteTenantRequestEvent(); + String topic = MessagingUtil.getMessageTopicName(completeTenantRequestEvent); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(completeTenantRequestEvent); + } + }); + } + public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index 58f2c36..b841d0a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -21,7 +21,10 @@ package org.apache.stratos.messaging.message.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; +import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.util.MessagingUtil; @@ -50,15 +53,12 @@ public class TopologyEventReceiver { messageDelegator.addEventListener(eventListener); } - public void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener); - // subscriber.setMessageListener(messageListener); executorService.execute(eventSubscriber); - if (log.isDebugEnabled()) { log.debug("Topology event message receiver thread started"); } @@ -69,7 +69,7 @@ public class TopologyEventReceiver { log.debug("Topology event message delegator thread started"); } - + initializeCompleteTopology(); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Topology receiver failed", e); @@ -82,6 +82,25 @@ public class TopologyEventReceiver { messageDelegator.terminate(); } + public void initializeCompleteTopology() { + executorService.execute(new Runnable() { + @Override + public void run() { + while (!eventSubscriber.isSubscribed()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + + CompleteTopologyRequestEvent completeTopologyRequestEvent = new CompleteTopologyRequestEvent(); + String topic = MessagingUtil.getMessageTopicName(completeTopologyRequestEvent); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(completeTopologyRequestEvent); + } + }); + } + public ExecutorService getExecutorService() { return executorService; } http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java index c4bfb9d..b3efbfb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/MessagingUtil.java @@ -63,6 +63,7 @@ public class MessagingUtil { */ public static enum Topics { TOPOLOGY_TOPIC("topology/#"), + INITIALIZER_TOPIC("initializer/#"), HEALTH_STAT_TOPIC("summarized-health-stats/#"), INSTANCE_STATUS_TOPIC("instance/status/#"), INSTANCE_NOTIFIER_TOPIC("instance/notifier/#"), @@ -248,4 +249,4 @@ public class MessagingUtil { return UUID.randomUUID().toString().replace(HYPHEN_MINUS, EMPTY_SPACE).substring(START_INDEX, len); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 54a7421..9c1159c 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -18,18 +18,15 @@ import threading -from subscriber import EventSubscriber import publisher -from modules.event.instance.notifier.events import * -from modules.event.tenant.events import * -from modules.event.topology.events import * +from logpublisher import * from modules.event.application.signup.events import * from modules.event.domain.mapping.events import * -from entity import * -from logpublisher import * -from config import Config from modules.event.eventhandler import EventHandler -import constants +from modules.event.instance.notifier.events import * +from modules.event.tenant.events import * +from modules.event.topology.events import * +from subscriber import EventSubscriber class CartridgeAgent(threading.Thread): @@ -71,6 +68,9 @@ class CartridgeAgent(threading.Thread): else: self.__event_handler.create_dummy_interface() + # request complete topology event from CC by publishing CompleteTopologyRequestEvent + publisher.publish_complete_topology_request_event() + # wait until complete topology message is received to get LB IP self.wait_for_complete_topology() @@ -88,6 +88,9 @@ class CartridgeAgent(threading.Thread): # start application signup event listener self.register_application_signup_event_listeners() + # request complete tenant event from CC by publishing CompleteTenantRequestEvent + publisher.publish_complete_tenant_request_event() + # Execute instance started shell script self.__event_handler.on_instance_started_event() http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py index 301cd47..8c7a7b0 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py @@ -82,6 +82,7 @@ TENANT_REPO_PATH = "tenant.repository.path" INSTANCE_NOTIFIER_TOPIC = "instance/#" HEALTH_STAT_TOPIC = "health/#" TOPOLOGY_TOPIC = "topology/#" +INITIALIZER_TOPIC = "initializer/" TENANT_TOPIC = "tenant/#" INSTANCE_STATUS_TOPIC = "instance/status/" APPLICATION_SIGNUP = "application/signup/#" @@ -98,6 +99,8 @@ INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent" INSTANCE_CLEANUP_CLUSTER_EVENT = "InstanceCleanupClusterEvent" INSTANCE_CLEANUP_MEMBER_EVENT = "InstanceCleanupMemberEvent" COMPLETE_TOPOLOGY_EVENT = "CompleteTopologyEvent" +COMPLETE_TOPOLOGY_REQUEST_EVENT = "CompleteTopologyRequestEvent" +COMPLETE_TENANT_REQUEST_EVENT = "CompleteTenantRequestEvent" COMPLETE_TENANT_EVENT = "CompleteTenantEvent" DOMAIN_MAPPING_ADDED_EVENT = "DomainMappingAddedEvent" DOMAIN_MAPPING_REMOVED_EVENT = "DomainMappingRemovedEvent" http://git-wip-us.apache.org/repos/asf/stratos/blob/60b80114/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py index db35987..f14e4af 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/status/events.py @@ -108,6 +108,22 @@ class InstanceReadyToShutdownEvent: return to_json(self) +class CompleteTopologyRequestEvent: + def __init__(self): + pass + + def to_json(self): + return to_json(self) + + +class CompleteTenantRequestEvent: + def __init__(self): + pass + + def to_json(self): + return to_json(self) + + def to_json(instance): """ common function to serialize status event object @@ -115,4 +131,4 @@ def to_json(instance): :return: serialized json string :rtype str """ - return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4) \ No newline at end of file + return json.dumps(instance, default=lambda o: o.__dict__, sort_keys=True, indent=4)
