When a member is terminated while the message broker (MB) is not responding, the autoscaler won't spin up another instance


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/05e1ddc2
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/05e1ddc2
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/05e1ddc2

Branch: refs/heads/4.0.0-grouping
Commit: 05e1ddc20a871b73b721487a13a2547cf9b8768d
Parents: 0de28d8
Author: Udara Liyanage <[email protected]>
Authored: Mon Aug 18 15:07:25 2014 +0530
Committer: Udara Liyanage <[email protected]>
Committed: Mon Aug 18 15:07:25 2014 +0530

----------------------------------------------------------------------
 extensions/cep/stratos-cep-extension/pom.xml    |   5 +
 .../extension/FaultHandlingWindowProcessor.java | 126 ++++++++++++++-----
 2 files changed, 102 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/05e1ddc2/extensions/cep/stratos-cep-extension/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/pom.xml b/extensions/cep/stratos-cep-extension/pom.xml
index 510325f..6c5442f 100644
--- a/extensions/cep/stratos-cep-extension/pom.xml
+++ b/extensions/cep/stratos-cep-extension/pom.xml
@@ -46,6 +46,11 @@
             <artifactId>siddhi-core</artifactId>
             <version>2.0.0-wso2v4</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.messaging</artifactId>
+            <version>4.0.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/stratos/blob/05e1ddc2/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 3cb9b9c..80174f4 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -19,6 +19,16 @@
 package org.apache.stratos.cep.extension;
 
 import org.apache.log4j.Logger;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
 import org.wso2.siddhi.core.config.SiddhiContext;
 import org.wso2.siddhi.core.event.StreamEvent;
 import org.wso2.siddhi.core.event.in.InEvent;
@@ -45,16 +55,19 @@ import java.util.concurrent.TimeUnit;
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
 
-    private static final int MILI_TO_MINUTE = 1000;
-    private static final int TIME_OUT = 60;
-
+    private static final int TIME_OUT = 60 * 1000;
     static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
     private ScheduledExecutorService eventRemoverScheduler;
     private int subjectedAttrIndex;
     private ThreadBarrier threadBarrier;
     private long timeToKeep;
     private ISchedulerSiddhiQueue<StreamEvent> window;
-    private ConcurrentHashMap<String, InEvent> timeStampMap = new 
ConcurrentHashMap<String, InEvent>();
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
+    private ConcurrentHashMap<String, Member> memberIdMap = new 
ConcurrentHashMap<String, Member>();
+    EventPublisher healthStatPublisher = 
EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
+    Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+    Map<String, Object> memberFaultEventMessageMap = new HashMap<String, 
Object>();
+    private TopologyEventReceiver topologyEventReceiver;
     private String memberID;
 
     @Override
@@ -73,12 +86,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
     protected void addDataToMap(InEvent event) {
         if (memberID != null) {
             String id = (String)event.getData()[subjectedAttrIndex];
-            timeStampMap.put(id, event);
-            log.debug("Add member : " + id);
+            memberTimeStampMap.put(id, event.getTimeStamp());
+            log.debug("Event received from [member-id] " + id);
         }
         else {
-            System.out.println("Member ID null");
-            log.error("NULL Member ID");
+            log.error("NULL member ID in the event received");
         }
     }
 
@@ -96,32 +108,80 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         }
     }
 
+    /*
+    *  Retrieve the current activated member list from the topology and put 
them into the
+    *  memberTimeStampMap if not already exists. This will allow the system to 
recover
+    *  from any inconsistent state caused by MB/CEP failures.
+    */
+    private void loadFromTopology(){
+        if (TopologyManager.getTopology().isInitialized()){
+            TopologyManager.acquireReadLock();
+            memberIdMap.clear();
+            long currentTimeStamp = System.currentTimeMillis();
+            Iterator<Service> servicesItr = 
TopologyManager.getTopology().getServices().iterator();
+            while(servicesItr.hasNext()){
+                Service service = servicesItr.next();
+                Iterator<Cluster> clusterItr = 
service.getClusters().iterator();
+                while(clusterItr.hasNext()){
+                    Cluster cluster = clusterItr.next();
+                    Iterator<Member> memberItr = 
cluster.getMembers().iterator();
+                    while(memberItr.hasNext()){
+                        Member member = memberItr.next();
+                        if (member.getStatus().equals(MemberStatus.Activated)){
+                            
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            memberIdMap.put(member.getMemberId(), member);
+                        }
+                    }
+                }
+            }
+            TopologyManager.releaseReadLock();
+        }
+        if (log.isDebugEnabled()){
+            log.debug("Member TimeStamp Map: " + memberTimeStampMap);
+            log.debug("Member ID Map: " + memberIdMap);
+        }
+    }
+
+    private void publishMemberFault(String memberID){
+        Member member = memberIdMap.get(memberID);
+        if (member == null){
+            log.error("Failed to publish MemberFault event. Member having 
[member-id] " + memberID + " does not exist in topology");
+            return;
+        }
+        MemberFaultEvent memberFaultEvent = new 
MemberFaultEvent(member.getClusterId(), member.getMemberId(), 
member.getPartitionId(), 0);
+        memberFaultEventMessageMap.put("message", memberFaultEvent);
+        Properties headers = new Properties();
+        headers.put(Constants.EVENT_CLASS_NAME, 
memberFaultEvent.getClass().getName());
+        healthStatPublisher.publish(MemberFaultEventMap, headers, true);
+
+        if (log.isDebugEnabled()){
+            log.debug("Published MemberFault event for [member-id] " + 
memberID);
+        }
+    }
+
+
     @Override
     public void run() {
         try {
-            while (true) {
-                threadBarrier.pass();
-                Iterator it = timeStampMap.entrySet().iterator();
-                
-                while ( it.hasNext() ) {
-                    Map.Entry pair = (Map.Entry)it.next();
-                    long currentTime = System.currentTimeMillis();
-                    InEvent event = (InEvent)pair.getValue();
-
-                    if ((currentTime - event.getTimeStamp()) / MILI_TO_MINUTE 
> TIME_OUT) {
-                        log.info("Member Inactive : " + pair.getKey() + " : " 
+ "for " + TIME_OUT + " seconds");
-                        it.remove();
-                        log.debug("Inactive member : " + pair.getKey() + " : " 
+ "for " + TIME_OUT + " seconds");
-                        nextProcessor.process(event);
-                    }
-                }
-                
-                // to avoid cpu spinning when there're no entries in map
-                try {
-                       Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
+            threadBarrier.pass();
+            loadFromTopology();
+            Iterator it = memberTimeStampMap.entrySet().iterator();
+
+            while ( it.hasNext() ) {
+                Map.Entry pair = (Map.Entry)it.next();
+                long currentTime = System.currentTimeMillis();
+                Long eventTimeStamp = (Long) pair.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + 
pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + 
TIME_OUT + " milliseconds");
+                    it.remove();
+                    publishMemberFault((String) pair.getKey());
                 }
             }
+            if (log.isDebugEnabled()){
+                log.debug("Fault handling processor iteration completed with 
[time-stamp map length] " + memberTimeStampMap.size() + " [activated 
member-count] " + memberIdMap.size());
+            }
+            eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
         }
@@ -157,6 +217,11 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         } else {
             window = new SchedulerSiddhiQueue<StreamEvent>(this);
         }
+        
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
 memberFaultEventMessageMap);
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        log.info("WSO2 CEP topology receiver thread started");
 
         //Ordinary scheduling
         window.schedule();
@@ -168,6 +233,7 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
     }
 
+    @Override
     public void scheduleNow() {
         eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
     }
@@ -177,12 +243,14 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         this.eventRemoverScheduler = scheduledExecutorService;
     }
 
+    @Override
     public void setThreadBarrier(ThreadBarrier threadBarrier) {
         this.threadBarrier = threadBarrier;
     }
 
     @Override
     public void destroy(){
+        this.topologyEventReceiver.terminate();
         window = null;
     }
 }

Reply via email to