merge with new changes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f70aa9ed Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f70aa9ed Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f70aa9ed Branch: refs/heads/master Commit: f70aa9edc4d0e32d3b726b99eca217dfd4d3a475 Parents: 4e73393 Author: gayan <[email protected]> Authored: Tue Dec 2 17:42:21 2014 +0530 Committer: gayan <[email protected]> Committed: Tue Dec 2 17:42:21 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 498 +------------------ .../internal/AutoscalerServerComponent.java | 230 ++------- .../CloudControllerServiceComponent.java | 143 +----- .../internal/LoadBalancerServiceComponent.java | 29 +- .../extension/FaultHandlingWindowProcessor.java | 217 -------- 5 files changed, 87 insertions(+), 1030 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 1f14542..f4a5169 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -27,8 +27,11 @@ import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory; import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; +import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException; +import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; +import org.apache.stratos.autoscaler.monitor.MonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; @@ -63,34 +66,22 @@ public class AutoscalerTopologyEventReceiver { private boolean topologyInitialized; private ExecutorService executorService; -<<<<<<< HEAD - public AutoscalerTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); - addEventListeners(); - } - + public AutoscalerTopologyEventReceiver() { + this.topologyEventReceiver = new TopologyEventReceiver(); + addEventListeners(); + } - public void execute() { - //FIXME this activated before autoscaler deployer activated. + public void execute() { + //FIXME this activated before autoscaler deployer activated. - topologyEventReceiver.setExecutorService(getExecutorService()); - topologyEventReceiver.execute(); + topologyEventReceiver.setExecutorService(getExecutorService()); + topologyEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread started"); - } + if (log.isInfoEnabled()) { + log.info("Autoscaler topology receiver thread started"); + } - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread terminated"); - } - } + } private boolean allClustersInitialized(Application application) { boolean allClustersInitialized = false; @@ -524,461 +515,6 @@ public class AutoscalerTopologyEventReceiver { topologyEventReceiver.terminate(); terminated = true; } -======= - public AutoscalerTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); - addEventListeners(); - } - - public void execute() { - //FIXME this activated before autoscaler deployer activated. - - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); - - if (log.isInfoEnabled()) { - log.info("Autoscaler topology receiver thread started"); - } - - } - - private boolean allClustersInitialized(Application application) { - boolean allClustersInitialized = false; - for (ClusterDataHolder holder : application.getClusterDataRecursively()) { - TopologyManager.acquireReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - - try { - Topology topology = TopologyManager.getTopology(); - if (topology != null) { - Service service = topology.getService(holder.getServiceType()); - if (service != null) { - if (service.clusterExists(holder.getClusterId())) { - allClustersInitialized = true; - return allClustersInitialized; - } else { - if (log.isDebugEnabled()) { - log.debug("[Cluster] " + holder.getClusterId() + " is not found in " + - "the Topology"); - } - allClustersInitialized = false; - } - } else { - if (log.isDebugEnabled()) { - log.debug("Service is null in the CompleteTopologyEvent"); - } - } - } else { - if (log.isDebugEnabled()) { - log.debug("Topology is null in the CompleteTopologyEvent"); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - } - } - return allClustersInitialized; - } - - private void addEventListeners() { - // Listen to topology events that affect clusters - topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { - @Override - protected void onEvent(Event event) { - if (!topologyInitialized) { - log.info("[CompleteTopologyEvent] Received: " + event.getClass()); - ApplicationHolder.acquireReadLock(); - try { - Applications applications = ApplicationHolder.getApplications(); - if (applications != null) { - for (Application application : applications.getApplications().values()) { - if (allClustersInitialized(application)) { - startApplicationMonitor(application.getUniqueIdentifier()); - } else { - log.error("Complete Topology is not consistent with the applications " + - "which got persisted"); - } - } - topologyInitialized = true; - } else { - log.info("No applications found in the complete topology"); - } - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - ApplicationHolder.releaseReadLock(); - } - } - } - }); - - topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass()); - ApplicationClustersCreatedEvent applicationClustersCreatedEvent = - (ApplicationClustersCreatedEvent) event; - String appId = applicationClustersCreatedEvent.getAppId(); - try { - //acquire read lock - ApplicationHolder.acquireReadLock(); - //start the application monitor - startApplicationMonitor(appId); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } finally { - //release read lock - ApplicationHolder.releaseReadLock(); - - } - } catch (ClassCastException e) { - String msg = "Error while casting the event " + e.getLocalizedMessage(); - log.error(msg, e); - } - - } - }); - - topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterActivatedEvent] Received: " + event.getClass()); - ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event; - String clusterId = clusterActivatedEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - - } - }); - - topologyEventReceiver.addEventListener(new ClusterResetEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); - ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event; - String clusterId = clusterResetEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - monitor.destroy(); - monitor.setStatus(ClusterStatus.Created); - - } - }); - - topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterCreatedEvent] Received: " + event.getClass()); - } - }); - - topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterInActivateEvent] Received: " + event.getClass()); - ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event; - String clusterId = clusterInactivateEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - monitor.setStatus(ClusterStatus.Inactive); - } - }); - - topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterTerminatingEvent] Received: " + event.getClass()); - ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event; - String clusterId = clusterTerminatingEvent.getClusterId(); - String instanceId = clusterTerminatingEvent.getInstanceId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - // if monitor does not exist, send cluster terminated event - ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(), - clusterTerminatingEvent.getServiceName(), - clusterId, instanceId); - return; - } - //changing the status in the monitor, will notify its parent monitor - if (monitor.getStatus() == ClusterStatus.Active) { - // terminated gracefully - monitor.setStatus(ClusterStatus.Terminating); - InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); - } else { - monitor.setStatus(ClusterStatus.Terminating); - monitor.terminateAllMembers(); - } - ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain(). - process("", clusterId, instanceId); - } - }); - - topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - log.info("[ClusterTerminatedEvent] Received: " + event.getClass()); - ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event; - String clusterId = clusterTerminatedEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - // if the cluster monitor is null, assume that its termianted - ApplicationMonitor appMonitor = - AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId()); - if (appMonitor != null) { - appMonitor - .onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null)); - } - return; - } - //changing the status in the monitor, will notify its parent monitor - monitor.setStatus(ClusterStatus.Terminated); - //Destroying and Removing the Cluster monitor - monitor.destroy(); - AutoscalerContext.getInstance().removeClusterMonitor(clusterId); - } - }); - - topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; - String clusterId = memberReadyToShutdownEvent.getClusterId(); - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new MemberStartedEventListener() { - @Override - protected void onEvent(Event event) { - - } - }); - - topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - String clusterId = memberTerminatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberTerminatedEvent(memberTerminatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - String clusterId = memberActivatedEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberActivatedEvent(memberActivatedEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { - @Override - protected void onEvent(Event event) { - try { - MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; - String clusterId = maintenanceModeEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } - } - }); - - topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() { - @Override - protected void onEvent(Event event) { - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - (ClusterInstanceCreatedEvent) event; - AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). - getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); - - if (clusterMonitor != null) { - TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - - try { - Service service = TopologyManager.getTopology(). - getService(clusterInstanceCreatedEvent.getServiceName()); - - if (service != null) { - Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); - if (cluster != null) { - // create and add Cluster Context - try { - if (cluster.isKubernetesCluster()) { - clusterMonitor.addClusterContextForInstance( - clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getKubernetesClusterContext(cluster)); - } else if (cluster.isLbCluster()) { - clusterMonitor.addClusterContextForInstance( - clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMLBClusterContext(cluster)); - } else { - clusterMonitor.addClusterContextForInstance( - clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMServiceClusterContext(cluster)); - } - - if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { - clusterMonitor.startScheduler(); - log.info("Monitoring task for Cluster Monitor with cluster id " + - clusterInstanceCreatedEvent.getClusterId() + " started successfully"); - } - - } catch (PolicyValidationException e) { - log.error(e.getMessage(), e); - } catch (PartitionValidationException e) { - log.error(e.getMessage(), e); - } - - } else { - log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() + - ", no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); - } - } else { - log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + - " not found, no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); - } - - } finally { - TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - } - - } else { - log.error("No Cluster Monitor found for cluster id " + - clusterInstanceCreatedEvent.getClusterId()); - } - } - }); - } - - /** - * Terminate load balancer topology receiver thread. - */ - public void terminate() { - topologyEventReceiver.terminate(); - terminated = true; - } - - protected synchronized void startApplicationMonitor(String applicationId) { - Thread th = null; - if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) { - th = new Thread(new ApplicationMonitorAdder(applicationId)); - } - if (th != null) { - th.start(); - } else { - if (log.isDebugEnabled()) { - log.debug(String - .format("Application monitor thread already exists: " + - "[application] %s ", applicationId)); - } - } - } ->>>>>>> ddf277b... Remove unnessary threads in messaging model public ExecutorService getExecutorService() { return executorService; @@ -987,8 +523,6 @@ public class AutoscalerTopologyEventReceiver { public void setExecutorService(ExecutorService executorService) { this.executorService = executorService; } -<<<<<<< HEAD -======= private class ApplicationMonitorAdder implements Runnable { private String appId; @@ -1047,5 +581,5 @@ public class AutoscalerTopologyEventReceiver { } } } ->>>>>>> ddf277b... Remove unnessary threads in messaging model + } http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index bb5e167..91d52d3 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@ -39,6 +39,7 @@ import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.stub.domain.Partition; import org.apache.stratos.common.kubernetes.KubernetesGroup; import org.apache.stratos.common.threading.StratosThreadPool; +import org.drools.reteoo.PartitionManager; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.api.RegistryException; @@ -75,28 +76,27 @@ public class AutoscalerServerComponent { protected void activate(ComponentContext componentContext) throws Exception { -<<<<<<< HEAD - try { - // Start topology receiver - XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration(); - int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE); - String threadIdentifier=conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER); - ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize); - asTopologyReceiver = new AutoscalerTopologyEventReceiver(); - asTopologyReceiver.setExecutorService(executorService); - asTopologyReceiver.execute(); + try { + // Start topology receiver + XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration(); + int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE); + String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER); + ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize); + asTopologyReceiver = new AutoscalerTopologyEventReceiver(); + asTopologyReceiver.setExecutorService(executorService); + asTopologyReceiver.execute(); - if (log.isDebugEnabled()) { - log.debug("Topology receiver executor service started"); - } + if (log.isDebugEnabled()) { + log.debug("Topology receiver executor service started"); + } - // Start health stat receiver - autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver(); - Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver); - healthDelegatorThread.start(); - if (log.isDebugEnabled()) { - log.debug("Health statistics receiver thread started"); - } + // Start health stat receiver + autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver(); + autoscalerHealthStatEventReceiver.setExecutorService(executorService); + autoscalerHealthStatEventReceiver.execute(); + if (log.isDebugEnabled()) { + log.debug("Health statistics receiver thread started"); + } // Adding the registry stored partitions to the information model List<Partition> partitions = RegistryManager.getInstance().retrievePartitions(); @@ -105,7 +105,7 @@ public class AutoscalerServerComponent { Partition partition = partitionIterator.next(); // PartitionManager.getInstance().addPartitionToInformationModel(partition); } - + // Adding the network partitions stored in registry to the information model // List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders(); // Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator(); @@ -113,7 +113,7 @@ public class AutoscalerServerComponent { // NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next(); // PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition); // } - + List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies(); Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator(); while (asPolicyIterator.hasNext()) { @@ -121,43 +121,43 @@ public class AutoscalerServerComponent { PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy); } - List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies(); - Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator(); - while (depPolicyIterator.hasNext()) { - DeploymentPolicy depPolicy = depPolicyIterator.next(); - PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy); - } + List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies(); + Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator(); + while (depPolicyIterator.hasNext()) { + DeploymentPolicy depPolicy = depPolicyIterator.next(); + PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy); + } - // Adding KubernetesGroups stored in registry to the information model - List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups(); - Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator(); - while (kubernetesGroupIterator.hasNext()) { - KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next(); - KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup); - } + // Adding KubernetesGroups stored in registry to the information model + List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups(); + Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator(); + while (kubernetesGroupIterator.hasNext()) { + KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next(); + KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup); + } - //starting the processor chain - ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain(); - ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain); + //starting the processor chain + ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain(); + ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain); - GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain(); - ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain); + GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain(); + ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain); - if (log.isInfoEnabled()) { - log.info("Scheduling tasks to publish applications"); - } + if (log.isInfoEnabled()) { + log.info("Scheduling tasks to publish applications"); + } - ApplicationSynchronizerTaskScheduler - .schedule(ServiceReferenceHolder.getInstance() - .getTaskService()); + ApplicationSynchronizerTaskScheduler + .schedule(ServiceReferenceHolder.getInstance() + .getTaskService()); - if (log.isInfoEnabled()) { - log.info("Autoscaler server Component activated"); - } - } catch (Throwable e) { - log.error("Error in activating the autoscaler component ", e); - } - } + if (log.isInfoEnabled()) { + log.info("Autoscaler server Component activated"); + } + } catch (Throwable e) { + log.error("Error in activating the autoscaler component ", e); + } + } protected void deactivate(ComponentContext context) { asTopologyReceiver.terminate(); @@ -198,128 +198,4 @@ public class AutoscalerServerComponent { ServiceReferenceHolder.getInstance().setTaskService(null); } } -======= - try { - // Start topology receiver - XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration(); - int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE); - String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER); - ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize); - asTopologyReceiver = new AutoscalerTopologyEventReceiver(); - asTopologyReceiver.setExecutorService(executorService); - asTopologyReceiver.execute(); - - if (log.isDebugEnabled()) { - log.debug("Topology receiver executor service started"); - } - - // Start health stat receiver - autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver(); - autoscalerHealthStatEventReceiver.setExecutorService(executorService); - autoscalerHealthStatEventReceiver.execute(); - if (log.isDebugEnabled()) { - log.debug("Health statistics receiver thread started"); - } - - // Adding the registry stored partitions to the information model - List<Partition> partitions = RegistryManager.getInstance().retrievePartitions(); - Iterator<Partition> partitionIterator = partitions.iterator(); - while (partitionIterator.hasNext()) { - Partition partition = partitionIterator.next(); - PartitionManager.getInstance().addPartitionToInformationModel(partition); - } - - // Adding the network partitions stored in registry to the information model - List<NetworkPartitionLbHolder> nwPartitionHolders = - RegistryManager.getInstance().retrieveNetworkPartitionLbHolders(); - Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator(); - while (nwPartitionIterator.hasNext()) { - NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next(); - PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition); - } - - List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies(); - Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator(); - while (asPolicyIterator.hasNext()) { - AutoscalePolicy asPolicy = asPolicyIterator.next(); - PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy); - } - - List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies(); - Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator(); - while (depPolicyIterator.hasNext()) { - DeploymentPolicy depPolicy = depPolicyIterator.next(); - PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy); - } - - // Adding KubernetesGroups stored in registry to the information model - List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups(); - Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator(); - while (kubernetesGroupIterator.hasNext()) { - KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next(); - KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup); - } - - //starting the processor chain - ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain(); - ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain); - GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain(); - ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain); - - if (log.isInfoEnabled()) { - log.info("Scheduling tasks to publish applications"); - } - - ApplicationSynchronizerTaskScheduler - .schedule(ServiceReferenceHolder.getInstance() - .getTaskService()); - - if (log.isInfoEnabled()) { - log.info("Autoscaler server Component activated"); - } - } catch (Throwable e) { - log.error("Error in activating the autoscaler component ", e); - } - } - - protected void deactivate(ComponentContext context) { - asTopologyReceiver.terminate(); - autoscalerHealthStatEventReceiver.terminate(); - } - - protected void setRegistryService(RegistryService registryService) { - if (log.isDebugEnabled()) { - log.debug("Setting the Registry Service"); - } - try { - ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry()); - } catch (RegistryException e) { - String msg = "Failed when retrieving Governance System Registry."; - log.error(msg, e); - throw new AutoScalerException(msg, e); - } - } - - protected void unsetRegistryService(RegistryService registryService) { - if (log.isDebugEnabled()) { - log.debug("Un-setting the Registry Service"); - } - ServiceReferenceHolder.getInstance().setRegistry(null); - } - - protected void setTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Setting the Task Service"); - } - ServiceReferenceHolder.getInstance().setTaskService(taskService); - } - - protected void unsetTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Un-setting the Task Service"); - } - ServiceReferenceHolder.getInstance().setTaskService(null); - } -} ->>>>>>> ddf277b... Remove unnessary threads in messaging model http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index a413218..700efb9 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -20,41 +20,23 @@ package org.apache.stratos.cloud.controller.internal; * */ -<<<<<<< HEAD -<<<<<<< HEAD +import org.apache.commons.configuration.XMLConfiguration; -<<<<<<< HEAD import com.hazelcast.core.HazelcastInstance; -======= ->>>>>>> ddf277b... Remove unnessary threads in messaging model -======= - ->>>>>>> ad3e45c... Remove unnessary threads in messaging model -======= -import org.apache.commons.configuration.XMLConfiguration; ->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.context.CloudControllerContext; import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver; import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver; import org.apache.stratos.cloud.controller.exception.CloudControllerException; -<<<<<<< HEAD import org.apache.stratos.cloud.controller.services.CloudControllerService; import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl; import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler; import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver; import org.apache.stratos.common.clustering.DistributedObjectProvider; -======= -import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl; -import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; -import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler; -import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver; -import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.common.util.ConfUtil; ->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.util.Util; import org.osgi.framework.BundleContext; @@ -72,7 +54,6 @@ import java.util.concurrent.ExecutorService; * Registering Cloud Controller Service. * * @scr.component name="org.apache.stratos.cloud.controller" immediate="true" -<<<<<<< HEAD * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider" * cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider" * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" @@ -81,24 +62,6 @@ import java.util.concurrent.ExecutorService; * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService" * @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService" * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService" -======= - * @scr.reference name="distributedMapProvider" interface="org.wso2.carbon.caching.impl.DistributedMapProvider" - * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider" - * @scr.reference name="ntask.component" - * interface="org.wso2.carbon.ntask.core.service.TaskService" - * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService" - * @scr.reference name="registry.service" - * interface="org.wso2.carbon.registry.core.service.RegistryService" - * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService" - * @scr.reference name="config.context.service" -<<<<<<< HEAD - * interface="org.wso2.carbon.utils.ConfigurationContextService" - * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService" ->>>>>>> ddf277b... Remove unnessary threads in messaging model -======= - * interface="org.wso2.carbon.utils.ConfigurationContextService" - * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService" ->>>>>>> ad3e45c... Remove unnessary threads in messaging model */ public class CloudControllerServiceComponent { @@ -107,8 +70,8 @@ public class CloudControllerServiceComponent { private InstanceStatusTopicReceiver instanceStatusTopicReceiver; private ApplicationTopicReceiver applicationTopicReceiver; private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier"; - private static final String DEFAULT_IDENTIFIER = "Auto-Scaler"; - private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize"; + private static final String DEFAULT_IDENTIFIER = "Cloud-Controller"; + private static final String THREAD_POOL_SIZE_KEY = "threadPool.cloudcontroller.threadPoolSize"; private static final String COMPONENTS_CONFIG = "stratos-config"; private static final int THREAD_POOL_SIZE = 10; @@ -127,28 +90,16 @@ public class CloudControllerServiceComponent { log.info("Application Receiver thread started"); } -<<<<<<< HEAD - if (log.isInfoEnabled()) { - log.info("Application event receiver thread started"); - } -======= clusterStatusTopicReceiver = new ClusterStatusTopicReceiver(); clusterStatusTopicReceiver.setExecutorService(executorService); clusterStatusTopicReceiver.execute(); ->>>>>>> ddf277b... Remove unnessary threads in messaging model if (log.isInfoEnabled()) { log.info("Cluster status Receiver thread started"); } -<<<<<<< HEAD - if (log.isInfoEnabled()) { - log.info("Cluster status receiver thread started"); - } -======= instanceStatusTopicReceiver = new InstanceStatusTopicReceiver(); instanceStatusTopicReceiver.execute(); ->>>>>>> ddf277b... Remove unnessary threads in messaging model if (log.isInfoEnabled()) { log.info("Instance status message receiver thread started"); @@ -163,23 +114,15 @@ public class CloudControllerServiceComponent { log.info("Scheduling tasks"); } -<<<<<<< HEAD - if(log.isInfoEnabled()) { - log.info("Scheduling tasks"); - } + TopologySynchronizerTaskScheduler + .schedule(ServiceReferenceHolder.getInstance() + .getTaskService()); - if ((!CloudControllerContext.getInstance().isClustered()) || - (CloudControllerContext.getInstance().isCoordinator())) { - TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService()); - if(log.isInfoEnabled()) { - log.info("Topology synchronizer task scheduled"); - } - } - } catch (Throwable e) { - log.error("**** Cloud controller service bundle is failed to activate ****", e); + } catch (Throwable e) { + log.error("******* Cloud Controller Service bundle is failed to activate ****", e); } } - + protected void setTaskService(TaskService taskService) { if (log.isDebugEnabled()) { log.debug("Setting the Task Service"); @@ -193,32 +136,7 @@ public class CloudControllerServiceComponent { } ServiceReferenceHolder.getInstance().setTaskService(null); } - -======= - TopologySynchronizerTaskScheduler - .schedule(ServiceReferenceHolder.getInstance() - .getTaskService()); - - } catch (Throwable e) { - log.error("******* Cloud Controller Service bundle is failed to activate ****", e); - } - } - - protected void setTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Setting the Task Service"); - } - ServiceReferenceHolder.getInstance().setTaskService(taskService); - } - protected void unsetTaskService(TaskService taskService) { - if (log.isDebugEnabled()) { - log.debug("Unsetting the Task Service"); - } - ServiceReferenceHolder.getInstance().setTaskService(null); - } - ->>>>>>> ddf277b... Remove unnessary threads in messaging model protected void setRegistryService(RegistryService registryService) { if (log.isDebugEnabled()) { log.debug("Setting the Registry Service"); @@ -226,39 +144,22 @@ public class CloudControllerServiceComponent { try { UserRegistry registry = registryService.getGovernanceSystemRegistry(); -<<<<<<< HEAD ServiceReferenceHolder.getInstance().setRegistry(registry); } catch (RegistryException e) { String msg = "Failed when retrieving Governance System Registry."; log.error(msg, e); throw new CloudControllerException(msg, e); - } -======= - ServiceReferenceHolder.getInstance() - .setRegistry(registry); - } catch (RegistryException e) { - String msg = "Failed when retrieving Governance System Registry."; - log.error(msg, e); - throw new CloudControllerException(msg, e); - } ->>>>>>> ddf277b... Remove unnessary threads in messaging model + } } protected void unsetRegistryService(RegistryService registryService) { if (log.isDebugEnabled()) { -<<<<<<< HEAD log.debug("Un-setting the Registry Service"); } ServiceReferenceHolder.getInstance().setRegistry(null); -======= - log.debug("Unsetting the Registry Service"); - } - ServiceReferenceHolder.getInstance().setRegistry(null); ->>>>>>> ddf277b... Remove unnessary threads in messaging model } protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) { -<<<<<<< HEAD ServiceReferenceHolder.getInstance().setAxisConfiguration( cfgCtxService.getServerConfigContext().getAxisConfiguration()); } @@ -274,27 +175,9 @@ public class CloudControllerServiceComponent { protected void unsetDistributedObjectProvider(DistributedObjectProvider distributedObjectProvider) { ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null); } - -======= - ServiceReferenceHolder.getInstance().setAxisConfiguration( - cfgCtxService.getServerConfigContext().getAxisConfiguration()); - } - - protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) { - ServiceReferenceHolder.getInstance().setAxisConfiguration(null); - } - - protected void setDistributedMapProvider(DistributedMapProvider mapProvider) { - ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider); - } - - protected void unsetDistributedMapProvider(DistributedMapProvider mapProvider) { - ServiceReferenceHolder.getInstance().setDistributedMapProvider(null); - } ->>>>>>> ddf277b... Remove unnessary threads in messaging model protected void deactivate(ComponentContext ctx) { - // Close event publisher connections to message broker - EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName()); + // Close event publisher connections to message broker + EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 3aa77a8..509bc74 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -120,30 +120,13 @@ public class LoadBalancerServiceComponent { TopologyFilterConfigurator.configure(configuration); if (configuration.isMultiTenancyEnabled()) { -<<<<<<< HEAD // Start tenant event receiver startTenantEventReceiver(); -======= - - tenantReceiver = new LoadBalancerTenantEventReceiver(); - tenantReceiver.execute(); - - if (log.isInfoEnabled()) { - log.info("Tenant receiver thread started"); - } ->>>>>>> ae876c1... Remove unnessary threads in messaging model } if (configuration.isTopologyEventListenerEnabled()) { // Start topology receiver -<<<<<<< HEAD startTopologyEventReceiver(); -======= - topologyReceiver = new LoadBalancerTopologyEventReceiver(); - topologyReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Topology receiver thread started"); - } if (log.isInfoEnabled()) { if (TopologyServiceFilter.getInstance().isActive()) { @@ -177,7 +160,7 @@ public class LoadBalancerServiceComponent { log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString())); } } ->>>>>>> ae876c1... Remove unnessary threads in messaging model + } if(configuration.isCepStatsPublisherEnabled()) { @@ -197,18 +180,16 @@ public class LoadBalancerServiceComponent { } private void startTenantEventReceiver() { - tenantReceiver = new LoadBalancerTenantEventReceiver(); - Thread tenantReceiverThread = new Thread(tenantReceiver); - tenantReceiverThread.start(); + tenantReceiver = new LoadBalancerTenantEventReceiver(); + tenantReceiver.execute(); if (log.isInfoEnabled()) { log.info("Tenant receiver thread started"); } } private void startTopologyEventReceiver() { - topologyReceiver = new LoadBalancerTopologyEventReceiver(); - Thread topologyReceiverThread = new Thread(topologyReceiver); - topologyReceiverThread.start(); + topologyReceiver = new LoadBalancerTopologyEventReceiver(); + topologyReceiver.execute(); if (log.isInfoEnabled()) { log.info("Topology receiver thread started"); } http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 5ee28d1..8bfcb2c 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -107,7 +107,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run log.debug("Member not found in the toplogy. Event rejected"); return; } -<<<<<<< HEAD if (StringUtils.isNotEmpty(id)) { memberTimeStampMap.put(id, event.getTimeStamp()); } else { @@ -320,220 +319,4 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { return memberTimeStampMap; } -======= - if (StringUtils.isNotEmpty(id)) { - memberTimeStampMap.put(id, event.getTimeStamp()); - } else { - log.warn("NULL member id found in the event received. Event rejected."); - } - if (log.isDebugEnabled()) { - log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - /** - * Retrieve the current activated members from the topology and initialize the time stamp map. - * This will allow the system to recover from a restart - * - * @param topology Topology model object - */ - boolean loadTimeStampMapFromTopology(Topology topology) { - - long currentTimeStamp = System.currentTimeMillis(); - if (topology == null || topology.getServices() == null) { - return false; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : topology.getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()) { - // we are checking faulty status only in previously activated members - if (member != null && MemberStatus.Activated.equals(member.getStatus())) { - // Initialize the member time stamp map from the topology at the beginning - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - } - } - } - } - } - } - - log.info("Member time stamp map was successfully loaded from the topology."); - if (log.isDebugEnabled()) { - log.debug("Member TimeStamp Map: " + memberTimeStampMap); - } - return true; - } - - private Member getMemberFromId(String memberId) { - if (StringUtils.isEmpty(memberId)) { - return null; - } - if (TopologyManager.getTopology().isInitialized()) { - try { - TopologyManager.acquireReadLock(); - if (TopologyManager.getTopology().getServices() == null) { - return null; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : TopologyManager.getTopology().getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()) { - if (memberId.equals(member.getMemberId())) { - return member; - } - } - } - } - } - } - } catch (Exception e) { - log.error("Error while reading topology" + e); - } finally { - TopologyManager.releaseReadLock(); - } - } - return null; - } - - private void publishMemberFault(String memberId) { - Member member = getMemberFromId(memberId); - if (member == null) { - log.error("Failed to publish member fault event. Member having [member-id] " + memberId + - " does not exist in topology"); - return; - } - log.info("Publishing member fault event for [member-id] " + memberId); - - MemberFaultEvent memberFaultEvent = - new MemberFaultEvent(member.getClusterId(), member.getInstanceId(), member.getMemberId(), - member.getPartitionId(), 0); - - memberFaultEventMessageMap.put("message", memberFaultEvent); - healthStatPublisher.publish(MemberFaultEventMap, true); - } - - @Override - public void run() { - try { - threadBarrier.pass(); - - for (Object o : memberTimeStampMap.entrySet()) { - Map.Entry pair = (Map.Entry) o; - long currentTime = System.currentTimeMillis(); - Long eventTimeStamp = (Long) pair.getValue(); - - if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + - eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - publishMemberFault((String) pair.getKey()); - } - } - if (log.isDebugEnabled()) { - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + - memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } finally { - faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - } - - @Override - protected Object[] currentState() { - return new Object[] { window.currentState() }; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, - SiddhiContext siddhiContext) { - - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); - memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - MemberFaultEventMap - .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - - ExecutorService executorService = StratosThreadPool.getExecutorService(IDENTIFIER, 10); - cepTopologyEventReceiver.setExecutorService(executorService); - executorService.execute(cepTopologyEventReceiver); - - //Ordinary scheduling - window.schedule(); - if (log.isDebugEnabled()) { - log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + - ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + - ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); - } - } - - @Override - public void schedule() { - faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - @Override - public void scheduleNow() { - faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.faultHandleScheduler = scheduledExecutorService; - } - - @Override - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy() { - // terminate topology listener thread - cepTopologyEventReceiver.terminate(); - window = null; - } - - public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { - return memberTimeStampMap; - } ->>>>>>> ddf277b... Remove unnessary threads in messaging model }
