adding complete applications
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0f7d7b0d Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0f7d7b0d Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0f7d7b0d Branch: refs/heads/docker-grouping-merge Commit: 0f7d7b0d0db9f643fd232f0ef2689cf62553dee7 Parents: 7f05298 Author: reka <[email protected]> Authored: Fri Oct 31 14:53:49 2014 +0530 Committer: reka <[email protected]> Committed: Fri Oct 31 14:54:33 2014 +0530 ---------------------------------------------------------------------- .../domain/applications/Applications.java | 22 ++- .../locking/ApplicationLockHierarchy.java | 3 + .../applications/ApplicationCreatedEvent.java | 11 +- .../applications/CompleteApplicationsEvent.java | 44 ++++++ .../ClusterStatusClusterCreatedEvent.java | 52 ------- .../ClusterStatusClusterResettedEvent.java | 52 +++++++ .../event/topology/ClusterCreatedEvent.java | 26 +--- .../event/topology/ClusterResetEvent.java | 56 +++++++ .../CompleteApplicationsEventListener.java | 26 ++++ ...lusterStatusClusterCreatedEventListener.java | 24 --- .../ClusterStatusClusterResetEventListener.java | 24 +++ .../ApplicationCreatedMessageProcessor.java | 32 +--- .../CompleteApplicationsMessageProcessor.java | 111 ++++++++++++++ .../updater/ApplicationsUpdater.java | 140 ++++++++++++++++++ ...terStatusClusterCreatedMessageProcessor.java | 58 -------- ...usterStatusClusterResetMessageProcessor.java | 58 ++++++++ .../ClusterStatusMessageProcessorChain.java | 6 +- .../ClusterCreatedMessageProcessor.java | 35 +++-- .../topology/ClusterResetMessageProcessor.java | 146 +++++++++++++++++++ 19 files changed, 721 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java index 9455a4c..9e8cf3e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/Applications.java @@ -32,15 +32,33 @@ public class Applications implements Serializable { private Map<String, Application> applicationMap; + private boolean initialized; + public Applications () { this.applicationMap = new HashMap<String, Application>(); } public void addApplication (Application application) { - this.applicationMap.put(application.getUniqueIdentifier(), application); + this.getApplications().put(application.getUniqueIdentifier(), application); } public Application getApplication (String appId) { - return this.applicationMap.get(appId); + return this.getApplications().get(appId); + } + + public boolean isInitialized() { + return initialized; + } + + public void setInitialized(boolean initialized) { + this.initialized = initialized; + } + + public boolean applicationExists(String appId) { + return this.getApplications().containsKey(appId); + } + + public Map<String, Application> getApplications() { + return applicationMap; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java index 71bfe03..cc31892 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/applications/locking/ApplicationLockHierarchy.java @@ -85,4 +85,7 @@ public class ApplicationLockHierarchy { } } + public ApplicationLock getApplicationLock() { + return applicationLock; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java index dfb90f2..26ba7f2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/ApplicationCreatedEvent.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.messaging.event.applications; +import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.event.Event; import java.io.Serializable; @@ -28,13 +29,13 @@ import java.io.Serializable; public class ApplicationCreatedEvent extends Event implements Serializable { private static final long serialVersionUID = 2625412714611885089L; - private String appId; + private Application application; - public ApplicationCreatedEvent(String appId) { - this.appId = appId; + public ApplicationCreatedEvent(Application application) { + this.application = application; } - public String getAppId() { - return appId; + public Application getApplication() { + return application; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java new file mode 100644 index 0000000..25035ab --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/applications/CompleteApplicationsEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.applications; + +import org.apache.stratos.messaging.domain.applications.Applications; +import org.apache.stratos.messaging.event.topology.TopologyEvent; + +import java.io.Serializable; + +/** + * This event is fired periodically with the complete topology. It would be a + * starting point for subscribers to initialize the current state of the topology + * before receiving other topology events. + */ +public class CompleteApplicationsEvent extends TopologyEvent implements Serializable { + private static final long serialVersionUID = 8580862188444892004L; + + private final Applications applications; + + public CompleteApplicationsEvent(Applications applications) { + this.applications = applications; + } + + public Applications getApplications() { + return applications; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java deleted file mode 100644 index d28cca1..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterCreatedEvent.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.event.cluster.status; - -import org.apache.stratos.messaging.event.Event; - -/** - * This event is fired by cartridge agent when it has started the server and - * applications are ready to serve the incoming requests. - */ -public class ClusterStatusClusterCreatedEvent extends Event { - private static final long serialVersionUID = 2625412714611885089L; - - private final String serviceName; - private final String clusterId; - private String appId; - - public ClusterStatusClusterCreatedEvent(String appId, String serviceName, String clusterId) { - this.serviceName = serviceName; - this.clusterId = clusterId; - this.appId = appId; - } - - public String getServiceName() { - return serviceName; - } - - public String getClusterId() { - return clusterId; - } - - public String getAppId() { - return appId; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java new file mode 100644 index 0000000..68c8e7f --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/cluster/status/ClusterStatusClusterResettedEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.cluster.status; + +import org.apache.stratos.messaging.event.Event; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class ClusterStatusClusterResettedEvent extends Event { + private static final long serialVersionUID = 2625412714611885089L; + + private final String serviceName; + private final String clusterId; + private String appId; + + public ClusterStatusClusterResettedEvent(String appId, String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.appId = appId; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java index 70452ab..4c06f3b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterCreatedEvent.java @@ -28,31 +28,19 @@ import java.io.Serializable; public class ClusterCreatedEvent extends TopologyEvent implements Serializable { private static final long serialVersionUID = 2080623816272047762L; - private final String appId; - private final String serviceName; - private final String clusterId; + private final Cluster cluster; - - public ClusterCreatedEvent(String appId, String serviceName, String clusterId) { - this.appId = appId; - this.serviceName = serviceName; - this.clusterId = clusterId; + public ClusterCreatedEvent(Cluster cluster) { + this.cluster = cluster; } - public String getServiceName() { - return serviceName; - } - @Override public String toString() { - return "ClusterCreatedEvent [serviceName=" + serviceName + ", application=" + appId + "]"; - } - - public String getClusterId() { - return clusterId; + return "ClusterCreatedEvent [serviceName=" + cluster.getServiceName() + ", " + + "application=" + cluster.getAppId() + " , cluster= " + cluster.getClusterId() + " ]"; } - public String getAppId() { - return appId; + public Cluster getCluster() { + return cluster; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java new file mode 100644 index 0000000..d4d6622 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterResetEvent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Cluster activated event will be sent by Autoscaler + */ +public class ClusterResetEvent extends Event { + + private final String serviceName; + private final String clusterId; + private String appId; + + public ClusterResetEvent(String appId, String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.appId = appId; + } + + public String getServiceName() { + return serviceName; + } + + @Override + public String toString() { + return "ClusterActivatedEvent [serviceName=" + serviceName + ", clusterStatus=" + + "]"; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java new file mode 100644 index 0000000..cd1bb24 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/applications/CompleteApplicationsEventListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.listener.applications; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class CompleteApplicationsEventListener extends EventListener { + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java deleted file mode 100644 index 6ca5476..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterCreatedEventListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.listener.cluster.status; - -import org.apache.stratos.messaging.listener.EventListener; - -public abstract class ClusterStatusClusterCreatedEventListener extends EventListener{ -} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java new file mode 100644 index 0000000..375ae32 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/cluster/status/ClusterStatusClusterResetEventListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.cluster.status; + +import org.apache.stratos.messaging.listener.EventListener; + +public abstract class ClusterStatusClusterResetEventListener extends EventListener{ +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java index 92d61ba..db8e3c8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/ApplicationCreatedMessageProcessor.java @@ -21,10 +21,10 @@ package org.apache.stratos.messaging.message.processor.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.event.applications.ApplicationCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; import org.apache.stratos.messaging.util.Util; @@ -44,10 +44,10 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { @Override public boolean process(String type, String message, Object object) { - Topology topology = (Topology) object; + Applications applications = (Applications) object; if (ApplicationCreatedEvent.class.getName().equals(type)) { - if (!topology.isInitialized()) { + if (!applications.isInitialized()) { return false; } @@ -58,37 +58,24 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { } TopologyUpdater.acquireWriteLockForApplications(); - // since the Clusters will also get modified, acquire write locks for each Service Type - Set<ClusterDataHolder> clusterDataHolders = event.getApplication().getClusterDataRecursively(); - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.acquireWriteLockForService(clusterData.getServiceType()); - } - } - try { - return doProcess(event, topology); + return doProcess(event, applications); } finally { - if (clusterDataHolders != null) { - for (ClusterDataHolder clusterData : clusterDataHolders) { - TopologyUpdater.releaseWriteLockForService(clusterData.getServiceType()); - } - } TopologyUpdater.releaseWriteLockForApplications(); } } else { if (nextProcessor != null) { // ask the next processor to take care of the message. - return nextProcessor.process(type, message, topology); + return nextProcessor.process(type, message, applications); } else { throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); } } } - private boolean doProcess (ApplicationCreatedEvent event,Topology topology) { + private boolean doProcess(ApplicationCreatedEvent event, Applications topology) { // check if required properties are available if (event.getApplication() == null) { @@ -109,9 +96,6 @@ public class ApplicationCreatedMessageProcessor extends MessageProcessor { } else { // add application and the clusters to Topology - for(Cluster cluster: event.getClusterList()) { - topology.getService(cluster.getServiceName()).addCluster(cluster); - } topology.addApplication(event.getApplication()); } http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java new file mode 100644 index 0000000..53c469b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/CompleteApplicationsMessageProcessor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.Applications; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; +import org.apache.stratos.messaging.event.applications.CompleteApplicationsEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.applications.updater.ApplicationsUpdater; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class CompleteApplicationsMessageProcessor extends MessageProcessor { + + private static final Log log = LogFactory.getLog(CompleteApplicationsMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Applications applications = (Applications) object; + + if (CompleteApplicationsEvent.class.getName().equals(type)) { + // Parse complete message and build event + CompleteApplicationsEvent event = (CompleteApplicationsEvent) Util. + jsonToObject(message, CompleteApplicationsEvent.class); + + if (!applications.isInitialized()) { + ApplicationsUpdater.acquireWriteLock(); + + try { + doProcess(event, applications); + + } finally { + ApplicationsUpdater.releaseWriteLock(); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, applications); + } + return false; + } + } + + private void doProcess (CompleteApplicationsEvent event, Applications applications) { + // add existing Applications to Topology + Collection<Application> applicationsList = event.getApplications().getApplications().values(); + if (applicationsList != null && !applicationsList.isEmpty()) { + for (Application application : applicationsList) { + applications.addApplication(application); + if (log.isDebugEnabled()) { + log.debug("Application with id [ " + application.getUniqueIdentifier() + " ] added to Topology"); + } + } + } else { + if (log.isDebugEnabled()) { + log.debug("No Application information found in Complete Topology event"); + } + } + + if (log.isInfoEnabled()) { + log.info("Topology initialized"); + } + + // Set topology initialized + applications.setInitialized(true); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java new file mode 100644 index 0000000..bbbfbf5 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/applications/updater/ApplicationsUpdater.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.processor.applications.updater; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.applications.locking.ApplicationLock; +import org.apache.stratos.messaging.domain.applications.locking.ApplicationLockHierarchy; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLock; +import org.apache.stratos.messaging.domain.topology.locking.TopologyLockHierarchy; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * Used to lock the Topology for writes by messaging component + * + * Acquire a write lock: + * + * From root level, acquire read lock, and acquire a write lock only for the + * relevant sub tree. + * + * Example 1: Acquiring write lock for a Cluster to modify the Cluster object - + * acquiring: + * public static void acquireWriteLockForCluster (String serviceName, String clusterId) + * + * releasing: + * public static void releaseWriteLockForCluster (String serviceName, String clusterId) + * + * Example 2: Acquiring write lock to add a new Cluster object - + * acquiring: + * public static void acquireWriteLockForService (String serviceName) + * + * releasing: + * public static void releaseWriteLockForService (String serviceName) + * + * Example 3: Acquiring the write lock to add a deploy a Cartridge (add a new Service) + * acquire: + * public static void acquireWriteLockForServices() + * + * release: + * public static void releaseWriteLockForServices() + */ + +public class ApplicationsUpdater { + + private static final Log log = LogFactory.getLog(ApplicationsUpdater.class); + + private static volatile ApplicationLockHierarchy applicationLockHierarchy = + ApplicationLockHierarchy.getInstance(); + + // Top level locks - should be used to lock the entire Topology + + /** + * Acquires write lock for the Complete Topology + */ + public static void acquireWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Topology"); + } + applicationLockHierarchy.getApplicationLock().acquireWriteLock(); + } + + /** + * Releases write lock for the Complete Topology + */ + public static void releaseWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock released for Topology"); + } + applicationLockHierarchy.getApplicationLock().releaseWritelock(); + } + + /** + * Acquires write lock for the Application + * + * @param appId Application id + */ + public static void acquireWriteLockForApplication (String appId) { + + // acquire read lock for all Applications + TopologyManager.acquireReadLockForApplications(); + + ApplicationLock applicationLock = applicationLockHierarchy.getLock(appId); + if (applicationLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // now, lock Application + applicationLock.acquireWriteLock(); + if(log.isDebugEnabled()) { + log.debug("Write lock acquired for Application " + appId); + } + } + } + + /** + * Releases write lock for the Application + * + * @param appId Application id + */ + public static void releaseWriteLockForApplication (String appId) { + + ApplicationLock applicationLock = applicationLockHierarchy.getLock(appId); + if (applicationLock == null) { + handleLockNotFound("Topology lock not found for Application " + appId); + + } else { + // release App lock + applicationLock.releaseWritelock(); + if(log.isDebugEnabled()) { + log.debug("Write lock released for Application " + appId); + } + } + + // release read lock for all Applications + TopologyManager.releaseReadLockForApplications(); + } + + private static void handleLockNotFound (String errorMsg) { + log.warn(errorMsg); + //throw new RuntimeException(errorMsg); + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java deleted file mode 100644 index 9b4780b..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterCreatedMessageProcessor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.message.processor.cluster.status; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterCreatedEvent; -import org.apache.stratos.messaging.message.processor.MessageProcessor; -import org.apache.stratos.messaging.util.Util; - - -public class ClusterStatusClusterCreatedMessageProcessor extends MessageProcessor { - private static final Log log = LogFactory.getLog(ClusterStatusClusterCreatedMessageProcessor.class); - private MessageProcessor nextProcessor; - - @Override - public void setNext(MessageProcessor nextProcessor) { - this.nextProcessor = nextProcessor; - } - - @Override - public boolean process(String type, String message, Object object) { - if (ClusterStatusClusterCreatedEvent.class.getName().equals(type)) { - // Parse complete message and build event - ClusterStatusClusterCreatedEvent event = (ClusterStatusClusterCreatedEvent) Util. - jsonToObject(message, ClusterStatusClusterCreatedEvent.class); - - if(log.isDebugEnabled()) { - log.debug("Received AppStatusClusterCreatedEvent: " + event.toString()); - } - // Notify event listeners - notifyEventListeners(event); - return true; - } else { - if (nextProcessor != null) { - return nextProcessor.process(type, message, object); - } else { - throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java new file mode 100644 index 0000000..b5bf301 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusClusterResetMessageProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.cluster.status.ClusterStatusClusterResettedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + + +public class ClusterStatusClusterResetMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(ClusterStatusClusterResetMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ClusterStatusClusterResettedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ClusterStatusClusterResettedEvent event = (ClusterStatusClusterResettedEvent) Util. + jsonToObject(message, ClusterStatusClusterResettedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received AppStatusClusterCreatedEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java index 29556ec..42092bc 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/cluster/status/ClusterStatusMessageProcessorChain.java @@ -32,13 +32,13 @@ public class ClusterStatusMessageProcessorChain extends MessageProcessorChain { private ClusterStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor; - private ClusterStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor; + private ClusterStatusClusterResetMessageProcessor clusterCreatedMessageProcessor; private ClusterStatusClusterInactivateMessageProcessor clusterInactivateMessageProcessor; private ClusterStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor; private ClusterStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor; @Override protected void initialize() { - clusterCreatedMessageProcessor = new ClusterStatusClusterCreatedMessageProcessor(); + clusterCreatedMessageProcessor = new ClusterStatusClusterResetMessageProcessor(); add(clusterCreatedMessageProcessor); clusterActivatedMessageProcessor = new ClusterStatusClusterActivatedMessageProcessor(); @@ -60,7 +60,7 @@ public class ClusterStatusMessageProcessorChain extends MessageProcessorChain { @Override public void addEventListener(EventListener eventListener) { - if(eventListener instanceof ClusterStatusClusterCreatedEventListener) { + if(eventListener instanceof ClusterStatusClusterResetEventListener) { clusterCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ClusterStatusClusterInactivateEventListener) { clusterInactivateMessageProcessor.addEventListener(eventListener); http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java index b8cd80f..0ff303c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterCreatedMessageProcessor.java @@ -50,13 +50,13 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { // Parse complete message and build event ClusterCreatedEvent event = (ClusterCreatedEvent) Util.jsonToObject(message, ClusterCreatedEvent.class); - - TopologyUpdater.acquireWriteLockForService(event.getServiceName()); + String serviceName = event.getCluster().getServiceName(); + TopologyUpdater.acquireWriteLockForService(serviceName); try { return doProcess(event, topology); } finally { - TopologyUpdater.releaseWriteLockForService(event.getServiceName()); + TopologyUpdater.releaseWriteLockForService(serviceName); } } else { @@ -70,13 +70,15 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { } private boolean doProcess (ClusterCreatedEvent event,Topology topology) { - + Cluster cluster = event.getCluster(); + String serviceName = cluster.getServiceName(); + String clusterId = cluster.getClusterId(); // Apply service filter if (TopologyServiceFilter.getInstance().isActive()) { - if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(serviceName)) { // Service is excluded, do not update topology or fire event if (log.isDebugEnabled()) { - log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + log.debug(String.format("Service is excluded: [service] %s", serviceName)); } return false; } @@ -84,10 +86,10 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { // Apply cluster filter if (TopologyClusterFilter.getInstance().isActive()) { - if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(clusterId)) { // Cluster is excluded, do not update topology or fire event if (log.isDebugEnabled()) { - log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + log.debug(String.format("Cluster is excluded: [cluster] %s", clusterId)); } return false; } @@ -105,27 +107,24 @@ public class ClusterCreatedMessageProcessor extends MessageProcessor { throw new RuntimeException("Host name/s not found in cluster created event"); }*/ // Validate event against the existing topology - Service service = topology.getService(event.getServiceName()); + Service service = topology.getService(serviceName); if (service == null) { if (log.isWarnEnabled()) { log.warn(String.format("Service does not exist: [service] %s", - event.getServiceName())); + serviceName)); } return false; } - if (service.clusterExists(event.getClusterId())) { + if (service.clusterExists(clusterId)) { if (log.isWarnEnabled()) { - log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(), - event.getClusterId())); + log.warn(String.format("Cluster already exists in service: [service] %s " + + "[cluster] %s",serviceName , + clusterId)); } } else { // Apply changes to the topology - Cluster cluster = service.getCluster(event.getClusterId()); - if (!cluster.isStateTransitionValid(ClusterStatus.Created)) { - log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created + " " + - "for cluster " + cluster.getClusterId()); - } + service.addCluster(cluster); cluster.setStatus(ClusterStatus.Created); if (log.isInfoEnabled()) { log.info(String.format("Cluster created: %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/0f7d7b0d/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java new file mode 100644 index 0000000..3cfb2dc --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.ClusterResetEvent; +import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; +import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +public class ClusterResetMessageProcessor extends MessageProcessor { + + private static final Log log = LogFactory.getLog(ClusterResetMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + + Topology topology = (Topology) object; + if (ClusterResetEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) { + return false; + } + + // Parse complete message and build event + ClusterResetEvent event = (ClusterResetEvent) Util. + jsonToObject(message, ClusterResetEvent.class); + + TopologyUpdater.acquireWriteLockForService(event.getServiceName()); + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForService(event.getServiceName()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (ClusterResetEvent event,Topology topology) { + + // Apply service filter + if (TopologyServiceFilter.getInstance().isActive()) { + if (TopologyServiceFilter.getInstance().serviceNameExcluded(event.getServiceName())) { + // Service is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Service is excluded: [service] %s", event.getServiceName())); + } + return false; + } + } + + // Apply cluster filter + if (TopologyClusterFilter.getInstance().isActive()) { + if (TopologyClusterFilter.getInstance().clusterIdExcluded(event.getClusterId())) { + // Cluster is excluded, do not update topology or fire event + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster is excluded: [cluster] %s", event.getClusterId())); + } + return false; + } + } + + // Validate event properties + /*Cluster cluster = event.getCluster(); + + if(cluster == null) { + String msg = "Cluster object of cluster created event is null."; + log.error(msg); + throw new RuntimeException(msg); + } + if (cluster.getHostNames().isEmpty()) { + throw new RuntimeException("Host name/s not found in cluster created event"); + }*/ + // Validate event against the existing topology + Service service = topology.getService(event.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service does not exist: [service] %s", + event.getServiceName())); + } + return false; + } + if (service.clusterExists(event.getClusterId())) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(), + event.getClusterId())); + } + } else { + + // Apply changes to the topology + Cluster cluster = service.getCluster(event.getClusterId()); + if (!cluster.isStateTransitionValid(ClusterStatus.Created)) { + log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created + " " + + "for cluster " + cluster.getClusterId()); + } + cluster.setStatus(ClusterStatus.Created); + if (log.isInfoEnabled()) { + log.info(String.format("Cluster reset as Created: %s", + cluster.toString())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +}
