http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java index dcb8be4..cc9f34f 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java @@ -1,18 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -21,14 +21,11 @@ package org.apache.stratos.manager.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.manager.listener.InstanceStatusListener; -import org.apache.stratos.manager.listener.TenantUserRoleCreator; import org.apache.stratos.manager.publisher.TenantEventPublisher; import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler; import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager; import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver; import org.apache.stratos.manager.utils.CartridgeConfigFileReader; -import org.apache.stratos.manager.utils.UserRoleCreator; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; import org.apache.stratos.messaging.util.Constants; import org.osgi.service.component.ComponentContext; @@ -38,7 +35,9 @@ import org.wso2.carbon.user.core.service.RealmService; import org.wso2.carbon.utils.ConfigurationContextService; /** - * @scr.component name="org.wso2.carbon.hosting.mgt.internal.ADCManagementServerComponent" + * @scr.component + * name= + * "org.wso2.carbon.hosting.mgt.internal.ADCManagementServerComponent" * immediate="true" * @scr.reference name="config.context.service" * interface="org.wso2.carbon.utils.ConfigurationContextService" @@ -54,138 +53,141 @@ import org.wso2.carbon.utils.ConfigurationContextService; * "org.wso2.carbon.registry.core.service.RegistryService" * cardinality="1..1" policy="dynamic" bind="setRegistryService" * unbind="unsetRegistryService" - * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" + * @scr.reference name="ntask.component" + * interface="org.wso2.carbon.ntask.core.service.TaskService" * cardinality="1..1" policy="dynamic" bind="setTaskService" * unbind="unsetTaskService" */ public class ADCManagementServerComponent { - private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class); - private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver; + private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class); + private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver; - protected void activate(ComponentContext componentContext) throws Exception { + protected void activate(ComponentContext componentContext) throws Exception { try { CartridgeConfigFileReader.readProperties(); - // Schedule complete tenant event synchronizer - if(log.isDebugEnabled()) { - log.debug("Scheduling tenant synchronizer task..."); - } - TenantSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService()); - - // Register tenant event publisher - if(log.isDebugEnabled()) { - log.debug("Starting tenant event publisher..."); - } - TenantEventPublisher tenantEventPublisher = new TenantEventPublisher(); - componentContext.getBundleContext().registerService( - org.apache.stratos.common.listeners.TenantMgtListener.class.getName(), - tenantEventPublisher, null); - - // Start instance status topic subscriber - if(log.isDebugEnabled()) { - log.debug("Starting instance status topic subscriber..."); - } - TopicSubscriber subscriber = new TopicSubscriber(Constants.INSTANCE_STATUS_TOPIC); - subscriber.setMessageListener(new InstanceStatusListener()); - Thread tsubscriber = new Thread(subscriber); + // Schedule complete tenant event synchronizer + if (log.isDebugEnabled()) { + log.debug("Scheduling tenant synchronizer task..."); + } + TenantSynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance() + .getTaskService()); + + // Register tenant event publisher + if (log.isDebugEnabled()) { + log.debug("Starting tenant event publisher..."); + } + TenantEventPublisher tenantEventPublisher = new TenantEventPublisher(); + componentContext.getBundleContext() + .registerService(org.apache.stratos.common.listeners.TenantMgtListener.class.getName(), + tenantEventPublisher, null); + + // Start instance status topic subscriber + if (log.isDebugEnabled()) { + log.debug("Starting instance status topic subscriber..."); + } + TopicSubscriber subscriber = new TopicSubscriber(Constants.INSTANCE_STATUS_TOPIC); + subscriber.setMessageListener(new InstanceStatusListener()); + Thread tsubscriber = new Thread(subscriber); tsubscriber.start(); - //Create a Tenant-User Role at server start-up - UserRoleCreator.CreateTenantUserRole(); + // initializing the topology event subscriber + /* + * TopicSubscriber topologyTopicSubscriber = new + * TopicSubscriber(Constants.TOPOLOGY_TOPIC); + * topologyTopicSubscriber.setMessageListener(new + * TopologyEventListner()); + * Thread topologyTopicSubscriberThread = new + * Thread(topologyTopicSubscriber); + * topologyTopicSubscriberThread.start(); + * + * //Starting Topology Receiver + * TopologyReceiver topologyReceiver = new TopologyReceiver(); + * Thread topologyReceiverThread = new Thread(topologyReceiver); + * topologyReceiverThread.start(); + */ + + stratosManagerTopologyEventReceiver = new StratosManagerTopologyEventReceiver(); + Thread topologyReceiverThread = new Thread(stratosManagerTopologyEventReceiver); + topologyReceiverThread.start(); + log.info("Topology receiver thread started"); + + // retrieve persisted CartridgeSubscriptions + new DataInsertionAndRetrievalManager().cachePersistedSubscriptions(); + + // Component activated successfully + log.info("ADC management server component is activated"); + + } catch (Exception e) { + if (log.isFatalEnabled()) { + log.fatal("Could not activate ADC management server component", e); + } + } + } - TenantUserRoleCreator tenantUserRoleCreator = new TenantUserRoleCreator(); - componentContext.getBundleContext().registerService( - org.apache.stratos.common.listeners.TenantMgtListener.class.getName(), - tenantUserRoleCreator, null); + protected void setConfigurationContextService(ConfigurationContextService contextService) { + DataHolder.setClientConfigContext(contextService.getClientConfigContext()); + DataHolder.setServerConfigContext(contextService.getServerConfigContext()); - //initializing the topology event subscriber - /*TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); - topologyTopicSubscriber.setMessageListener(new TopologyEventListner()); - Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber); - topologyTopicSubscriberThread.start(); + } - //Starting Topology Receiver - TopologyReceiver topologyReceiver = new TopologyReceiver(); - Thread topologyReceiverThread = new Thread(topologyReceiver); - topologyReceiverThread.start();*/ + protected void unsetConfigurationContextService(ConfigurationContextService contextService) { + DataHolder.setClientConfigContext(null); + DataHolder.setServerConfigContext(null); + } - stratosManagerTopologyEventReceiver = new StratosManagerTopologyEventReceiver(); - Thread topologyReceiverThread = new Thread(stratosManagerTopologyEventReceiver); - topologyReceiverThread.start(); - log.info("Topology receiver thread started"); + protected void setRealmService(RealmService realmService) { + // keeping the realm service in the DataHolder class + DataHolder.setRealmService(realmService); + } - // retrieve persisted CartridgeSubscriptions - new DataInsertionAndRetrievalManager().cachePersistedSubscriptions(); + protected void unsetRealmService(RealmService realmService) { + } - //Component activated successfully - log.info("ADC management server component is activated"); - + protected void setRegistryService(RegistryService registryService) { + try { + DataHolder.setRegistryService(registryService); } catch (Exception e) { - if(log.isFatalEnabled()) { - log.fatal("Could not activate ADC management server component", e); - } + log.error("Cannot retrieve governance registry", e); } } - protected void setConfigurationContextService(ConfigurationContextService contextService) { - DataHolder.setClientConfigContext(contextService.getClientConfigContext()); - DataHolder.setServerConfigContext(contextService.getServerConfigContext()); - - } - - protected void unsetConfigurationContextService(ConfigurationContextService contextService) { - DataHolder.setClientConfigContext(null); - DataHolder.setServerConfigContext(null); - } - - protected void setRealmService(RealmService realmService) { - // keeping the realm service in the DataHolder class - DataHolder.setRealmService(realmService); - } - - protected void unsetRealmService(RealmService realmService) { - } - - protected void setRegistryService(RegistryService registryService) { - try { - DataHolder.setRegistryService(registryService); - } catch (Exception e) { - log.error("Cannot retrieve governance registry", e); - } - } - - protected void unsetRegistryService(RegistryService registryService) { - } - - /*protected void setTopologyManagementService(TopologyManagementService topologyMgtService) { - DataHolder.setTopologyMgtService(topologyMgtService); - } - - protected void unsetTopologyManagementService(TopologyManagementService topologyMgtService) { - }*/ - - protected void setTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Setting the task service"); - } - ServiceReferenceHolder.getInstance().setTaskService(taskService); - } - - protected void unsetTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Un-setting the task service"); - } - ServiceReferenceHolder.getInstance().setTaskService(null); - } - - protected void deactivate(ComponentContext context) { - // Close event publisher connections to message broker - EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC); - EventPublisherPool.close(Constants.TENANT_TOPIC); - - //terminate Stratos Manager Topology Receiver - stratosManagerTopologyEventReceiver.terminate(); - } + protected void unsetRegistryService(RegistryService registryService) { + } + + /* + * protected void setTopologyManagementService(TopologyManagementService + * topologyMgtService) { + * DataHolder.setTopologyMgtService(topologyMgtService); + * } + * + * protected void unsetTopologyManagementService(TopologyManagementService + * topologyMgtService) { + * } + */ + + protected void setTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { + log.debug("Setting the task service"); + } + ServiceReferenceHolder.getInstance().setTaskService(taskService); + } + + protected void unsetTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { + log.debug("Un-setting the task service"); + } + ServiceReferenceHolder.getInstance().setTaskService(null); + } + + protected void deactivate(ComponentContext context) { + // Close event publisher connections to message broker + // EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC); + // EventPublisherPool.close(Constants.TENANT_TOPIC); + + // terminate Stratos Manager Topology Receiver + stratosManagerTopologyEventReceiver.terminate(); + } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java index 2b68de6..d730073 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/listener/InstanceStatusListener.java @@ -1,23 +1,28 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.manager.listener; +import java.util.Set; + +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.manager.publisher.InstanceNotificationPublisher; @@ -26,71 +31,108 @@ import org.apache.stratos.manager.subscription.CartridgeSubscription; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; import org.apache.stratos.messaging.util.Constants; import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; -import java.util.Set; +public class InstanceStatusListener implements MqttCallback { + + private static final Log log = LogFactory.getLog(InstanceStatusListener.class); + + @Override + public void connectionLost(Throwable arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void messageArrived(String arg0, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + System.out.println("instance notifier messege received...."); + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + "org.apache.stratos.messaging.event.".concat(arg0.replace("/", + "."))); + + if (log.isInfoEnabled()) { + log.info("Instance status message received"); + } + + try { + String type = receivedMessage.getStringProperty(Constants.EVENT_CLASS_NAME); + if (log.isInfoEnabled()) { + log.info(String.format("Event class name: %s ", type)); + } + // If member started event is received publish artifact update + // message + // To do a git clone + if (InstanceStartedEvent.class.getName().equals(type)) { + String json = receivedMessage.getText(); + InstanceStartedEvent event = + (InstanceStartedEvent) Util.jsonToObject(json, + InstanceStartedEvent.class); + String clusterId = event.getClusterId(); + if (log.isInfoEnabled()) { + log.info("Cluster id: " + clusterId); + } + + Set<CartridgeSubscription> cartridgeSubscriptions = + new DataInsertionAndRetrievalManager().getCartridgeSubscriptionForCluster(clusterId); + if (cartridgeSubscriptions == null || cartridgeSubscriptions.isEmpty()) { + // No subscriptions, return + if (log.isDebugEnabled()) { + log.debug("No subscription information found for cluster id " + + clusterId); + } + return; + } + + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + // We need to send this event for all types, single + // tenant + // and multi tenant. + // In an autoscaling scenario, we need to send this + // event + // for all existing subscriptions for the newly spawned + // instance + // Also in a case of restarting the agent, this event + // needs + // to be sent for all subscriptions for the existing + // instance + if (cartridgeSubscription.getRepository() != null) { + InstanceNotificationPublisher publisher = + new InstanceNotificationPublisher(); + publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(), + clusterId, + String.valueOf(cartridgeSubscription.getSubscriber() + .getTenantId())); + + } else { + if (log.isDebugEnabled()) { + log.debug("No repository found for subscription with alias: " + + cartridgeSubscription.getAlias() + ", type: " + + cartridgeSubscription.getType() + + ". Not sending the Artifact Updated event"); + } + } + } + + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not process instance status message", e); + } + } + } -public class InstanceStatusListener implements MessageListener { - - private static final Log log = LogFactory - .getLog(InstanceStatusListener.class); - - @Override - public void onMessage(Message message) { - TextMessage receivedMessage = (TextMessage) message; - if(log.isInfoEnabled()) { - log.info("Instance status message received"); - } - - try { - String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); - if(log.isInfoEnabled()) { - log.info(String.format("Event class name: %s ", type)); - } - // If member started event is received publish artifact update message - // To do a git clone - if (InstanceStartedEvent.class.getName().equals(type)) { - String json = receivedMessage.getText(); - InstanceStartedEvent event = (InstanceStartedEvent) Util.jsonToObject(json, InstanceStartedEvent.class); - String clusterId = event.getClusterId(); - if(log.isInfoEnabled()) { - log.info("Cluster id: " + clusterId); - } - - Set<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscriptionForCluster(clusterId); - if (cartridgeSubscriptions == null || cartridgeSubscriptions.isEmpty()) { - // No subscriptions, return - if (log.isDebugEnabled()) { - log.debug("No subscription information found for cluster id " + clusterId); - } - return; - } - - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - // We need to send this event for all types, single tenant and multi tenant. - // In an autoscaling scenario, we need to send this event for all existing subscriptions for the newly spawned instance - // Also in a case of restarting the agent, this event needs to be sent for all subscriptions for the existing instance - if (cartridgeSubscription.getRepository() != null) { - InstanceNotificationPublisher publisher = new InstanceNotificationPublisher(); - publisher.sendArtifactUpdateEvent(cartridgeSubscription.getRepository(), clusterId, - String.valueOf(cartridgeSubscription.getSubscriber().getTenantId())); - - } else { - if(log.isDebugEnabled()) { - log.debug("No repository found for subscription with alias: " + cartridgeSubscription.getAlias() + ", type: " + cartridgeSubscription.getType()+ - ". Not sending the Artifact Updated event"); - } - } - } - - } - } catch (Exception e) { - if(log.isErrorEnabled()) { - log.error("Could not process instance status message", e); - } - } - } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java index 6a885e8..c932e98 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java @@ -1,18 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -27,56 +27,59 @@ import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; -import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; /** - * Creating the relevant instance notification event and publish it to the instances. + * Creating the relevant instance notification event and publish it to the + * instances. */ public class InstanceNotificationPublisher { - private static final Log log = LogFactory.getLog(InstanceNotificationPublisher.class); + private static final Log log = LogFactory.getLog(InstanceNotificationPublisher.class); - public InstanceNotificationPublisher() { - } + public InstanceNotificationPublisher() { + } - private void publish(Event event) { - EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC); - depsyncEventPublisher.publish(event); - } + private void publish(Event event) { + String topic = Util.getMessageTopicName(event); + EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(topic); + depsyncEventPublisher.publish(event); + } - /** - * Publishing the artifact update event to the instances - * - * @param repository - * @param clusterId - * @param tenantId - */ - public void sendArtifactUpdateEvent(Repository repository, String clusterId, String tenantId) { - ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent(); - artifactUpdateEvent.setClusterId(clusterId); - artifactUpdateEvent.setRepoUserName(repository.getUserName()); - artifactUpdateEvent.setRepoPassword(repository.getPassword()); - artifactUpdateEvent.setRepoURL(repository.getUrl()); - artifactUpdateEvent.setTenantId(tenantId); - artifactUpdateEvent.setCommitEnabled(repository.isCommitEnabled()); + /** + * Publishing the artifact update event to the instances + * + * @param repository + * @param clusterId + * @param tenantId + */ + public void sendArtifactUpdateEvent(Repository repository, String clusterId, String tenantId) { + ArtifactUpdatedEvent artifactUpdateEvent = new ArtifactUpdatedEvent(); + artifactUpdateEvent.setClusterId(clusterId); + artifactUpdateEvent.setRepoUserName(repository.getUserName()); + artifactUpdateEvent.setRepoPassword(repository.getPassword()); + artifactUpdateEvent.setRepoURL(repository.getUrl()); + artifactUpdateEvent.setTenantId(tenantId); + artifactUpdateEvent.setCommitEnabled(repository.isCommitEnabled()); - log.info(String.format("Publishing artifact updated event: [cluster] %s " + - "[repo-URL] %s [repo-username] %s [tenant-id] %s", - clusterId, repository.getUrl(), repository.getUserName(), tenantId)); - publish(artifactUpdateEvent); - } + log.info(String.format("Publishing artifact updated event: [cluster] %s " + + "[repo-URL] %s [repo-username] %s [tenant-id] %s", + clusterId, + repository.getUrl(), repository.getUserName(), tenantId)); + publish(artifactUpdateEvent); + } - /** - * Publishing the instance termination notification to the instances - * - * @param memberId - */ - public void sendInstanceCleanupEventForMember(String memberId) { - log.info(String.format("Publishing Instance Cleanup Event: [member] %s", memberId)); - publish(new InstanceCleanupMemberEvent(memberId)); - } + /** + * Publishing the instance termination notification to the instances + * + * @param memberId + */ + public void sendInstanceCleanupEventForMember(String memberId) { + log.info(String.format("Publishing Instance Cleanup Event: [member] %s", memberId)); + publish(new InstanceCleanupMemberEvent(memberId)); + } - public void sendInstanceCleanupEventForCluster(String clusterId) { - log.info(String.format("Publishing Instance Cleanup Event: [cluster] %s", clusterId)); - publish(new InstanceCleanupClusterEvent(clusterId)); - } + public void sendInstanceCleanupEventForCluster(String clusterId) { + log.info(String.format("Publishing Instance Cleanup Event: [cluster] %s", clusterId)); + publish(new InstanceCleanupClusterEvent(clusterId)); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java index 8213ed9..0f7816e 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -30,7 +30,7 @@ import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent; import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent; import org.apache.stratos.messaging.event.tenant.TenantUpdatedEvent; -import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; /** * Tenant event publisher to publish tenant events to the message broker by @@ -38,78 +38,83 @@ import org.apache.stratos.messaging.util.Constants; */ public class TenantEventPublisher implements TenantMgtListener { - private static final Log log = LogFactory.getLog(TenantEventPublisher.class); - private static final int EXEC_ORDER = 1; + private static final Log log = LogFactory.getLog(TenantEventPublisher.class); + private static final int EXEC_ORDER = 1; + @Override + public void onTenantCreate(TenantInfoBean tenantInfo) throws StratosException { + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing tenant created event: [tenant-id] %d [tenant-domain] %s", + tenantInfo.getTenantId(), tenantInfo.getTenantDomain())); + } + Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain()); + TenantCreatedEvent event = new TenantCreatedEvent(tenant); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(event); + } catch (Exception e) { + log.error("Could not publish tenant created event", e); + } + } - @Override - public void onTenantCreate(TenantInfoBean tenantInfo) throws StratosException { - try { - if(log.isDebugEnabled()) { - log.debug(String.format("Publishing tenant created event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain())); - } - Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain()); - TenantCreatedEvent event = new TenantCreatedEvent(tenant); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(event); - } - catch (Exception e) { - log.error("Could not publish tenant created event", e); - } - } + @Override + public void onTenantUpdate(TenantInfoBean tenantInfo) throws StratosException { + try { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", + tenantInfo.getTenantId(), tenantInfo.getTenantDomain())); + } + TenantUpdatedEvent event = + new TenantUpdatedEvent(tenantInfo.getTenantId(), + tenantInfo.getTenantDomain()); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(event); + } catch (Exception e) { + log.error("Could not publish tenant updated event"); + } + } - @Override - public void onTenantUpdate(TenantInfoBean tenantInfo) throws StratosException { - try { - if(log.isInfoEnabled()) { - log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain())); - } - TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain()); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(event); - } - catch (Exception e) { - log.error("Could not publish tenant updated event"); - } - } + @Override + public void onTenantDelete(int tenantId) { + try { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId)); + } + TenantRemovedEvent event = new TenantRemovedEvent(tenantId); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(event); + } catch (Exception e) { + log.error("Could not publish tenant removed event"); + } + } - @Override - public void onTenantDelete(int tenantId) { - try { - if(log.isInfoEnabled()) { - log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId)); - } - TenantRemovedEvent event = new TenantRemovedEvent(tenantId); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(event); - } - catch (Exception e) { - log.error("Could not publish tenant removed event"); - } - } + @Override + public void onTenantRename(int tenantId, String oldDomainName, String newDomainName) + throws StratosException { + } - @Override - public void onTenantRename(int tenantId, String oldDomainName, String newDomainName) throws StratosException { - } + @Override + public void onTenantInitialActivation(int tenantId) throws StratosException { + } - @Override - public void onTenantInitialActivation(int tenantId) throws StratosException { - } + @Override + public void onTenantActivation(int tenantId) throws StratosException { + } - @Override - public void onTenantActivation(int tenantId) throws StratosException { - } + @Override + public void onTenantDeactivation(int tenantId) throws StratosException { + } - @Override - public void onTenantDeactivation(int tenantId) throws StratosException { - } + @Override + public void onSubscriptionPlanChange(int tenantId, String oldPlan, String newPlan) + throws StratosException { + } - @Override - public void onSubscriptionPlanChange(int tenantId, String oldPlan, String newPlan) throws StratosException { - } - - @Override - public int getListenerOrder() { - return EXEC_ORDER; - } + @Override + public int getListenerOrder() { + return EXEC_ORDER; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java index 81ec432..78f587c 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java @@ -1,24 +1,30 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.manager.publisher; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.beans.TenantInfoBean; @@ -31,88 +37,95 @@ import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.domain.tenant.Subscription; import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent; -import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; import org.apache.stratos.tenant.mgt.util.TenantMgtUtil; import org.wso2.carbon.ntask.core.Task; import org.wso2.carbon.user.core.tenant.TenantManager; -import java.util.*; - /** * Tenant synchronizer task for publishing complete tenant event periodically * to message broker. */ public class TenantSynzhronizerTask implements Task { - private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class); + private static final Log log = LogFactory.getLog(TenantSynzhronizerTask.class); + + @Override + public void init() { + } + + @Override + public void execute() { + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing complete tenant event")); + } + Tenant tenant; + List<Tenant> tenants = new ArrayList<Tenant>(); + TenantManager tenantManager = DataHolder.getRealmService().getTenantManager(); + org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants(); + for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) { + // Create tenant + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", + carbonTenant.getId(), carbonTenant.getDomain())); + } + tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain()); - @Override - public void init() { - } + if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance() + .tenantExists(carbonTenant.getId())) { + // if the tenant is not already there in TenantManager, + // trigger TenantCreatedEvent + TenantInfoBean tenantBean = new TenantInfoBean(); + tenantBean.setTenantId(carbonTenant.getId()); + tenantBean.setTenantDomain(carbonTenant.getDomain()); + TenantMgtUtil.triggerAddTenant(tenantBean); + // add tenant to Tenant Manager + org.apache.stratos.messaging.message.receiver.tenant.TenantManager.getInstance() + .addTenant(tenant); + } - @Override - public void execute() { - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing complete tenant event")); - } - Tenant tenant; - List<Tenant> tenants = new ArrayList<Tenant>(); - TenantManager tenantManager = DataHolder.getRealmService().getTenantManager(); - org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants(); - for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) { - // Create tenant - if(log.isDebugEnabled()) { - log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain())); - } - tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain()); - - if (!org.apache.stratos.messaging.message.receiver.tenant.TenantManager - .getInstance().tenantExists(carbonTenant.getId())) { - // if the tenant is not already there in TenantManager, - // trigger TenantCreatedEvent - TenantInfoBean tenantBean = new TenantInfoBean(); - tenantBean.setTenantId(carbonTenant.getId()); - tenantBean.setTenantDomain(carbonTenant.getDomain()); - TenantMgtUtil.triggerAddTenant(tenantBean); - // add tenant to Tenant Manager - org.apache.stratos.messaging.message.receiver.tenant.TenantManager - .getInstance().addTenant(tenant); - } - - // Add subscriptions - ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - //List<CartridgeSubscriptionInfo> cartridgeSubscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId()); - ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - Collection<CartridgeSubscription> cartridgeSubscriptions = new DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId()); - if (cartridgeSubscriptions != null && !cartridgeSubscriptions.isEmpty()) { - for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { - if(log.isDebugEnabled()) { - log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s", - carbonTenant.getId(), carbonTenant.getDomain(), cartridgeSubscription.getType())); - } - HashSet<String> clusterIds = new HashSet<String>(); - clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain()); - Subscription subscription = new Subscription(cartridgeSubscription.getType(), clusterIds); - for(SubscriptionDomain subscriptionDomain : cartridgeSubscription.getSubscriptionDomains()) { - subscription.addSubscriptionDomain(subscriptionDomain.getDomainName(), subscriptionDomain.getApplicationContext()); - } - tenant.addSubscription(subscription); - } - } - tenants.add(tenant); - } - CompleteTenantEvent event = new CompleteTenantEvent(tenants); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC); - eventPublisher.publish(event); - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could not publish complete tenant event", e); - } - } - } + // Add subscriptions + // /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // List<CartridgeSubscriptionInfo> cartridgeSubscriptions = + // PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId()); + // /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + Collection<CartridgeSubscription> cartridgeSubscriptions = + new DataInsertionAndRetrievalManager().getCartridgeSubscriptions(tenant.getTenantId()); + if (cartridgeSubscriptions != null && !cartridgeSubscriptions.isEmpty()) { + for (CartridgeSubscription cartridgeSubscription : cartridgeSubscriptions) { + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s", + carbonTenant.getId(), carbonTenant.getDomain(), + cartridgeSubscription.getType())); + } + HashSet<String> clusterIds = new HashSet<String>(); + clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain()); + Subscription subscription = + new Subscription( + cartridgeSubscription.getType(), + clusterIds); + for (SubscriptionDomain subscriptionDomain : cartridgeSubscription.getSubscriptionDomains()) { + subscription.addSubscriptionDomain(subscriptionDomain.getDomainName(), + subscriptionDomain.getApplicationContext()); + } + tenant.addSubscription(subscription); + } + } + tenants.add(tenant); + } + CompleteTenantEvent event = new CompleteTenantEvent(tenants); + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(event); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish complete tenant event", e); + } + } + } - @Override - public void setProperties(Map<String, String> stringStringMap) { - } + @Override + public void setProperties(Map<String, String> stringStringMap) { + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/pom.xml ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/pom.xml b/components/org.apache.stratos.messaging/pom.xml index b2d851b..e7c79d9 100644 --- a/components/org.apache.stratos.messaging/pom.xml +++ b/components/org.apache.stratos.messaging/pom.xml @@ -73,7 +73,21 @@ <artifactId>java-xmlbuilder</artifactId> <version>0.6</version> </dependency> - + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>mqtt-client</artifactId> + <version>0.4.0</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <version>5.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>5.9.1</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java new file mode 100644 index 0000000..e4a74c8 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.broker.connect; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; + +import java.io.File; +import java.util.Properties; + +/** + * This class is responsible for loading the mqtt config file from the + * classpath + * Initialize the topic connection. + * and initialize the topic connection. Later if some other object needs a topic + * session, this object is capable of providing one. + */ +public class MQTTConnector { + + public static final String MQTTURL = "defaultValue"; + public static final String CLIENT_ID = "Startos"; + public static final String TMPFILELOCATION = "/tmp"; + private static MqttClient topicClient; + + private static MqttClient topicClientSub; + private static final Log log = LogFactory.getLog(MQTTConnector.class); + private static String configFileLocation = System.getProperty("jndi.properties.dir"); + private static Properties mqttProp = + Util.getProperties(configFileLocation + File.separator + + "mqtttopic.properties"); + + public static synchronized MqttClient getMQTTConClient() { + + if (topicClient == null) { + + + String broker = mqttProp.getProperty("mqtturl", MQTTURL); + + String clientId = mqttProp.getProperty("clientID", CLIENT_ID); + MemoryPersistence persistence = new MemoryPersistence(); + + try { + topicClient = new MqttClient(broker, clientId, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + if (log.isDebugEnabled()) { + log.debug("MQTT client connected"); + } + + } catch (MqttException me) { + + log.error("Failed to initiate autoscaler service client. ", me); + } + + } + return topicClient; + + } + + public static synchronized MqttClient getMQTTSubClient(String identifier) { + // if (topicClientSub == null) { + + String broker = mqttProp.getProperty("mqtturl", MQTTURL); + + String tempFile = mqttProp.getProperty("tempfilelocation", TMPFILELOCATION); + // Creating new default persistence for mqtt client + MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(tempFile); + + try { + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + // mqtt client with specific url and a random client id + topicClientSub = new MqttClient(broker, identifier, persistence); + + if (log.isDebugEnabled()) { + log.debug("MQTT client connected"); + } + + } catch (MqttException me) { + + log.error("Failed to initiate autoscaler service client. ", me); + } + + // } + return topicClientSub; + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java deleted file mode 100644 index ebb339d..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.broker.connect; - -import java.util.Properties; -import java.io.File; - -import javax.jms.JMSException; -import javax.jms.QueueSession; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import javax.jms.TopicSession; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.stratos.messaging.util.Util; -import org.apache.stratos.messaging.util.Constants; - -/** - * This class is responsible for loading the jndi.properties file from the - * classpath - * and initialize the topic connection. Later if some other object needs a topic - * session, this object is capable of providing one. - * - * @author nirmal - */ -public class TopicConnector { - - private TopicConnection topicConnection; - private String jndiPropFileDir; - private Topic topic; - - public TopicConnector() { - jndiPropFileDir = System.getProperty("jndi.properties.dir"); - } - - public void init(String topicName) throws Exception { - InitialContext ctx; - Properties environment = Util.getProperties(jndiPropFileDir + File.separator + "jndi.properties"); - environment.put(Constants.REQUEST_BASE_CONTEXT, "true"); // always returns the base context. - ctx = new InitialContext(environment); - // Lookup connection factory - String connectionFactoryName = environment.get("connectionfactoryName").toString(); - TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(connectionFactoryName); - // Lookup the topic - try { - setTopic((Topic) ctx.lookup(topicName)); - } catch (NamingException e) { - } - topicConnection = connFactory.createTopicConnection(); - topicConnection.start(); - } - - /** - * Provides a new topic session. - * - * @return topic session instance - * @throws JMSException if unable to create a topic session - */ - public TopicSession newSession() throws Exception { - return topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE); - } - - public void close() throws JMSException { - if (topicConnection == null) { - return; - } - topicConnection.close(); - } - - public Topic getTopic() { - return topic; - } - - public void setTopic(Topic topic) { - this.topic = topic; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java index 7ea98d4..e6bce76 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -20,14 +20,11 @@ package org.apache.stratos.messaging.broker.heartbeat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.connect.TopicConnector; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.ping.PingEvent; import org.apache.stratos.messaging.util.Constants; import org.apache.stratos.messaging.util.Util; -import javax.jms.JMSException; - /** * This health checker runs forever, and is responsible for checking the * connection @@ -37,50 +34,52 @@ import javax.jms.JMSException; public class TopicHealthChecker implements Runnable { private static final Log log = LogFactory.getLog(TopicHealthChecker.class); - private String topicName; - private boolean terminated; + private final String topicName; + private boolean terminated; - public TopicHealthChecker(String name) { + public TopicHealthChecker(String name) { topicName = name; } @Override public void run() { - if(log.isDebugEnabled()){ - log.debug(topicName + " topic health checker is running... " ); - } - TopicConnector testConnector = new TopicConnector(); + if (log.isDebugEnabled()) { + log.debug(topicName + " topic health checker is running... "); + } + while (!terminated) { try { - // Health checker needs to run with the smallest possible time interval (configurable) - // to detect a connection drop. Otherwise the subscriber will not - // get reconnected after a connection drop. + // Health checker needs to run with the smallest possible time + // interval (configurable) + // to detect a connection drop. Otherwise the subscriber will + // not + // get reconnected after a connection drop. Thread.sleep(Util.getAveragePingInterval()); - testConnector.init(topicName); - // A ping event is published to detect a session timeout - EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), false); + + // A ping event is published to detect a session timeout + EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), + false); } catch (Exception e) { // Implies connection is not established // sleep for configured failover ping interval and retry try { - log.error(topicName + " topic health checker is failed and will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" seconds."); - Thread.sleep(Util.getFailoverPingInterval()); + log.error(topicName + + " topic health checker is failed and will try to subscribe again in " + + Util.getFailoverPingInterval() / 1000 + " seconds."); + Thread.sleep(Util.getFailoverPingInterval()); break; } catch (InterruptedException ignore) { } } finally { - try { - testConnector.close(); - } catch (JMSException ignore) { - } + } } } - public void terminate() { - terminated = true; - } + public void terminate() { + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java index 2150494..2ba4df4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -20,37 +20,35 @@ package org.apache.stratos.messaging.broker.publish; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.util.Constants; - -import java.util.Properties; /** - * Defines logic for publishing events to a given topic in the message broker. - * A message header will be used to send the event class name to be used by the - * subscriber to identify the event. + * Defines logic for publishing events to a given topic in the message broker. + * A message header will be used to send the event class name to be used by the + * subscriber to identify the event. */ public class EventPublisher extends TopicPublisher { - /** - * @param topicName topic name of this publisher instance. - */ - EventPublisher(String topicName) { - super(topicName); - } + /** + * @param topicName + * topic name of this publisher instance. + */ + EventPublisher(String topicName) { + super(topicName); + } + + /** + * + * @param event + * event to be published + */ + public void publish(Event event) { + publish(event, true); + } - /** - * - * @param event event to be published - */ - public void publish(Event event) { - publish(event, true); - } + public void publish(Event event, boolean retry) { + synchronized (EventPublisher.class) { - public void publish(Event event, boolean retry) { - synchronized (EventPublisher.class) { - Properties headers = new Properties(); - headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName()); - super.publish(event, headers, retry); - } - } + super.publish(event, retry); + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index c0074f5..ff6ba8d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -1,36 +1,31 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.broker.publish; -import com.google.gson.Gson; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.connect.TopicConnector; -import org.apache.stratos.messaging.publish.MessagePublisher; +import org.apache.stratos.messaging.broker.connect.MQTTConnector; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; -import javax.jms.JMSException; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSession; -import java.util.Enumeration; -import java.util.Properties; +import com.google.gson.Gson; /** * Any instance who needs to publish data to a topic, should communicate with @@ -39,27 +34,28 @@ import java.util.Properties; * published * to JSON format, before publishing. * - * @author nirmal + * * */ -public class TopicPublisher extends MessagePublisher { +public class TopicPublisher { private static final Log log = LogFactory.getLog(TopicPublisher.class); - private TopicSession topicSession; - private TopicConnector connector; - private javax.jms.TopicPublisher topicPublisher = null; - private boolean initialized; + + private static final int QOS = 2; + + public static TopicPublisher topicPub; + private boolean initialized; + private final String topic; /** * @param aTopicName * topic name of this publisher instance. */ TopicPublisher(String aTopicName) { - super(aTopicName); - connector = new TopicConnector(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher connector created: [topic] %s", getName())); - } + this.topic = aTopicName; + if (log.isDebugEnabled()) { + log.debug(String.format("Topic publisher connector created: [topic] %s", topic)); + } } /** @@ -67,139 +63,58 @@ public class TopicPublisher extends MessagePublisher { * lost, this will perform re-subscription periodically, until a connection * obtained. */ + public void publish(Object messageObj, boolean retry) { - publish(messageObj, null, retry); - } - - public void publish(Object messageObj, Properties headers, boolean retry) { - synchronized (TopicPublisher.class) { - Gson gson = new Gson(); - String message = gson.toJson(messageObj); - boolean published = false; - while(!published) { - - try { - doPublish(message, headers); - published = true; - } catch (Exception e) { - initialized = false; - if(log.isErrorEnabled()) { - log.error("Error while publishing to the topic: " + getName(), e); - } - if(!retry) { - if(log.isDebugEnabled()) { - log.debug("Retry disabled for topic " + getName()); - } - throw new RuntimeException(e); - } - - if(log.isInfoEnabled()) { - log.info("Will try to re-publish in 60 sec"); - } - try { - Thread.sleep(60000); - } catch (InterruptedException ignore) { - } - } - } - } + + synchronized (TopicPublisher.class) { + Gson gson = new Gson(); + String message = gson.toJson(messageObj); + boolean published = false; + while (!published) { + + try { + MqttClient mqttClient = MQTTConnector.getMQTTConClient(); + + MqttMessage mqttMSG = new MqttMessage(message.getBytes()); + + mqttMSG.setQos(QOS); + + mqttClient.connect(); + mqttClient.publish(topic, mqttMSG); + mqttClient.disconnect(); + published = true; + } catch (Exception e) { + initialized = false; + if (log.isErrorEnabled()) { + log.error("Error while publishing to the topic: " + topic, e); + } + if (!retry) { + if (log.isDebugEnabled()) { + log.debug("Retry disabled for topic " + topic); + } + throw new RuntimeException(e); + } + + if (log.isInfoEnabled()) { + log.info("Will try to re-publish in 60 sec"); + } + try { + Thread.sleep(60000); + } catch (InterruptedException ignore) { + } + } + } + } } public void close() { - synchronized (TopicPublisher.class) { - // closes all sessions/connections - try { - if(topicPublisher != null) { - topicPublisher.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher closed: [topic] %s", getName())); - } - } - if(topicSession != null) { - topicSession.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher session closed: [topic] %s", getName())); - } - } - if(connector != null) { - connector.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher connector closed: [topic] %s", getName())); - } - } - } catch (JMSException ignore) { - } - } - } + synchronized (TopicPublisher.class) { + // closes all sessions/connections + try { - private void doPublish(String message, Properties headers) throws Exception, JMSException { - if(!initialized) { - // Initialize a topic connection to the message broker - connector.init(getName()); - initialized = true; - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher connector initialized: [topic] %s", getName())); - } - } - - try { - // Create a new session - topicSession = createSession(connector); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher session created: [topic] %s", getName())); - } - // Create a publisher from session - topicPublisher = createPublisher(topicSession); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher created: [topic] %s", getName())); - } - - // Create text message - TextMessage textMessage = topicSession.createTextMessage(message); - - if (headers != null) { - // Add header properties - @SuppressWarnings("rawtypes") - Enumeration e = headers.propertyNames(); - - while (e.hasMoreElements()) { - String key = (String) e.nextElement(); - textMessage.setStringProperty(key, headers.getProperty(key)); + } catch (Exception ignore) { } } - - topicPublisher.publish(textMessage); - if (log.isDebugEnabled()) { - log.debug(String.format("Message published: [topic] %s [header] %s [body] %s", getName(), (headers != null) ? headers.toString() : "null", message)); - } - } - finally { - if(topicPublisher != null) { - topicPublisher.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher closed: [topic] %s", getName())); - } - } - if(topicSession != null) { - topicSession.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic publisher session closed: [topic] %s", getName())); - } - } - } - } - - private TopicSession createSession(TopicConnector topicConnector) throws Exception { - // Create a new session - return topicConnector.newSession(); - } - - private javax.jms.TopicPublisher createPublisher(TopicSession topicSession) throws Exception, JMSException { - Topic topic = connector.getTopic(); - if (topic == null) { - // if the topic doesn't exist, create it. - topic = topicSession.createTopic(getName()); - } - return topicSession.createPublisher(topic); } + }
