Repository: incubator-stratos
Updated Branches:
  refs/heads/master 270e3d1e4 -> e50c61e11


Introduced an event publisher pool to avoid large number of connections  being 
created to message broker


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/f3f2d136
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/f3f2d136
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/f3f2d136

Branch: refs/heads/master
Commit: f3f2d136dcb26fe65568655b435fc27538348bc2
Parents: c59ac28
Author: Imesh Gunaratne <[email protected]>
Authored: Fri Apr 18 22:43:21 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Fri Apr 18 22:43:21 2014 +0530

----------------------------------------------------------------------
 .../apache/stratos/cartridge/agent/Main.java    | 20 +++++-
 .../publisher/CartridgeAgentEventPublisher.java | 10 +--
 .../internal/CloudControllerDSComponent.java    | 35 ++++++----
 .../runtime/FasterLookUpDataHolder.java         | 20 ------
 .../topology/TopologyEventPublisher.java        |  3 +-
 .../internal/ADCManagementServerComponent.java  |  6 +-
 .../stratos/manager/internal/DataHolder.java    | 12 ----
 .../InstanceNotificationPublisher.java          |  7 +-
 .../manager/publisher/TenantEventPublisher.java |  7 +-
 .../publisher/TenantSynzhronizerTask.java       |  3 +-
 .../utils/CartridgeSubscriptionUtils.java       |  5 +-
 .../broker/publish/EventPublisher.java          |  2 +-
 .../broker/publish/EventPublisherPool.java      | 68 ++++++++++++++++++++
 .../broker/publish/TopicPublisher.java          | 16 ++---
 14 files changed, 143 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 3bf73e7..a1be237 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -19,13 +19,15 @@
 
 package org.apache.stratos.cartridge.agent;
 
-import java.lang.reflect.Constructor;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.lang.reflect.Constructor;
 
 /**
  * Cartridge agent main class.
@@ -37,6 +39,20 @@ public class Main {
 
     public static void main(String[] args) {
         try {
+            // Add shutdown hook
+            final Thread mainThread = Thread.currentThread();
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        // Close event publisher connections to message broker
+                        
EventPublisherPool.close(Constants.INSTANCE_STATUS_TOPIC);
+                        mainThread.join();
+                    } catch (Exception e) {
+                        log.error(e);
+                    }
+                }
+            });
+
             // Configure log4j properties
             if(log.isDebugEnabled()) {
                 log.debug("Configuring log4j.properties file path");

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index 367f61d..9c2e21f 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -27,6 +27,7 @@ import 
org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
 import 
org.apache.stratos.cartridge.agent.statistics.publisher.HealthStatisticsNotifier;
 import org.apache.stratos.cartridge.agent.util.ExtensionUtils;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
 import 
org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
 import 
org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
@@ -55,7 +56,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setStarted(true);
             if (log.isInfoEnabled()) {
@@ -82,7 +83,8 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            // Event publisher connection will
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             if (log.isInfoEnabled()) {
                 log.info("Instance activated event published");
@@ -118,7 +120,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setReadyToShutdown(true);
             if (log.isInfoEnabled()) {
@@ -143,7 +145,7 @@ public class CartridgeAgentEventPublisher {
                     CartridgeAgentConfiguration.getInstance().getPartitionId(),
                     CartridgeAgentConfiguration.getInstance().getMemberId());
 
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
             eventPublisher.publish(event);
             setMaintenance(true);
             if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 9cb2869..77e3ac4 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -32,19 +32,16 @@ import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE
 import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.context.PrivilegedCarbonContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.utils.ConfigurationContextService;
-import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
-
-import java.util.List;
 
 /**
  * Registering Cloud Controller Service.
@@ -57,7 +54,24 @@ import java.util.List;
  *                interface=
  *                "org.wso2.carbon.registry.core.service.RegistryService"
  *                cardinality="1..1" policy="dynamic" bind="setRegistryService"
- *                unbind="unsetRegistryService"   
+ *                unbind="unsetR/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */egistryService"
  * @scr.reference name="config.context.service"
  *                interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic"
@@ -87,7 +101,7 @@ public class CloudControllerDSComponent {
             Thread tdelegator = new Thread(delegator);
             tdelegator.start();
                
-               // Register cloud controller service
+               // Register cloud controller service                            
                       E
             BundleContext bundleContext = context.getBundleContext();
             
bundleContext.registerService(CloudControllerService.class.getName(), new 
CloudControllerServiceImpl(), null);
 
@@ -151,11 +165,8 @@ public class CloudControllerDSComponent {
     }
        
        protected void deactivate(ComponentContext ctx) {
-
-               List<EventPublisher> publishers = 
dataHolder.getAllEventPublishers();
-               for (EventPublisher topicPublisher : publishers) {
-                       topicPublisher.close();
-               }
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
        }
        
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
index 970e2c0..9b05b5d 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
@@ -22,12 +22,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.pojo.*;
 import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,12 +86,6 @@ public class FasterLookUpDataHolder implements Serializable{
        private transient DataPublisherConfig dataPubConfig;
        private boolean enableTopologySync;
        private transient TopologyConfig topologyConfig;
-       
-       /**
-     * Key - name of the topic
-     * Value - corresponding EventPublisher
-     */
-    private transient Map<String, EventPublisher> topicToPublisherMap = new 
HashMap<String, EventPublisher>();
 
        private transient AsyncDataPublisher dataPublisher;
        private String streamId;
@@ -244,18 +236,6 @@ public class FasterLookUpDataHolder implements 
Serializable{
        public void setTopologyConfig(TopologyConfig topologyConfig) {
                this.topologyConfig = topologyConfig;
        }
-       
-       public EventPublisher getEventPublisher(String topic){
-       return topicToPublisherMap.get(topic);
-    }
-       
-       public List<EventPublisher> getAllEventPublishers() {
-               return new 
ArrayList<EventPublisher>(topicToPublisherMap.values());
-       }
-       
-    public void addEventPublisher(EventPublisher publisher, String topicName) {
-        topicToPublisherMap.put(topicName, publisher);
-    }
 
     public DataPublisherConfig getDataPubConfig() {
         return dataPubConfig;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index c039af7..86237d8 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -23,6 +23,7 @@ import org.apache.stratos.cloud.controller.pojo.Cartridge;
 import org.apache.stratos.cloud.controller.pojo.ClusterContext;
 import org.apache.stratos.cloud.controller.pojo.PortMapping;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Port;
 import org.apache.stratos.messaging.domain.topology.ServiceType;
@@ -162,7 +163,7 @@ public class TopologyEventPublisher {
     }
 
     public static void publishEvent(Event event) {
-        EventPublisher eventPublisher = new 
EventPublisher(Constants.TOPOLOGY_TOPIC);
+        EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC);
         eventPublisher.publish(event);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index c843fdb..6857bd6 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -26,7 +26,7 @@ import 
org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import 
org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
 import org.apache.stratos.messaging.util.Constants;
 import org.osgi.service.component.ComponentContext;
@@ -65,7 +65,6 @@ public class ADCManagementServerComponent {
     protected void activate(ComponentContext componentContext) throws 
Exception {
                try {
                        CartridgeConfigFileReader.readProperties();
-                       DataHolder.setEventPublisher(new 
EventPublisher(Constants.INSTANCE_NOTIFIER_TOPIC));
 
             // Schedule complete tenant event synchronizer
             if(log.isDebugEnabled()) {
@@ -172,6 +171,9 @@ public class ADCManagementServerComponent {
     }
 
     protected void deactivate(ComponentContext context) {
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
+        EventPublisherPool.close(Constants.TENANT_TOPIC);
 
         //terminate Stratos Manager Topology Receiver
         stratosManagerTopologyEventReceiver.terminate();

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
index 496a54c..07f21de 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
@@ -20,7 +20,6 @@
 package org.apache.stratos.manager.internal;
 
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.user.core.service.RealmService;
 import org.wso2.carbon.utils.CarbonUtils;
@@ -34,8 +33,6 @@ public class DataHolder {
 
        private static RealmService realmService;
        private static RegistryService registryService;
-       //private static TopologyManagementService topologyMgtService;
-       private static EventPublisher eventPublisher;
 
        public static RealmService getRealmService() {
                return realmService;
@@ -70,13 +67,4 @@ public class DataHolder {
        public static void setRegistryService(RegistryService registryService) {
                DataHolder.registryService = registryService;
        }
-
-       public static EventPublisher getEventPublisher() {
-               return eventPublisher;
-       }
-
-       public static void setEventPublisher(EventPublisher eventPublisher) {
-               DataHolder.eventPublisher = eventPublisher;
-       }       
-       
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
index f8aea72..e10d4ff 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
@@ -20,13 +20,14 @@ package org.apache.stratos.manager.publisher;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.DataHolder;
 import org.apache.stratos.manager.repository.Repository;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.Event;
 import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
-import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
 import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
+import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.util.Constants;
 
 /**
  * Creating the relevant instance notification event and publish it to the 
instances.
@@ -38,7 +39,7 @@ public class InstanceNotificationPublisher {
     }
 
     private void publish(Event event) {
-        EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher();
+        EventPublisher depsyncEventPublisher = 
EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
         depsyncEventPublisher.publish(event);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
index 2df631a..8213ed9 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.beans.TenantInfoBean;
 import org.apache.stratos.common.exception.StratosException;
 import org.apache.stratos.common.listeners.TenantMgtListener;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
@@ -49,7 +50,7 @@ public class TenantEventPublisher implements 
TenantMgtListener {
                 }
                 Tenant tenant = new Tenant(tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain());
                 TenantCreatedEvent event = new TenantCreatedEvent(tenant);
-                EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {
@@ -64,7 +65,7 @@ public class TenantEventPublisher implements 
TenantMgtListener {
                     log.info(String.format("Publishing tenant updated event: 
[tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), 
tenantInfo.getTenantDomain()));
                 }
                 TenantUpdatedEvent event = new 
TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
-                EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {
@@ -79,7 +80,7 @@ public class TenantEventPublisher implements 
TenantMgtListener {
                     log.info(String.format("Publishing tenant removed event: 
[tenant-id] %d", tenantId));
                 }
                 TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
-                EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+                EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                 eventPublisher.publish(event);
             }
             catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index af5cd5f..3eac3f5 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -25,6 +25,7 @@ import org.apache.stratos.manager.internal.DataHolder;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import org.apache.stratos.manager.subscription.CartridgeSubscription;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
 import org.apache.stratos.messaging.util.Constants;
@@ -81,7 +82,7 @@ public class TenantSynzhronizerTask implements Task {
                 tenants.add(tenant);
             }
             CompleteTenantEvent event = new CompleteTenantEvent(tenants);
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
             eventPublisher.publish(event);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index cd50fd8..a5c5517 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -39,6 +39,7 @@ import org.apache.stratos.manager.repository.Repository;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
 import org.apache.stratos.manager.subscriber.Subscriber;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
 import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
 import org.apache.stratos.messaging.util.Constants;
@@ -166,7 +167,7 @@ public class CartridgeSubscriptionUtils {
                                        log.info(String.format("Publishing 
tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
                                }
                                TenantSubscribedEvent subscribedEvent = new 
TenantSubscribedEvent(tenantId, serviceName);
-                               EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+                               EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
                                eventPublisher.publish(subscribedEvent);
                        } catch (Exception e) {
                                if (log.isErrorEnabled()) {
@@ -196,7 +197,7 @@ public class CartridgeSubscriptionUtils {
                 log.info(String.format("Publishing tenant un-subscribed event: 
[tenant-id] %d [service] %s", tenantId, serviceName));
             }
             TenantUnSubscribedEvent event = new 
TenantUnSubscribedEvent(tenantId, serviceName);
-            EventPublisher eventPublisher = new 
EventPublisher(Constants.TENANT_TOPIC);
+            EventPublisher eventPublisher = 
EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
             eventPublisher.publish(event);
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 1e11142..5d39956 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -34,7 +34,7 @@ public class EventPublisher extends TopicPublisher {
     /**
      * @param topicName topic name of this publisher instance.
      */
-    public EventPublisher(String topicName) {
+    EventPublisher(String topicName) {
         super(topicName);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
new file mode 100644
index 0000000..175d09b
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.broker.publish;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Event publisher instance pool.
+ */
+public class EventPublisherPool {
+    private static final Log log = LogFactory.getLog(EventPublisherPool.class);
+    private static Map<String, EventPublisher> topicNameEventPublisherMap = 
new HashMap<String, EventPublisher>();
+
+    public static EventPublisher getPublisher(String topicName) {
+        synchronized (EventPublisherPool.class) {
+            if(topicNameEventPublisherMap.containsKey(topicName)) {
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Event publisher fetched from 
pool: [topic] %s", topicName));
+                }
+                return topicNameEventPublisherMap.get(topicName);
+            }
+            EventPublisher eventPublisher = new EventPublisher(topicName);
+            topicNameEventPublisherMap.put(topicName, eventPublisher);
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Event publisher instance created: 
[topic] %s", topicName));
+            }
+            return eventPublisher;
+        }
+    }
+
+    public static void close(String topicName) {
+        synchronized (EventPublisherPool.class) {
+            if(topicNameEventPublisherMap.containsKey(topicName)) {
+                topicNameEventPublisherMap.get(topicName).close();
+                topicNameEventPublisherMap.remove(topicName);
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Event publisher closed and 
removed from pool: [topic] %s", topicName));
+                }
+            }
+            else {
+                if(log.isWarnEnabled()) {
+                    log.warn(String.format("Event publisher instance not found 
in pool: [topic] %s", topicName));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/f3f2d136/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 6614e75..004be13 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -19,18 +19,18 @@
 
 package org.apache.stratos.messaging.broker.publish;
 
-import java.util.Enumeration;
-import java.util.Properties;
-
-import javax.jms.*;
-
+import com.google.gson.Gson;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.TopicConnector;
 import org.apache.stratos.messaging.publish.MessagePublisher;
 
-import com.google.gson.Gson;
-import org.apache.stratos.messaging.util.Constants;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import java.util.Enumeration;
+import java.util.Properties;
 
 /**
  * Any instance who needs to publish data to a topic, should communicate with
@@ -53,7 +53,7 @@ public class TopicPublisher extends MessagePublisher {
         * @param aTopicName
         *            topic name of this publisher instance.
         */
-       public TopicPublisher(String aTopicName) {
+       TopicPublisher(String aTopicName) {
                super(aTopicName);
                connector = new TopicConnector();
        }

Reply via email to