Repository: incubator-stratos Updated Branches: refs/heads/master 270e3d1e4 -> e50c61e11
Introduced an event publisher pool to avoid a large number of connections being created to the message broker Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f2d136 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f2d136 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f2d136 Branch: refs/heads/master Commit: f3f2d136dcb26fe65568655b435fc27538348bc2 Parents: c59ac28 Author: Imesh Gunaratne <[email protected]> Authored: Fri Apr 18 22:43:21 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Apr 18 22:43:21 2014 +0530 ---------------------------------------------------------------------- .../apache/stratos/cartridge/agent/Main.java | 20 +++++- .../publisher/CartridgeAgentEventPublisher.java | 10 +-- .../internal/CloudControllerDSComponent.java | 35 ++++++---- .../runtime/FasterLookUpDataHolder.java | 20 ------ .../topology/TopologyEventPublisher.java | 3 +- .../internal/ADCManagementServerComponent.java | 6 +- .../stratos/manager/internal/DataHolder.java | 12 ---- .../InstanceNotificationPublisher.java | 7 +- .../manager/publisher/TenantEventPublisher.java | 7 +- .../publisher/TenantSynzhronizerTask.java | 3 +- .../utils/CartridgeSubscriptionUtils.java | 5 +- .../broker/publish/EventPublisher.java | 2 +- .../broker/publish/EventPublisherPool.java | 68 ++++++++++++++++++++ .../broker/publish/TopicPublisher.java | 16 ++--- 14 files changed, 143 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java index 3bf73e7..a1be237 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java @@ -19,13 +19,15 @@ package org.apache.stratos.cartridge.agent; -import java.lang.reflect.Constructor; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.PropertyConfigurator; import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.util.Constants; + +import java.lang.reflect.Constructor; /** * Cartridge agent main class. @@ -37,6 +39,20 @@ public class Main { public static void main(String[] args) { try { + // Add shutdown hook + final Thread mainThread = Thread.currentThread(); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + try { + // Close event publisher connections to message broker + EventPublisherPool.close(Constants.INSTANCE_STATUS_TOPIC); + mainThread.join(); + } catch (Exception e) { + log.error(e); + } + } + }); + // Configure log4j properties if(log.isDebugEnabled()) { log.debug("Configuring log4j.properties file path"); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java index 367f61d..9c2e21f 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java @@ -27,6 +27,7 @@ import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration; import org.apache.stratos.cartridge.agent.statistics.publisher.HealthStatisticsNotifier; import org.apache.stratos.cartridge.agent.util.ExtensionUtils; import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; @@ -55,7 +56,7 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); eventPublisher.publish(event); setStarted(true); if (log.isInfoEnabled()) { @@ -82,7 +83,8 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC); + // Event publisher connection will + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); 
eventPublisher.publish(event); if (log.isInfoEnabled()) { log.info("Instance activated event published"); @@ -118,7 +120,7 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); eventPublisher.publish(event); setReadyToShutdown(true); if (log.isInfoEnabled()) { @@ -143,7 +145,7 @@ public class CartridgeAgentEventPublisher { CartridgeAgentConfiguration.getInstance().getPartitionId(), CartridgeAgentConfiguration.getInstance().getMemberId()); - EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC); eventPublisher.publish(event); setMaintenance(true); if (log.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index 9cb2869..77e3ac4 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -32,19 +32,16 @@ import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE 
import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; -import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.util.Constants; import org.osgi.framework.BundleContext; import org.osgi.service.component.ComponentContext; -import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.core.exceptions.RegistryException; import org.wso2.carbon.registry.core.service.RegistryService; import org.wso2.carbon.registry.core.session.UserRegistry; import org.wso2.carbon.utils.ConfigurationContextService; -import org.wso2.carbon.utils.multitenancy.MultitenantConstants; - -import java.util.List; /** * Registering Cloud Controller Service. @@ -57,7 +54,24 @@ import java.util.List; * interface= * "org.wso2.carbon.registry.core.service.RegistryService" * cardinality="1..1" policy="dynamic" bind="setRegistryService" - * unbind="unsetRegistryService" + * unbind="unsetR/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */egistryService" * @scr.reference name="config.context.service" * interface="org.wso2.carbon.utils.ConfigurationContextService" * cardinality="1..1" policy="dynamic" @@ -87,7 +101,7 @@ public class CloudControllerDSComponent { Thread tdelegator = new Thread(delegator); tdelegator.start(); - // Register cloud controller service + // Register cloud controller service E BundleContext bundleContext = context.getBundleContext(); bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null); @@ -151,11 +165,8 @@ public class CloudControllerDSComponent { } protected void deactivate(ComponentContext ctx) { - - List<EventPublisher> publishers = dataHolder.getAllEventPublishers(); - for (EventPublisher topicPublisher : publishers) { - topicPublisher.close(); - } + // Close event publisher connections to message broker + EventPublisherPool.close(Constants.TOPOLOGY_TOPIC); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java index 970e2c0..9b05b5d 100644 --- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java @@ -22,12 +22,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.pojo.*; import org.apache.stratos.cloud.controller.registry.RegistryManager; -import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -88,12 +86,6 @@ public class FasterLookUpDataHolder implements Serializable{ private transient DataPublisherConfig dataPubConfig; private boolean enableTopologySync; private transient TopologyConfig topologyConfig; - - /** - * Key - name of the topic - * Value - corresponding EventPublisher - */ - private transient Map<String, EventPublisher> topicToPublisherMap = new HashMap<String, EventPublisher>(); private transient AsyncDataPublisher dataPublisher; private String streamId; @@ -244,18 +236,6 @@ public class FasterLookUpDataHolder implements Serializable{ public void setTopologyConfig(TopologyConfig topologyConfig) { this.topologyConfig = topologyConfig; } - - public EventPublisher getEventPublisher(String topic){ - return topicToPublisherMap.get(topic); - } - - public List<EventPublisher> getAllEventPublishers() { - return new ArrayList<EventPublisher>(topicToPublisherMap.values()); - } - - public void addEventPublisher(EventPublisher publisher, String topicName) { - topicToPublisherMap.put(topicName, publisher); - } public DataPublisherConfig getDataPubConfig() { return dataPubConfig; 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index c039af7..86237d8 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -23,6 +23,7 @@ import org.apache.stratos.cloud.controller.pojo.Cartridge; import org.apache.stratos.cloud.controller.pojo.ClusterContext; import org.apache.stratos.cloud.controller.pojo.PortMapping; import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Port; import org.apache.stratos.messaging.domain.topology.ServiceType; @@ -162,7 +163,7 @@ public class TopologyEventPublisher { } public static void publishEvent(Event event) { - EventPublisher eventPublisher = new EventPublisher(Constants.TOPOLOGY_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC); eventPublisher.publish(event); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java index c843fdb..6857bd6 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java @@ -26,7 +26,7 @@ import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver; import org.apache.stratos.manager.utils.CartridgeConfigFileReader; -import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; import org.apache.stratos.messaging.util.Constants; import org.osgi.service.component.ComponentContext; @@ -65,7 +65,6 @@ public class ADCManagementServerComponent { protected void activate(ComponentContext componentContext) throws Exception { try { CartridgeConfigFileReader.readProperties(); - DataHolder.setEventPublisher(new EventPublisher(Constants.INSTANCE_NOTIFIER_TOPIC)); // Schedule complete tenant event synchronizer if(log.isDebugEnabled()) { @@ -172,6 +171,9 @@ public class ADCManagementServerComponent { } protected void deactivate(ComponentContext context) { + // Close event publisher connections to message broker + EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC); + EventPublisherPool.close(Constants.TENANT_TOPIC); //terminate Stratos Manager Topology Receiver stratosManagerTopologyEventReceiver.terminate(); 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java index 496a54c..07f21de 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java @@ -20,7 +20,6 @@ package org.apache.stratos.manager.internal; import org.apache.axis2.context.ConfigurationContext; -import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.wso2.carbon.registry.core.service.RegistryService; import org.wso2.carbon.user.core.service.RealmService; import org.wso2.carbon.utils.CarbonUtils; @@ -34,8 +33,6 @@ public class DataHolder { private static RealmService realmService; private static RegistryService registryService; - //private static TopologyManagementService topologyMgtService; - private static EventPublisher eventPublisher; public static RealmService getRealmService() { return realmService; @@ -70,13 +67,4 @@ public class DataHolder { public static void setRegistryService(RegistryService registryService) { DataHolder.registryService = registryService; } - - public static EventPublisher getEventPublisher() { - return eventPublisher; - } - - public static void setEventPublisher(EventPublisher eventPublisher) { - DataHolder.eventPublisher = eventPublisher; - } - } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java index f8aea72..e10d4ff 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java @@ -20,13 +20,14 @@ package org.apache.stratos.manager.publisher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.manager.internal.DataHolder; import org.apache.stratos.manager.repository.Repository; import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; -import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; +import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; +import org.apache.stratos.messaging.util.Constants; /** * Creating the relevant instance notification event and publish it to the instances. 
@@ -38,7 +39,7 @@ public class InstanceNotificationPublisher { } private void publish(Event event) { - EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher(); + EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC); depsyncEventPublisher.publish(event); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java index 2df631a..8213ed9 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java @@ -25,6 +25,7 @@ import org.apache.stratos.common.beans.TenantInfoBean; import org.apache.stratos.common.exception.StratosException; import org.apache.stratos.common.listeners.TenantMgtListener; import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent; import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent; @@ -49,7 +50,7 @@ public class TenantEventPublisher implements TenantMgtListener { } Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain()); TenantCreatedEvent event = new TenantCreatedEvent(tenant); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); } catch (Exception e) { @@ -64,7 +65,7 @@ public class TenantEventPublisher implements TenantMgtListener { log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain())); } TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain()); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); } catch (Exception e) { @@ -79,7 +80,7 @@ public class TenantEventPublisher implements TenantMgtListener { log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId)); } TenantRemovedEvent event = new TenantRemovedEvent(tenantId); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java index af5cd5f..3eac3f5 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java @@ -25,6 +25,7 @@ import org.apache.stratos.manager.internal.DataHolder; import 
org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent; import org.apache.stratos.messaging.util.Constants; @@ -81,7 +82,7 @@ public class TenantSynzhronizerTask implements Task { tenants.add(tenant); } CompleteTenantEvent event = new CompleteTenantEvent(tenants); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java index cd50fd8..a5c5517 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java @@ -39,6 +39,7 @@ import org.apache.stratos.manager.repository.Repository; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.subscriber.Subscriber; import org.apache.stratos.messaging.broker.publish.EventPublisher; 
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent; import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent; import org.apache.stratos.messaging.util.Constants; @@ -166,7 +167,7 @@ public class CartridgeSubscriptionUtils { log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); } TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(subscribedEvent); } catch (Exception e) { if (log.isErrorEnabled()) { @@ -196,7 +197,7 @@ public class CartridgeSubscriptionUtils { log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName)); } TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName); - EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); eventPublisher.publish(event); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java index 1e11142..5d39956 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java @@ -34,7 +34,7 @@ public class EventPublisher extends TopicPublisher { /** * @param topicName topic name of this publisher instance. */ - public EventPublisher(String topicName) { + EventPublisher(String topicName) { super(topicName); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java new file mode 100644 index 0000000..175d09b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.stratos.messaging.broker.publish; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Event publisher instance pool. + */ +public class EventPublisherPool { + private static final Log log = LogFactory.getLog(EventPublisherPool.class); + private static Map<String, EventPublisher> topicNameEventPublisherMap = new HashMap<String, EventPublisher>(); + + public static EventPublisher getPublisher(String topicName) { + synchronized (EventPublisherPool.class) { + if(topicNameEventPublisherMap.containsKey(topicName)) { + if(log.isDebugEnabled()) { + log.debug(String.format("Event publisher fetched from pool: [topic] %s", topicName)); + } + return topicNameEventPublisherMap.get(topicName); + } + EventPublisher eventPublisher = new EventPublisher(topicName); + topicNameEventPublisherMap.put(topicName, eventPublisher); + if(log.isDebugEnabled()) { + log.debug(String.format("Event publisher instance created: [topic] %s", topicName)); + } + return eventPublisher; + } + } + + public static void close(String topicName) { + synchronized (EventPublisherPool.class) { + if(topicNameEventPublisherMap.containsKey(topicName)) { + topicNameEventPublisherMap.get(topicName).close(); + topicNameEventPublisherMap.remove(topicName); + if(log.isDebugEnabled()) { + log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName)); + } + } + else { + if(log.isWarnEnabled()) { + log.warn(String.format("Event publisher instance not found in pool: [topic] %s", topicName)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index 6614e75..004be13 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -19,18 +19,18 @@ package org.apache.stratos.messaging.broker.publish; -import java.util.Enumeration; -import java.util.Properties; - -import javax.jms.*; - +import com.google.gson.Gson; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.TopicConnector; import org.apache.stratos.messaging.publish.MessagePublisher; -import com.google.gson.Gson; -import org.apache.stratos.messaging.util.Constants; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSession; +import java.util.Enumeration; +import java.util.Properties; /** * Any instance who needs to publish data to a topic, should communicate with @@ -53,7 +53,7 @@ public class TopicPublisher extends MessagePublisher { * @param aTopicName * topic name of this publisher instance. */ - public TopicPublisher(String aTopicName) { + TopicPublisher(String aTopicName) { super(aTopicName); connector = new TopicConnector(); }
