Updated Branches: refs/heads/master e589c5c7c -> 1df2f539b
Fixed cloud controller instance status event message delegator logic Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/99da1238 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/99da1238 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/99da1238 Branch: refs/heads/master Commit: 99da12388a53230e57158d45a1ba1c8defcff9a7 Parents: 3e4f30d Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 25 14:16:36 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 25 14:16:36 2013 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 6 +- .../InstanceStatusEventMessageDelegator.java | 66 +++++++++++++++++++ .../topology/TopologyEventMessageDelegator.java | 69 -------------------- 3 files changed, 69 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/99da1238/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 090e5c3..2f8e278 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -32,8 +32,8 @@ import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublis import org.apache.stratos.cloud.controller.registry.RegistryManager; import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; import org.apache.stratos.cloud.controller.topic.TopologySynchronizerTask; +import org.apache.stratos.cloud.controller.topology.InstanceStatusEventMessageDelegator; import org.apache.stratos.cloud.controller.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.topology.TopologyEventMessageDelegator; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; @@ -132,9 +132,9 @@ public class CloudControllerServiceImpl implements CloudControllerService { private void startTopologyBuilder() { // initialize TopologyEventMessageProcessor Consumer - Thread topologyBuilder = new Thread(new TopologyEventMessageDelegator()); + Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator()); // start consumer - topologyBuilder.start(); + delegatorThread.start(); } private TaskManager registerAndScheduleDataPublisherTask( http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/99da1238/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java new file mode 100644 index 0000000..b8a3e9d --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; + +import javax.jms.TextMessage; + +public class InstanceStatusEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(InstanceStatusEventMessageDelegator.class); + + @Override + public void run() { + log.info("Instance status event message delegator started"); + + while (true) { + try { + TextMessage message = TopologyManager.getInstance().getSharedTopologyDiffQueue().take(); + + // retrieve the header + String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); + log.info(String.format("Instance status event message received from queue: %s", type)); + + if (InstanceStartedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util. + jsonToObject(json, InstanceStartedEvent.class)); + } else if (InstanceActivatedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. + jsonToObject(json, InstanceActivatedEvent.class)); + } else { + log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent"); + } + } catch (Exception e) { + String error = "Failed to retrieve the instance status event message"; + log.error(error, e); + // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. + //throw new RuntimeException(error, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/99da1238/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java deleted file mode 100644 index 8a4eccd..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventMessageDelegator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.util.Constants; -import org.apache.stratos.messaging.util.Util; - -import javax.jms.TextMessage; - -public class TopologyEventMessageDelegator implements Runnable { - private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class); - - @Override - public void run() { - log.info("Topology event message processor started"); - - while (true) { - try { - TextMessage message = TopologyManager.getInstance().getSharedTopologyDiffQueue().take(); - - // retrieve the header - String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); - // retrieve the actual message - String json = message.getText(); - - log.info(String.format("Event message received from queue: %s", type)); - - if(InstanceStartedEvent.class.getName().equals(type)) { - TopologyBuilder.handleMemberStarted((InstanceStartedEvent)Util. - jsonToObject(json, InstanceStartedEvent.class)); - } else if(InstanceActivatedEvent.class.getName().equals(type)) { - TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. - jsonToObject(json, InstanceActivatedEvent.class)); - } else{ - log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent" ); - } - - if (log.isDebugEnabled()) { - log.debug(String.format("Event message received from queue: %s", type)); - } - } catch (Exception e) { - String error = "Failed to retrieve the topology event message."; - log.error(error, e); - // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. - //throw new RuntimeException(error, e); - } - } - } -}
