Updated Branches: refs/heads/master 07f805074 -> 19d950035
refactoring ArtifactUpdatePublisher and adding instance clean up notification event publishing Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/19d95003 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/19d95003 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/19d95003 Branch: refs/heads/master Commit: 19d950035fb4ed5c462498d2cdbb80d58bd53cde Parents: 07f8050 Author: rekathiru <[email protected]> Authored: Thu Jan 9 21:46:35 2014 +0530 Committer: rekathiru <[email protected]> Committed: Thu Jan 9 21:46:35 2014 +0530 ---------------------------------------------------------------------- .../listener/InstanceStatusListener.java | 8 +- .../manager/CartridgeSubscriptionManager.java | 2 +- .../publisher/ArtifactUpdatePublisher.java | 65 ------- .../InstanceNotificationPublisher.java | 76 ++++++++ .../InstanceCleanupNotificationService.java | 37 ++++ .../service/RepoNotificationService.java | 179 ++++++++++--------- .../SubscriptionMultiTenantBehaviour.java | 6 +- 7 files changed, 211 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java index 78338c1..b3483ab 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java @@ -20,7 +20,7 @@ package org.apache.stratos.manager.listener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.manager.publisher.ArtifactUpdatePublisher; +import org.apache.stratos.manager.publisher.InstanceNotificationPublisher; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; @@ -62,7 +62,7 @@ public class InstanceStatusListener implements MessageListener { /*CartridgeSubscriptionInfo subscription = PersistenceManager.getSubscriptionFromClusterId(clusterId); if (subscription.getRepository() != null) { - ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(subscription.getRepository(), clusterId, String.valueOf(subscription.getTenantId())); + InstanceNotificationPublisher publisher = new InstanceNotificationPublisher(subscription.getRepository(), clusterId, String.valueOf(subscription.getTenantId())); publisher.publish(); } else { @@ -78,9 +78,9 @@ public class InstanceStatusListener implements MessageListener { // send an ArtifactUpdatedEvent event. If this is a multitenant cartridge, sending this event // will be done in SubscriptionMultiTenantBehaviour#createSubscription method if (!cartridgeSubscription.getCartridgeInfo().getMultiTenant() && cartridgeSubscription.getRepository() != null) { - ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), clusterId, + InstanceNotificationPublisher publisher = new InstanceNotificationPublisher(); + publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(), clusterId, String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); - publisher.publish(); } else { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java index a8aa6fc..fbcb76b 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java @@ -218,7 +218,7 @@ public class CartridgeSubscriptionManager { log.info(" Multitenant --> Publishing Artifact update event -- "); log.info(" Values : cluster id - " + cartridgeSubscription.getClusterDomain() + " tenant - " + cartridgeSubscription.getSubscriber().getTenantId()); - ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), + InstanceNotificationPublisher publisher = new InstanceNotificationPublisher(cartridgeSubscription.getRepository(), cartridgeSubscription.getClusterDomain(), // clusterId String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); publisher.publish(); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java deleted file mode 100644 index 6cb2022..0000000 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/ArtifactUpdatePublisher.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.manager.publisher; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.manager.internal.DataHolder; -import org.apache.stratos.manager.repository.Repository; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; - -public class ArtifactUpdatePublisher { - - private static final Log log = LogFactory.getLog(ArtifactUpdatePublisher.class); - - private Repository repository; - private String clusterId; - private String tenantId; - - public ArtifactUpdatePublisher(Repository repository, String clusterId, String tenantId) { - this.repository = repository; - this.clusterId = clusterId; - this.tenantId = tenantId; - - } - - public void publish() { - EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher(); - log.info("publishing ** "); - depsyncEventPublisher.publish(createArtifactUpdateEvent()); - } - - private ArtifactUpdatedEvent createArtifactUpdateEvent() { - ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent(); - artifactUpdateEvent.setClusterId(clusterId); - artifactUpdateEvent.setRepoUserName(repository.getUserName()); - artifactUpdateEvent.setRepoPassword(repository.getPassword()); - artifactUpdateEvent.setRepoURL(repository.getUrl()); - artifactUpdateEvent.setTenantId(tenantId); - - log.info("Creating artifact updated event "); - log.info("cluster Id : " + artifactUpdateEvent.getClusterId()); - log.info("repo url : " + artifactUpdateEvent.getRepoURL()); - log.info("repo username : " + artifactUpdateEvent.getRepoUserName()); - log.info("repo pwd : " + artifactUpdateEvent.getRepoPassword()); - log.info("tenant Id : " + artifactUpdateEvent.getTenantId()); - return artifactUpdateEvent; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java new file mode 100644 index 0000000..662bb0d --- /dev/null +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.manager.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.manager.internal.DataHolder; +import org.apache.stratos.manager.repository.Repository; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupEvent; + + +/** + * Creating the relevant instance notification event and publish it to the instances. + */ +public class InstanceNotificationPublisher { + private static final Log log = LogFactory.getLog(InstanceNotificationPublisher.class); + + public InstanceNotificationPublisher() { + } + + private void publish(Event event) { + EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher(); + log.info("publishing ** "); + depsyncEventPublisher.publish(event); + } + + /** + * Publishing the artifact update event to the instances + * + * @param repository + * @param clusterId + * @param tenantId + */ + public void sendArtifactUpdateEvent(Repository repository, String clusterId, String tenantId) { + ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent(); + artifactUpdateEvent.setClusterId(clusterId); + artifactUpdateEvent.setRepoUserName(repository.getUserName()); + artifactUpdateEvent.setRepoPassword(repository.getPassword()); + artifactUpdateEvent.setRepoURL(repository.getUrl()); + artifactUpdateEvent.setTenantId(tenantId); + + log.info(String.format("Publishing artifact updated event: [cluster] %s " + + "[repo-URL] %s [repo-username] %s [repo-password] %s [tenant-id] %s", + clusterId, repository.getUrl(), repository.getUserName(), repository.getPassword(), tenantId)); + publish(artifactUpdateEvent); + } + + /** + * Publishing the instance termination notification to the instances + * + * @param memberId + */ + public void sendInstanceCleanupEvent(String memberId) { + log.info(String.format("Publishing Instance Cleanup Event: [member] %s", memberId)); + publish(new InstanceCleanupEvent(memberId)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/InstanceCleanupNotificationService.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/InstanceCleanupNotificationService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/InstanceCleanupNotificationService.java new file mode 100644 index 0000000..bf6ecfd --- /dev/null +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/InstanceCleanupNotificationService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.manager.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.manager.publisher.InstanceNotificationPublisher; +import org.wso2.carbon.core.AbstractAdmin; + +/** + * This service will get invoked by Autoscaler when it trying to scale down an instance + * in order for the instance to perform clean up task before the actual termination. + */ +public class InstanceCleanupNotificationService extends AbstractAdmin { + private static final Log log = LogFactory.getLog(InstanceCleanupNotificationService.class); + + public void sendInstanceCleanupNotificationOnTermination(String memberId) { + //sending the notification event to the instance + new InstanceNotificationPublisher().sendInstanceCleanupEvent(memberId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/RepoNotificationService.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/RepoNotificationService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/RepoNotificationService.java index 28a627c..efe36bb 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/RepoNotificationService.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/service/RepoNotificationService.java @@ -19,9 +19,6 @@ package org.apache.stratos.manager.service; -import java.util.List; -import java.util.UUID; - import org.apache.axis2.clustering.ClusteringAgent; import org.apache.axis2.clustering.ClusteringFault; import org.apache.axis2.clustering.management.GroupManagementAgent; @@ -29,99 +26,103 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.manager.dao.CartridgeSubscriptionInfo; import org.apache.stratos.manager.internal.DataHolder; -import org.apache.stratos.manager.publisher.ArtifactUpdatePublisher; +import org.apache.stratos.manager.publisher.InstanceNotificationPublisher; import org.apache.stratos.manager.utils.CartridgeConstants; import org.apache.stratos.manager.utils.PersistenceManager; import org.wso2.carbon.core.deployment.SynchronizeGitRepositoryRequest; +import java.util.List; +import java.util.UUID; + public class RepoNotificationService { - private static final Log log = LogFactory.getLog(RepoNotificationService.class); - - - public void notifyRepoUpdate(String tenantDomain, String cartridgeAlias) throws Exception { - // FIXME Throwing generic Exception is wrong - log.info("Updating repository of tenant : " + tenantDomain + " , cartridge: " + - cartridgeAlias); - - CartridgeSubscriptionInfo subscription = null; - try { - subscription = PersistenceManager.getSubscription(tenantDomain, cartridgeAlias); - } catch (Exception e) { - String msg = "Failed to find subscription for " + cartridgeAlias + ". " - + (e.getMessage() != null ? e.getMessage() : ""); - log.error(msg, e); - throw new Exception(msg, e); - } - - if (subscription == null) { - String msg = "Tenant " + tenantDomain + " is not subscribed for " + cartridgeAlias; - log.error(msg); - throw new Exception("You have not subscribed for " + cartridgeAlias); - } - - try { - handleRepoSynch(subscription); - } catch (Exception e) { - String msg = "Failed to synchronize the repository for " + cartridgeAlias + ". " - + (e.getMessage() != null ? e.getMessage() : ""); - log.error(msg, e); - throw new Exception(msg, e); - } - - } - - public void synchronize(String repositoryURL) throws Exception { - - log.info(" repository URL received : " + repositoryURL); - List<CartridgeSubscriptionInfo> subscription = PersistenceManager.getSubscription(repositoryURL); - for (CartridgeSubscriptionInfo cartridgeSubscriptionInfo : subscription) { - handleRepoSynch(cartridgeSubscriptionInfo); + private static final Log log = LogFactory.getLog(RepoNotificationService.class); + + + public void notifyRepoUpdate(String tenantDomain, String cartridgeAlias) throws Exception { + // FIXME Throwing generic Exception is wrong + log.info("Updating repository of tenant : " + tenantDomain + " , cartridge: " + + cartridgeAlias); + + CartridgeSubscriptionInfo subscription = null; + try { + subscription = PersistenceManager.getSubscription(tenantDomain, cartridgeAlias); + } catch (Exception e) { + String msg = "Failed to find subscription for " + cartridgeAlias + ". " + + (e.getMessage() != null ? e.getMessage() : ""); + log.error(msg, e); + throw new Exception(msg, e); + } + + if (subscription == null) { + String msg = "Tenant " + tenantDomain + " is not subscribed for " + cartridgeAlias; + log.error(msg); + throw new Exception("You have not subscribed for " + cartridgeAlias); + } + + try { + handleRepoSynch(subscription); + } catch (Exception e) { + String msg = "Failed to synchronize the repository for " + cartridgeAlias + ". " + + (e.getMessage() != null ? e.getMessage() : ""); + log.error(msg, e); + throw new Exception(msg, e); + } + + } + + public void synchronize(String repositoryURL) throws Exception { + + log.info(" repository URL received : " + repositoryURL); + List<CartridgeSubscriptionInfo> subscription = PersistenceManager.getSubscription(repositoryURL); + for (CartridgeSubscriptionInfo cartridgeSubscriptionInfo : subscription) { + handleRepoSynch(cartridgeSubscriptionInfo); + } + } + + private void handleRepoSynch(CartridgeSubscriptionInfo subscription) throws Exception { + if (subscription == null) { + throw new Exception("Cannot synchronize repository. subscription is null"); } - } - - private void handleRepoSynch(CartridgeSubscriptionInfo subscription) throws Exception { - if (subscription == null) { - throw new Exception("Cannot synchronize repository. subscription is null"); - } - - if (CartridgeConstants.PROVIDER_NAME_WSO2.equals(subscription.getProvider())) { - createAndSendClusterMessage(subscription.getTenantId(), subscription.getTenantDomain(), - UUID.randomUUID(), subscription.getClusterDomain(), - subscription.getClusterSubdomain()); - - } else { - new ArtifactUpdatePublisher(subscription.getRepository(), - subscription.getClusterDomain(), - String.valueOf(subscription.getTenantId())).publish();; - } - } - - private void createAndSendClusterMessage(int tenantId, String tenantDomain, UUID uuid, - String clusterDomain, String clusterSubdomain) { - - SynchronizeGitRepositoryRequest request = - new SynchronizeGitRepositoryRequest(tenantId, - tenantDomain, - uuid); - - ClusteringAgent clusteringAgent = - DataHolder.getServerConfigContext() - .getAxisConfiguration().getClusteringAgent(); - GroupManagementAgent groupMgtAgent = - clusteringAgent.getGroupManagementAgent(clusterDomain, - clusterSubdomain); - - try { - log.info("Sending Request to.. " + clusterDomain + " : " + clusterSubdomain); - groupMgtAgent.send(request); - - } catch (ClusteringFault e) { - e.printStackTrace(); - } - - - } + + if (CartridgeConstants.PROVIDER_NAME_WSO2.equals(subscription.getProvider())) { + createAndSendClusterMessage(subscription.getTenantId(), subscription.getTenantDomain(), + UUID.randomUUID(), subscription.getClusterDomain(), + subscription.getClusterSubdomain()); + + } else { + InstanceNotificationPublisher notificationHandler = new InstanceNotificationPublisher(); + notificationHandler.sendArtifactUpdateEvent(subscription.getRepository(), + subscription.getClusterDomain(), + String.valueOf(subscription.getTenantId())); + } + } + + private void createAndSendClusterMessage(int tenantId, String tenantDomain, UUID uuid, + String clusterDomain, String clusterSubdomain) { + + SynchronizeGitRepositoryRequest request = + new SynchronizeGitRepositoryRequest(tenantId, + tenantDomain, + uuid); + + ClusteringAgent clusteringAgent = + DataHolder.getServerConfigContext() + .getAxisConfiguration().getClusteringAgent(); + GroupManagementAgent groupMgtAgent = + clusteringAgent.getGroupManagementAgent(clusterDomain, + clusterSubdomain); + + try { + log.info("Sending Request to.. " + clusterDomain + " : " + clusterSubdomain); + groupMgtAgent.send(request); + + } catch (ClusteringFault e) { + e.printStackTrace(); + } + + + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/19d95003/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java index 00caf16..b28ff36 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/tenancy/SubscriptionMultiTenantBehaviour.java @@ -26,7 +26,7 @@ import org.apache.stratos.manager.exception.ADCException; import org.apache.stratos.manager.exception.AlreadySubscribedException; import org.apache.stratos.manager.exception.NotSubscribedException; import org.apache.stratos.manager.exception.UnregisteredCartridgeException; -import org.apache.stratos.manager.publisher.ArtifactUpdatePublisher; +import org.apache.stratos.manager.publisher.InstanceNotificationPublisher; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.manager.utils.CartridgeConstants; @@ -75,10 +75,10 @@ public class SubscriptionMultiTenantBehaviour extends SubscriptionTenancyBehavio log.info(" Multitenant --> Publishing Artifact update event -- "); log.info(" Values : cluster id - " + cartridgeSubscription.getClusterDomain() + " tenant - " + cartridgeSubscription.getSubscriber().getTenantId()); - ArtifactUpdatePublisher publisher = new ArtifactUpdatePublisher(cartridgeSubscription.getRepository(), + InstanceNotificationPublisher publisher = new InstanceNotificationPublisher(); + publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(), cartridgeSubscription.getClusterDomain(), // clusterId String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); - publisher.publish(); } else { if(log.isDebugEnabled()) {
