GitHub user pamod commented on a diff in the pull request:

    https://github.com/apache/stratos/pull/442#discussion_r64027238
  
    --- Diff: 
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
 ---
    @@ -199,867 +203,899 @@ public static void 
handleApplicationClustersCreated(String appUuid, List<Cluster
             }
     
             log.debug("Creating cluster port mappings: [application-id] " + 
appUuid);
    -        for(Cluster cluster : appClusters) {
    +        for (Cluster cluster : appClusters) {
                 String cartridgeUuid = cluster.getServiceUuid();
                 Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeUuid);
    -            if(cartridge == null) {
    +            if (cartridge == null) {
                     throw new CloudControllerException("Cartridge not found: 
[cartridge-uuid] " + cartridgeUuid);
                 }
     
    -            for(PortMapping portMapping : cartridge.getPortMappings()) {
    +            for (PortMapping portMapping : cartridge.getPortMappings()) {
                     ClusterPortMapping clusterPortMapping = new 
ClusterPortMapping(appUuid,
                             cluster.getClusterId(), portMapping.getName(), 
portMapping.getProtocol(), portMapping.getPort(),
    -                        portMapping.getProxyPort());
    -                
CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
    -                log.debug("Cluster port mapping created: " + 
clusterPortMapping.toString());
    -            }
    -        }
    -
    -        // Persist cluster port mappings
    -        CloudControllerContext.getInstance().persist();
    -
    -        // Send application clusters created event
    -        TopologyEventPublisher.sendApplicationClustersCreated(appUuid, 
appClusters);
    -    }
    -
    -    public static void handleApplicationClustersRemoved(String appId,
    -                                                        
Set<ClusterDataHolder> clusterData) {
    -        TopologyManager.acquireWriteLock();
    -
    -        List<Cluster> removedClusters = new ArrayList<Cluster>();
    -        CloudControllerContext context = 
CloudControllerContext.getInstance();
    -        try {
    -            Topology topology = TopologyManager.getTopology();
    -
    -            if (clusterData != null) {
    -                // remove clusters from CC topology model and remove 
runtime information
    -                for (ClusterDataHolder aClusterData : clusterData) {
    -                    Service aService = 
topology.getService(aClusterData.getServiceUuid());
    -                    if (aService != null) {
    -                        
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
    -                    } else {
    -                        log.warn("Service " + 
aClusterData.getServiceType() + " not found, " +
    -                                "unable to remove Cluster " + 
aClusterData.getClusterId());
    +                                portMapping.getProxyPort());
    +                        
CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
    +                        log.debug("Cluster port mapping created: " + 
clusterPortMapping.toString());
                         }
    -                    // remove runtime data
    -                    
context.removeClusterContext(aClusterData.getClusterId());
    -
    -                    log.info("Removed application [ " + appId + " ]'s 
Cluster " +
    -                            "[ " + aClusterData.getClusterId() + " ] from 
the topology");
                     }
    -                // persist runtime data changes
    -                CloudControllerContext.getInstance().persist();
    -            } else {
    -                log.info("No cluster data found for application " + appId 
+ " to remove");
    -            }
     
    -            TopologyManager.updateTopology(topology);
    +                // Persist cluster port mappings
    +                CloudControllerContext.getInstance().persist();
     
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    +                // Send application clusters created event
    +                
TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
    +            }
     
    -        // Remove cluster port mappings of application
    -        
CloudControllerContext.getInstance().removeClusterPortMappings(appId);
    -        CloudControllerContext.getInstance().persist();
    +        public static void handleApplicationClustersRemoved (String appId,
    +                Set < ClusterDataHolder > clusterData){
    +            TopologyManager.acquireWriteLock();
     
    -        TopologyEventPublisher.sendApplicationClustersRemoved(appId, 
clusterData);
    +            List<Cluster> removedClusters = new ArrayList<Cluster>();
    +            CloudControllerContext context = 
CloudControllerContext.getInstance();
    +            try {
    +                Topology topology = TopologyManager.getTopology();
    +
    +                if (clusterData != null) {
    +                    // remove clusters from CC topology model and remove 
runtime information
    +                    for (ClusterDataHolder aClusterData : clusterData) {
    +                        Service aService = 
topology.getService(aClusterData.getServiceUuid());
    +                        if (aService != null) {
    +                            
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
    +                        } else {
    +                            log.warn("Service " + 
aClusterData.getServiceType() + " not found, " +
    +                                    "unable to remove Cluster " + 
aClusterData.getClusterId());
    +                        }
    +                        // remove runtime data
    +                        
context.removeClusterContext(aClusterData.getClusterId());
     
    -    }
    +                        log.info("Removed application [ " + appId + " ]'s 
Cluster " +
    +                                "[ " + aClusterData.getClusterId() + " ] 
from the topology");
    +                    }
    +                    // persist runtime data changes
    +                    CloudControllerContext.getInstance().persist();
    +                } else {
    +                    log.info("No cluster data found for application " + 
appId + " to remove");
    +                }
     
    -    public static void handleClusterReset(ClusterStatusClusterResetEvent 
event) {
    -        TopologyManager.acquireWriteLock();
    +                TopologyManager.updateTopology(topology);
     
    -        try {
    -            Topology topology = TopologyManager.getTopology();
    -            Service service = topology.getService(event.getServiceName());
    -            if (service == null) {
    -                log.error("Service " + event.getServiceName() +
    -                        " not found in Topology, unable to update the 
cluster status to Created");
    -                return;
    +            } finally {
    +                TopologyManager.releaseWriteLock();
                 }
     
    -            Cluster cluster = service.getCluster(event.getClusterId());
    -            if (cluster == null) {
    -                log.error("Cluster " + event.getClusterId() + " not found 
in Topology, unable to update " +
    -                        "status to Created");
    -                return;
    -            }
    +            // Remove cluster port mappings of application
    +            
CloudControllerContext.getInstance().removeClusterPortMappings(appId);
    +            CloudControllerContext.getInstance().persist();
     
    -            ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
    -            if (context == null) {
    -                log.warn("Cluster Instance Context is not found for 
[cluster] " +
    -                        event.getClusterId() + " [instance-id] " +
    -                        event.getInstanceId());
    -                return;
    -            }
    -            ClusterStatus status = ClusterStatus.Created;
    -            if (context.isStateTransitionValid(status)) {
    -                context.setStatus(status);
    -                log.info("Cluster Created adding status started for" + 
cluster.getClusterId());
    -                TopologyManager.updateTopology(topology);
    -                //publishing data
    -                
TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), 
event.getServiceName(),
    -                        event.getClusterId(), event.getInstanceId());
    -            } else {
    -                log.warn(String.format("Cluster state transition is not 
valid: [cluster-id] %s " +
    -                                " [instance-id] %s [current-status] %s 
[status-requested] %s",
    -                        event.getClusterId(), event.getInstanceId(),
    -                        context.getStatus(), status));
    -            }
    +            TopologyEventPublisher.sendApplicationClustersRemoved(appId, 
clusterData);
     
    -        } finally {
    -            TopologyManager.releaseWriteLock();
             }
     
    +        public static void handleClusterReset 
(ClusterStatusClusterResetEvent event){
    +            TopologyManager.acquireWriteLock();
     
    -    }
    -
    -    public static void handleClusterInstanceCreated(String serviceUuid, 
String clusterId,
    -                                                    String alias, String 
instanceId, String partitionId,
    -                                                    String 
networkPartitionUuid) {
    -
    -        TopologyManager.acquireWriteLock();
    +            try {
    +                Topology topology = TopologyManager.getTopology();
    +                Service service = 
topology.getService(event.getServiceName());
    +                if (service == null) {
    +                    log.error("Service " + event.getServiceName() +
    +                            " not found in Topology, unable to update the 
cluster status to Created");
    +                    return;
    +                }
     
    -        try {
    -            Topology topology = TopologyManager.getTopology();
    -            Service service = topology.getService(serviceUuid);
    -            if (service == null) {
    -                log.error("Service " + serviceUuid +
    -                        " not found in Topology, unable to update the 
cluster status to Created");
    -                return;
    -            }
    +                Cluster cluster = service.getCluster(event.getClusterId());
    +                if (cluster == null) {
    +                    log.error("Cluster " + event.getClusterId() + " not 
found in Topology, unable to update " +
    +                            "status to Created");
    +                    return;
    +                }
     
    -            Cluster cluster = service.getCluster(clusterId);
    -            if (cluster == null) {
    -                log.error("Cluster " + clusterId + " not found in 
Topology, unable to update " +
    -                        "status to Created");
    -                return;
    -            }
    +                ClusterInstance context = 
cluster.getInstanceContexts(event.getInstanceId());
    +                if (context == null) {
    +                    log.warn("Cluster Instance Context is not found for 
[cluster] " +
    +                            event.getClusterId() + " [instance-id] " +
    +                            event.getInstanceId());
    +                    return;
    +                }
    +                ClusterStatus status = ClusterStatus.Created;
    +                if (context.isStateTransitionValid(status)) {
    +                    context.setStatus(status);
    +                    log.info("Cluster Created adding status started for" + 
cluster.getClusterId());
    +                    TopologyManager.updateTopology(topology);
    +                    //publishing data
    +                    
TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), 
event.getServiceName(),
    +                            event.getClusterId(), event.getInstanceId());
    +                } else {
    +                    log.warn(String.format("Cluster state transition is 
not valid: [cluster-id] %s " +
    +                                    " [instance-id] %s [current-status] %s 
[status-requested] %s",
    +                            event.getClusterId(), event.getInstanceId(),
    +                            context.getStatus(), status));
    +                }
     
    -            if (cluster.getInstanceContexts(instanceId) != null) {
    -                log.warn("The Instance context for the cluster already 
exists for [cluster] " +
    -                        clusterId + " [instance-id] " + instanceId);
    -                return;
    +            } finally {
    +                TopologyManager.releaseWriteLock();
                 }
     
    -            ClusterInstance clusterInstance = new ClusterInstance(alias, 
clusterId, instanceId);
    -            clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
    -            clusterInstance.setPartitionId(partitionId);
    -            cluster.addInstanceContext(instanceId, clusterInstance);
    -            TopologyManager.updateTopology(topology);
    -
    -            ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
    -                    new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
    -                            clusterInstance);
    -            clusterInstanceCreatedEvent.setPartitionId(partitionId);
    -            
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
    -
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    -    }
    -
     
    -    public static void handleClusterRemoved(ClusterContext ctxt) {
    -        Topology topology = TopologyManager.getTopology();
    -        Service service = topology.getService(ctxt.getCartridgeUuid());
    -        String deploymentPolicy;
    -        if (service == null) {
    -            log.warn(String.format("Service %s does not exist",
    -                    ctxt.getCartridgeUuid()));
    -            return;
             }
     
    -        if (!service.clusterExists(ctxt.getClusterId())) {
    -            log.warn(String.format("Cluster %s does not exist for service 
%s",
    -                    ctxt.getClusterId(),
    -                    ctxt.getCartridgeUuid()));
    -            return;
    -        }
    +        public static void handleClusterInstanceCreated (String 
serviceUuid, String clusterId,
    +                String alias, String instanceId, String partitionId,
    +                String networkPartitionUuid){
     
    -        try {
                 TopologyManager.acquireWriteLock();
    -            Cluster cluster = service.removeCluster(ctxt.getClusterId());
    -            deploymentPolicy = cluster.getDeploymentPolicyUuid();
    -            TopologyManager.updateTopology(topology);
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    -        TopologyEventPublisher.sendClusterRemovedEvent(ctxt, 
deploymentPolicy);
    -    }
     
    -    /**
    -     * Add member object to the topology and publish member created event
    -     *
    -     * @param memberContext
    -     */
    -    public static void handleMemberCreatedEvent(MemberContext 
memberContext) {
    -        Topology topology = TopologyManager.getTopology();
    +            try {
    +                Topology topology = TopologyManager.getTopology();
    +                Service service = topology.getService(serviceUuid);
    +                if (service == null) {
    +                    log.error("Service " + serviceUuid +
    +                            " not found in Topology, unable to update the 
cluster status to Created");
    +                    return;
    +                }
     
    -        Service service = 
topology.getService(memberContext.getCartridgeType());
    -        String clusterId = memberContext.getClusterId();
    -        Cluster cluster = service.getCluster(clusterId);
    -        String memberId = memberContext.getMemberId();
    -        String clusterInstanceId = memberContext.getClusterInstanceId();
    -        String networkPartitionId = memberContext.getNetworkPartitionId();
    -        String partitionId = memberContext.getPartition().getUuid();
    -        String lbClusterId = memberContext.getLbClusterId();
    -        long initTime = memberContext.getInitTime();
    -
    -        if (cluster.memberExists(memberId)) {
    -            log.warn(String.format("Member %s already exists", memberId));
    -            return;
    -        }
    +                Cluster cluster = service.getCluster(clusterId);
    +                if (cluster == null) {
    +                    log.error("Cluster " + clusterId + " not found in 
Topology, unable to update " +
    +                            "status to Created");
    +                    return;
    +                }
     
    -        try {
    -            TopologyManager.acquireWriteLock();
    -            Member member = new Member(service.getServiceName(), 
clusterId, memberId, clusterInstanceId,
    -                    networkPartitionId, partitionId, 
memberContext.getLoadBalancingIPType(), initTime);
    -            member.setStatus(MemberStatus.Created);
    -            member.setLbClusterId(lbClusterId);
    -            
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
    -            cluster.addMember(member);
    -            TopologyManager.updateTopology(topology);
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    +                if (cluster.getInstanceContexts(instanceId) != null) {
    +                    log.warn("The Instance context for the cluster already 
exists for [cluster] " +
    +                            clusterId + " [instance-id] " + instanceId);
    +                    return;
    +                }
     
    -        TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
    -    }
    +                ClusterInstance clusterInstance = new 
ClusterInstance(alias, clusterId, instanceId);
    +                
clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
    +                clusterInstance.setPartitionId(partitionId);
    +                cluster.addInstanceContext(instanceId, clusterInstance);
    +                TopologyManager.updateTopology(topology);
     
    -    /**
    -     * Update member status to initialized and publish member initialized 
event
    -     *
    -     * @param memberContext
    -     */
    -    public static void handleMemberInitializedEvent(MemberContext 
memberContext) {
    -        Topology topology = TopologyManager.getTopology();
    -        Service service = 
topology.getService(memberContext.getCartridgeType());
    -        if (service == null) {
    -            log.warn(String.format("Service %s does not exist",
    -                    memberContext.getCartridgeType()));
    -            return;
    -        }
    -        if (!service.clusterExists(memberContext.getClusterId())) {
    -            log.warn(String.format("Cluster %s does not exist in service 
%s",
    -                    memberContext.getClusterId(),
    -                    memberContext.getCartridgeType()));
    -            return;
    -        }
    +                ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
    +                        new ClusterInstanceCreatedEvent(serviceUuid, 
clusterId,
    +                                clusterInstance);
    +                clusterInstanceCreatedEvent.setPartitionId(partitionId);
    +                
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
     
    -        Member member = service.getCluster(memberContext.getClusterId()).
    -                getMember(memberContext.getMemberId());
    -        if (member == null) {
    -            log.warn(String.format("Member %s does not exist",
    -                    memberContext.getMemberId()));
    -            return;
    +            } finally {
    +                TopologyManager.releaseWriteLock();
    +            }
             }
     
    -        try {
    -            TopologyManager.acquireWriteLock();
     
    -            // Set ip addresses
    -            
member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
    -            if (memberContext.getPrivateIPs() != null) {
    -                
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
    -            }
    -            member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
    -            if (memberContext.getPublicIPs() != null) {
    -                
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
    +        public static void handleClusterRemoved (ClusterContext ctxt){
    +            Topology topology = TopologyManager.getTopology();
    +            Service service = topology.getService(ctxt.getCartridgeUuid());
    +            String deploymentPolicy;
    +            if (service == null) {
    +                log.warn(String.format("Service %s does not exist",
    +                        ctxt.getCartridgeUuid()));
    +                return;
                 }
     
    -            // try update lifecycle state
    -            if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
    -                log.error("Invalid state transition from " + 
member.getStatus() + " to " +
    -                        MemberStatus.Initialized);
    +            if (!service.clusterExists(ctxt.getClusterId())) {
    +                log.warn(String.format("Cluster %s does not exist for 
service %s",
    +                        ctxt.getClusterId(),
    +                        ctxt.getCartridgeUuid()));
                     return;
    -            } else {
    -                member.setStatus(MemberStatus.Initialized);
    -                log.info("Member status updated to initialized");
    +            }
     
    +            try {
    +                TopologyManager.acquireWriteLock();
    +                Cluster cluster = 
service.removeCluster(ctxt.getClusterId());
    +                deploymentPolicy = cluster.getDeploymentPolicyUuid();
                     TopologyManager.updateTopology(topology);
    -
    -                
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
    -                //publishing data
    -                BAMUsageDataPublisher.publish(memberContext.getMemberId(),
    -                        memberContext.getPartition().getUuid(),
    -                        memberContext.getNetworkPartitionId(),
    -                        memberContext.getClusterId(),
    -                        memberContext.getCartridgeType(),
    -                        MemberStatus.Initialized.toString(),
    -                        null);
    +            } finally {
    +                TopologyManager.releaseWriteLock();
                 }
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    +            TopologyEventPublisher.sendClusterRemovedEvent(ctxt, 
deploymentPolicy);
             }
    -    }
     
    -    private static int findKubernetesServicePort(String clusterId, 
List<KubernetesService> kubernetesServices,
    -                                                 PortMapping portMapping) {
    -        for (KubernetesService kubernetesService : kubernetesServices) {
    -            if 
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
    -                return kubernetesService.getPort();
    +        /**
    +         * Add member object to the topology and publish member created 
event
    +         *
    +         * @param memberContext
    +         */
    +        public static void handleMemberCreatedEvent (MemberContext 
memberContext){
    +            Topology topology = TopologyManager.getTopology();
    +
    +            Service service = 
topology.getService(memberContext.getCartridgeType());
    +            String clusterId = memberContext.getClusterId();
    +            Cluster cluster = service.getCluster(clusterId);
    +            String memberId = memberContext.getMemberId();
    +            String clusterInstanceId = 
memberContext.getClusterInstanceId();
    +            String networkPartitionId = 
memberContext.getNetworkPartitionId();
    +            String partitionId = memberContext.getPartition().getUuid();
    +            String lbClusterId = memberContext.getLbClusterId();
    +            long initTime = memberContext.getInitTime();
    +            String autoscalingReason = 
memberContext.getProperties().getProperty(
    +                    StratosConstants.SCALING_REASON).getValue();
    +            long scalingTime = 
Long.parseLong(memberContext.getProperties().getProperty(
    +                    StratosConstants.SCALING_TIME).getValue());
    +
    +
    +            if (cluster.memberExists(memberId)) {
    +                log.warn(String.format("Member %s already exists", 
memberId));
    +                return;
                 }
    +
    +            try {
    +                TopologyManager.acquireWriteLock();
    +                Member member = new Member(service.getServiceName(), 
clusterId, memberId, clusterInstanceId,
    +                        networkPartitionId, partitionId, 
memberContext.getLoadBalancingIPType(), initTime);
    +                member.setStatus(MemberStatus.Created);
    +                member.setLbClusterId(lbClusterId);
    +                
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
    +                cluster.addMember(member);
    +                TopologyManager.updateTopology(topology);
    +                //member created time
    +                Long timeStamp = System.currentTimeMillis();
    +                //publishing to BAM
    +                BAMUsageDataPublisher
    +                        .publish(memberContext.getMemberId(),
    +                                memberContext.getPartition().getId(),
    +                                memberContext.getNetworkPartitionId(),
    +                                memberContext.getClusterId(),
    +                                memberContext.getClusterInstanceId(),
    +                                memberContext.getCartridgeType(),
    +                                MemberStatus.Created.toString(),
    +                                timeStamp, autoscalingReason,
    +                                scalingTime, null);
    +            } finally {
    +                TopologyManager.releaseWriteLock();
    +            }
    +
    +            TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
             }
    -        throw new RuntimeException("Kubernetes service port not found: 
[cluster-id] " + clusterId + " [port] "
    -                + portMapping.getPort());
    -    }
     
    -    public static void handleMemberStarted(InstanceStartedEvent 
instanceStartedEvent) {
    -        try {
    +        /**
    +         * Update member status to initialized and publish member 
initialized event
    +         *
    +         * @param memberContext
    +         */
    +        public static void handleMemberInitializedEvent (MemberContext 
memberContext){
                 Topology topology = TopologyManager.getTopology();
    -            Service service = 
topology.getService(instanceStartedEvent.getServiceName());
    +            Service service = 
topology.getService(memberContext.getCartridgeType());
                 if (service == null) {
                     log.warn(String.format("Service %s does not exist",
    -                        instanceStartedEvent.getServiceName()));
    +                        memberContext.getCartridgeType()));
                     return;
                 }
    -            if 
(!service.clusterExists(instanceStartedEvent.getClusterId())) {
    +            if (!service.clusterExists(memberContext.getClusterId())) {
                     log.warn(String.format("Cluster %s does not exist in 
service %s",
    -                        instanceStartedEvent.getClusterId(),
    -                        instanceStartedEvent.getServiceName()));
    +                        memberContext.getClusterId(),
    +                        memberContext.getCartridgeType()));
                     return;
                 }
     
    -            Cluster cluster = 
service.getCluster(instanceStartedEvent.getClusterId());
    -            Member member = 
cluster.getMember(instanceStartedEvent.getMemberId());
    +            Member member = 
service.getCluster(memberContext.getClusterId()).
    +                    getMember(memberContext.getMemberId());
                 if (member == null) {
                     log.warn(String.format("Member %s does not exist",
    -                        instanceStartedEvent.getMemberId()));
    +                        memberContext.getMemberId()));
                     return;
                 }
     
                 try {
                     TopologyManager.acquireWriteLock();
    +
    +                // Set ip addresses
    +                
member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
    +                if (memberContext.getPrivateIPs() != null) {
    +                    
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
    +                }
    +                
member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
    +                if (memberContext.getPublicIPs() != null) {
    +                    
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
    +                }
    +
                     // try update lifecycle state
    -                if (!member.isStateTransitionValid(MemberStatus.Starting)) 
{
    -                    log.error("Invalid State Transition from " + 
member.getStatus() + " to " +
    -                            MemberStatus.Starting);
    +                if 
(!member.isStateTransitionValid(MemberStatus.Initialized)) {
    +                    log.error("Invalid state transition from " + 
member.getStatus() + " to " +
    +                            MemberStatus.Initialized);
                         return;
                     } else {
    -                    member.setStatus(MemberStatus.Starting);
    -                    log.info("member started event adding status started");
    +                    member.setStatus(MemberStatus.Initialized);
    +                    log.info("Member status updated to initialized");
     
                         TopologyManager.updateTopology(topology);
    -                    //memberStartedEvent.
    -                    
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
    +                    //member intialized time
    +                    Long timeStamp = System.currentTimeMillis();
    +                    
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
                         //publishing data
    -                    
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
    -                            instanceStartedEvent.getPartitionId(),
    -                            instanceStartedEvent.getNetworkPartitionId(),
    -                            instanceStartedEvent.getClusterId(),
    -                            instanceStartedEvent.getServiceName(),
    -                            MemberStatus.Starting.toString(),
    -                            null);
    +                    
BAMUsageDataPublisher.publish(memberContext.getMemberId(),
    +                            memberContext.getPartition().getUuid(),
    +                            memberContext.getNetworkPartitionId(),
    +                            memberContext.getClusterInstanceId(),
    +                            memberContext.getClusterId(),
    +                            memberContext.getCartridgeType(),
    +                            MemberStatus.Initialized.toString(),
    +                            timeStamp, null, null, null);
                     }
                 } finally {
                     TopologyManager.releaseWriteLock();
                 }
    -        } catch (Exception e) {
    -            String message = String.format("Could not handle member 
started event: [application-id] %s " +
    -                            "[service-name] %s [member-id] %s", 
instanceStartedEvent.getApplicationId(),
    -                    instanceStartedEvent.getServiceName(), 
instanceStartedEvent.getMemberId());
    -            log.warn(message, e);
    -        }
    -    }
    -
    -    public static void handleMemberActivated(InstanceActivatedEvent 
instanceActivatedEvent) {
    -        Topology topology = TopologyManager.getTopology();
    -        Service service = 
topology.getService(instanceActivatedEvent.getServiceName());
    -        if (service == null) {
    -            log.warn(String.format("Service %s does not exist",
    -                    instanceActivatedEvent.getServiceName()));
    -            return;
             }
     
    -        Cluster cluster = 
service.getCluster(instanceActivatedEvent.getClusterId());
    -        if (cluster == null) {
    -            log.warn(String.format("Cluster %s does not exist",
    -                    instanceActivatedEvent.getClusterId()));
    -            return;
    +        private static int findKubernetesServicePort (String clusterId, 
List < KubernetesService > kubernetesServices,
    +                PortMapping portMapping){
    +            for (KubernetesService kubernetesService : kubernetesServices) 
{
    +                if 
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
    +                    return kubernetesService.getPort();
    +                }
    +            }
    +            throw new RuntimeException("Kubernetes service port not found: 
[cluster-id] " + clusterId + " [port] "
    +                    + portMapping.getPort());
             }
     
    -        Member member = 
cluster.getMember(instanceActivatedEvent.getMemberId());
    -        if (member == null) {
    -            log.warn(String.format("Member %s does not exist",
    -                    instanceActivatedEvent.getMemberId()));
    -            return;
    -        }
    +        public static void handleMemberStarted (InstanceStartedEvent 
instanceStartedEvent){
    +            try {
    +                Topology topology = TopologyManager.getTopology();
    +                Service service = 
topology.getService(instanceStartedEvent.getServiceName());
    +                if (service == null) {
    +                    log.warn(String.format("Service %s does not exist",
    +                            instanceStartedEvent.getServiceName()));
    +                    return;
    +                }
    +                if 
(!service.clusterExists(instanceStartedEvent.getClusterId())) {
    +                    log.warn(String.format("Cluster %s does not exist in 
service %s",
    +                            instanceStartedEvent.getClusterId(),
    +                            instanceStartedEvent.getServiceName()));
    +                    return;
    +                }
     
    -        MemberActivatedEvent memberActivatedEvent = new 
MemberActivatedEvent(
    -                instanceActivatedEvent.getServiceName(),
    -                instanceActivatedEvent.getClusterId(),
    -                instanceActivatedEvent.getClusterInstanceId(),
    -                instanceActivatedEvent.getMemberId(),
    -                instanceActivatedEvent.getNetworkPartitionId(),
    -                instanceActivatedEvent.getPartitionId());
    -
    -        // grouping - set grouid
    -        //TODO
    -        memberActivatedEvent.setApplicationId(null);
    -        try {
    -            TopologyManager.acquireWriteLock();
    -            // try update lifecycle state
    -            if (!member.isStateTransitionValid(MemberStatus.Active)) {
    -                log.error("Invalid state transition from [" + 
member.getStatus() + "] to [" + MemberStatus.Active + "]");
    -                return;
    -            } else {
    -                member.setStatus(MemberStatus.Active);
    +                Cluster cluster = 
service.getCluster(instanceStartedEvent.getClusterId());
    +                Member member = 
cluster.getMember(instanceStartedEvent.getMemberId());
    +                if (member == null) {
    +                    log.warn(String.format("Member %s does not exist",
    +                            instanceStartedEvent.getMemberId()));
    +                    return;
    +                }
     
    -                // Set member ports
                     try {
    -                    Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
    -                    if (cartridge == null) {
    -                        throw new 
RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
    -                                service.getServiceName()));
    -                    }
    -
    -                    Port port;
    -                    int portValue;
    -                    List<PortMapping> portMappings = 
Arrays.asList(cartridge.getPortMappings());
    -                    String clusterId = cluster.getClusterId();
    -                    ClusterContext clusterContext = 
CloudControllerContext.getInstance().getClusterContext(clusterId);
    -                    List<KubernetesService> kubernetesServices = 
clusterContext.getKubernetesServices();
    +                    TopologyManager.acquireWriteLock();
    +                    // try update lifecycle state
    +                    if 
(!member.isStateTransitionValid(MemberStatus.Starting)) {
    +                        log.error("Invalid State Transition from " + 
member.getStatus() + " to " +
    +                                MemberStatus.Starting);
    +                        return;
    +                    } else {
    +                        member.setStatus(MemberStatus.Starting);
    +                        log.info("member started event adding status 
started");
     
    -                    for (PortMapping portMapping : portMappings) {
    -                        if (kubernetesServices != null) {
    -                            portValue = 
findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
    -                        } else {
    -                            portValue = portMapping.getPort();
    -                        }
    -                        port = new Port(portMapping.getProtocol(), 
portValue, portMapping.getProxyPort());
    -                        member.addPort(port);
    -                        memberActivatedEvent.addPort(port);
    +                        TopologyManager.updateTopology(topology);
    +                        //member started time
    +                        Long timeStamp = System.currentTimeMillis();
    +                        //memberStartedEvent.
    +                        
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
    +                        //publishing data
    +                        
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
    +                                instanceStartedEvent.getPartitionId(),
    +                                
instanceStartedEvent.getNetworkPartitionId(),
    +                                
instanceStartedEvent.getClusterInstanceId(),
    +                                instanceStartedEvent.getClusterId(),
    +                                instanceStartedEvent.getServiceName(),
    +                                MemberStatus.Starting.toString(),
    +                                timeStamp, null, null, null);
                         }
    -                } catch (Exception e) {
    -                    String message = String.format("Could not add member 
ports: [service-name] %s [member-id] %s",
    -                            memberActivatedEvent.getServiceName(), 
memberActivatedEvent.getMemberId());
    -                    log.error(message, e);
    +                } finally {
    +                    TopologyManager.releaseWriteLock();
                     }
    -
    -                // Set member ip addresses
    -                
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
    -                
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
    -                
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
    -                
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
    -                TopologyManager.updateTopology(topology);
    -
    -                // Publish member activated event
    -                
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
    -
    -                // Publish statistics data
    -                
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
    -                        memberActivatedEvent.getPartitionId(),
    -                        memberActivatedEvent.getNetworkPartitionId(),
    -                        memberActivatedEvent.getClusterId(),
    -                        memberActivatedEvent.getServiceName(),
    -                        MemberStatus.Active.toString(),
    -                        null);
    +            } catch (Exception e) {
    +                String message = String.format("Could not handle member 
started event: [application-id] %s " +
    +                                "[service-name] %s [member-id] %s", 
instanceStartedEvent.getApplicationId(),
    +                        instanceStartedEvent.getServiceName(), 
instanceStartedEvent.getMemberId());
    +                log.warn(message, e);
                 }
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    -    }
    -
    -    public static void 
handleMemberReadyToShutdown(InstanceReadyToShutdownEvent 
instanceReadyToShutdownEvent)
    -            throws InvalidMemberException, InvalidCartridgeTypeException {
    -        Topology topology = TopologyManager.getTopology();
    -        Service service = 
topology.getService(instanceReadyToShutdownEvent.getServiceName());
    -        //update the status of the member
    -        if (service == null) {
    -            log.warn(String.format("Service %s does not exist",
    -                    instanceReadyToShutdownEvent.getServiceName()));
    -            return;
    -        }
    -
    -        Cluster cluster = 
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
    -        if (cluster == null) {
    -            log.warn(String.format("Cluster %s does not exist",
    -                    instanceReadyToShutdownEvent.getClusterId()));
    -            return;
             }
     
    +        public static void handleMemberActivated (InstanceActivatedEvent 
instanceActivatedEvent){
    +            Topology topology = TopologyManager.getTopology();
    +            Service service = 
topology.getService(instanceActivatedEvent.getServiceName());
    +            if (service == null) {
    +                log.warn(String.format("Service %s does not exist",
    +                        instanceActivatedEvent.getServiceName()));
    +                return;
    +            }
     
    -        Member member = 
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
    -        if (member == null) {
    -            log.warn(String.format("Member %s does not exist",
    -                    instanceReadyToShutdownEvent.getMemberId()));
    -            return;
    -        }
    -        MemberReadyToShutdownEvent memberReadyToShutdownEvent = new 
MemberReadyToShutdownEvent(
    -                instanceReadyToShutdownEvent.getServiceName(),
    -                instanceReadyToShutdownEvent.getClusterId(),
    -                instanceReadyToShutdownEvent.getClusterInstanceId(),
    -                instanceReadyToShutdownEvent.getMemberId(),
    -                instanceReadyToShutdownEvent.getNetworkPartitionId(),
    -                instanceReadyToShutdownEvent.getPartitionId());
    -        try {
    -            TopologyManager.acquireWriteLock();
    +            Cluster cluster = 
service.getCluster(instanceActivatedEvent.getClusterId());
    +            if (cluster == null) {
    +                log.warn(String.format("Cluster %s does not exist",
    +                        instanceActivatedEvent.getClusterId()));
    +                return;
    +            }
     
    -            if 
(!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
    -                log.error("Invalid State Transition from " + 
member.getStatus() + " to " +
    -                        MemberStatus.ReadyToShutDown);
    +            Member member = 
cluster.getMember(instanceActivatedEvent.getMemberId());
    +            if (member == null) {
    +                log.warn(String.format("Member %s does not exist",
    +                        instanceActivatedEvent.getMemberId()));
                     return;
                 }
    -            member.setStatus(MemberStatus.ReadyToShutDown);
    -            log.info("Member Ready to shut down event adding status 
started");
     
    -            TopologyManager.updateTopology(topology);
    -        } finally {
    -            TopologyManager.releaseWriteLock();
    -        }
    -        
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
    -        //publishing data
    -        
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
    -                instanceReadyToShutdownEvent.getPartitionId(),
    -                instanceReadyToShutdownEvent.getNetworkPartitionId(),
    -                instanceReadyToShutdownEvent.getClusterId(),
    -                instanceReadyToShutdownEvent.getServiceName(),
    -                MemberStatus.ReadyToShutDown.toString(),
    -                null);
    -        //termination of particular instance will be handled by autoscaler
    -    }
    +            MemberActivatedEvent memberActivatedEvent = new 
MemberActivatedEvent(
    +                    instanceActivatedEvent.getServiceName(),
    +                    instanceActivatedEvent.getClusterId(),
    +                    instanceActivatedEvent.getClusterInstanceId(),
    +                    instanceActivatedEvent.getMemberId(),
    +                    instanceActivatedEvent.getNetworkPartitionId(),
    +                    instanceActivatedEvent.getPartitionId());
    +
    +            // grouping - set grouid
    +            //TODO
    +            memberActivatedEvent.setApplicationId(null);
    +            try {
    +                TopologyManager.acquireWriteLock();
    +                // try update lifecycle state
    +                if (!member.isStateTransitionValid(MemberStatus.Active)) {
    +                    log.error("Invalid state transition from [" + 
member.getStatus() + "] to [" +
    +                            MemberStatus.Active + "]");
    +                    return;
    +                } else {
    +                    member.setStatus(MemberStatus.Active);
     
    -    public static void 
handleMemberMaintenance(InstanceMaintenanceModeEvent 
instanceMaintenanceModeEvent)
    -            throws InvalidMemberException, InvalidCartridgeTypeException {
    -        Topology topology = TopologyManager.getTopology();
    -        Service service = 
topology.getService(instanceMaintenanceModeEvent.getServiceName());
    -        //update the status of the member
    -        if (service == null) {
    -            log.warn(String.format("Service %s does not exist",
    -                    instanceMaintenanceModeEvent.getServiceName()));
    -            return;
    -        }
    +                    // Set member ports
    +                    try {
    +                        Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
    +                        if (cartridge == null) {
    +                            throw new 
RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
    +                                    service.getServiceName()));
    +                        }
     
    -        Cluster cluster = 
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
    -        if (cluster == null) {
    -            log.warn(String.format("Cluster %s does not exist",
    -                    instanceMaintenanceModeEvent.getClusterId()));
    -            return;
    -        }
    +                        Port port;
    +                        int portValue;
    +                        List<PortMapping> portMappings = 
Arrays.asList(cartridge.getPortMappings());
    +                        String clusterId = cluster.getClusterId();
    +                        ClusterContext clusterContext = 
CloudControllerContext.getInstance().getClusterContext(clusterId);
    +                        List<KubernetesService> kubernetesServices = 
clusterContext.getKubernetesServices();
     
    -        Member member = 
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
    -        if (member == null) {
    -            log.warn(String.format("Member %s does not exist",
    -                    instanceMaintenanceModeEvent.getMemberId()));
    -            return;
    +                        for (PortMapping portMapping : portMappings) {
    +                            if (kubernetesServices != null) {
    +                                portValue = 
findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
    +                            } else {
    +                                portValue = portMapping.getPort();
    +                            }
    +                            port = new Port(portMapping.getProtocol(), 
portValue, portMapping.getProxyPort());
    +                            member.addPort(port);
    +                            memberActivatedEvent.addPort(port);
    +                        }
    +                    } catch (Exception e) {
    --- End diff --
    
    Is there any reason for catching a generic exception here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to