Improved fault-handling logic

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ce18e63e
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ce18e63e
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ce18e63e

Branch: refs/heads/master
Commit: ce18e63ec0f384d4cee3b0d0b9ab596d7da87cad
Parents: 28c24dd
Author: Akila Perera <[email protected]>
Authored: Mon Sep 29 20:24:07 2014 +0530
Committer: Akila Perera <[email protected]>
Committed: Tue Sep 30 16:58:51 2014 +0530

----------------------------------------------------------------------
 .../cep/extension/CEPTopologyEventReceiver.java | 133 +++++
 .../extension/FaultHandlingWindowProcessor.java | 517 ++++++++++---------
 2 files changed, 413 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/ce18e63e/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
new file mode 100644
index 0000000..70c5679
--- /dev/null
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cep.extension;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import 
org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
+import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * CEP Topology Receiver for Fault Handling Window Processor.
+ */
+public class CEPTopologyEventReceiver implements Runnable {
+
+    private static final Log log = 
LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+    private TopologyEventReceiver topologyEventReceiver;
+    private boolean terminated;
+    private FaultHandlingWindowProcessor faultHandler;
+
+    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) 
{
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        this.faultHandler = faultHandler;
+        addEventListeners();
+    }
+
+    private void addEventListeners() {
+        // Load member time stamp map from the topology as a one time task
+        topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
+            private boolean initialized;
+
+            @Override
+            protected void onEvent(Event event) {
+                if (!initialized) {
+                    try {
+                        TopologyManager.acquireReadLock();
+                        if (log.isInfoEnabled()) {
+                            log.info("Complete topology event received to 
fault handling window processor.");
+                        }
+                        CompleteTopologyEvent completeTopologyEvent = 
(CompleteTopologyEvent) event;
+                        initialized = 
faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+                    } catch (Exception e) {
+                        log.error("Error loading member time stamp map from 
complete topology event.", e);
+                    } finally {
+                        TopologyManager.releaseReadLock();
+                    }
+                }
+            }
+        });
+
+        // Remove member from the time stamp map when MemberTerminated event 
is received.
+        topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
+                
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
+                if (log.isInfoEnabled()){
+                    log.info("Member [member id] " + 
memberTerminatedEvent.getMemberId() +
+                            " was removed from the time stamp map.");
+                }
+            }
+        });
+
+        // Add member to time stamp map whenever member is activated
+        topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+
+                // do not put this member if we have already received a health 
event
+                
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
 System.currentTimeMillis());
+                log.info("Member [member id] " + 
memberActivatedEvent.getMemberId() +
+                        " was added to the time stamp map.");
+            }
+        });
+    }
+
+    @Override
+    public void run() {
+        try {
+            Thread.sleep(15000);
+        } catch (InterruptedException ignore) {
+        }
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("CEP topology receiver thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("CEP topology receiver thread terminated");
+        }
+    }
+
+    /**
+     * Terminate CEP topology receiver thread.
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/ce18e63e/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0205566..3414b8a 100644
--- 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -1,40 +1,39 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.stratos.cep.extension;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import 
org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
 import org.wso2.siddhi.core.config.SiddhiContext;
 import org.wso2.siddhi.core.event.StreamEvent;
 import org.wso2.siddhi.core.event.in.InEvent;
@@ -53,224 +52,268 @@ import 
org.wso2.siddhi.query.api.expression.constant.IntConstant;
 import org.wso2.siddhi.query.api.expression.constant.LongConstant;
 import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CEP window processor to handle faulty member instances. This window 
processor is responsible for
+ * publishing MemberFault event if health stats are not received within a 
given time window.
+ */
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
-public class FaultHandlingWindowProcessor extends WindowProcessor implements
-                                                                 
RunnableWindowProcessor {
-
-       private static final String HEALTH_STAT_MEMBER_FAULT_EVENT = 
"health/stat/MemberFaultEvent";
-       private static final int TIME_OUT = 60 * 1000;
-       static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
-       private ScheduledExecutorService eventRemoverScheduler;
-       private int subjectedAttrIndex;
-       private ThreadBarrier threadBarrier;
-       private long timeToKeep;
-       private ISchedulerSiddhiQueue<StreamEvent> window;
-       private final ConcurrentHashMap<String, Long> memberTimeStampMap =
-                                                                          new 
ConcurrentHashMap<String, Long>();
-       private final ConcurrentHashMap<String, Member> memberIdMap =
-                                                                     new 
ConcurrentHashMap<String, Member>();
-
-       EventPublisher healthStatPublisher = null;
-       Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-       Map<String, Object> memberFaultEventMessageMap = new HashMap<String, 
Object>();
-       private TopologyEventReceiver topologyEventReceiver;
-       private String memberID;
-
-       @Override
-       protected void processEvent(InEvent event) {
-               addDataToMap(event);
-       }
-
-       @Override
-       protected void processEvent(InListEvent listEvent) {
-               System.out.println(listEvent);
-               for (int i = 0, size = listEvent.getActiveEvents(); i < size; 
i++) {
-                       addDataToMap((InEvent) listEvent.getEvent(i));
-               }
-       }
-
-       protected void addDataToMap(InEvent event) {
-               if (memberID != null) {
-                       String id = (String) 
event.getData()[subjectedAttrIndex];
-                       memberTimeStampMap.put(id, event.getTimeStamp());
-                       log.debug("Event received from [member-id] " + id);
-               } else {
-                       log.error("NULL member ID in the event received");
-               }
-       }
-
-       @Override
-       public Iterator<StreamEvent> iterator() {
-               return window.iterator();
-       }
-
-       @Override
-       public Iterator<StreamEvent> iterator(String predicate) {
-               if (siddhiContext.isDistributedProcessingEnabled()) {
-                       return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
-               } else {
-                       return window.iterator();
-               }
-       }
-
-       /*
-        * Retrieve the current activated member list from the topology and put 
them
-        * into the
-        * memberTimeStampMap if not already exists. This will allow the system 
to
-        * recover
-        * from any inconsistent state caused by MB/CEP failures.
-        */
-       private void loadFromTopology() {
-               if (TopologyManager.getTopology().isInitialized()) {
-                       TopologyManager.acquireReadLock();
-                       memberIdMap.clear();
-                       long currentTimeStamp = System.currentTimeMillis();
-                       Iterator<Service> servicesItr = 
TopologyManager.getTopology().getServices().iterator();
-                       while (servicesItr.hasNext()) {
-                               Service service = servicesItr.next();
-                               Iterator<Cluster> clusterItr = 
service.getClusters().iterator();
-                               while (clusterItr.hasNext()) {
-                                       Cluster cluster = clusterItr.next();
-                                       Iterator<Member> memberItr = 
cluster.getMembers().iterator();
-                                       while (memberItr.hasNext()) {
-                                               Member member = 
memberItr.next();
-                                               if 
(member.getStatus().equals(MemberStatus.Activated)) {
-                                                       
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
-                                                       
memberIdMap.put(member.getMemberId(), member);
-                                               }
-                                       }
-                               }
-                       }
-                       TopologyManager.releaseReadLock();
-               }
-               if (log.isDebugEnabled()) {
-                       log.debug("Member TimeStamp Map: " + 
memberTimeStampMap);
-                       log.debug("Member ID Map: " + memberIdMap);
-               }
-       }
-
-       private void publishMemberFault(String memberID) {
-               Member member = memberIdMap.get(memberID);
-               if (member == null) {
-                       log.error("Failed to publish MemberFault event. Member 
having [member-id] " + memberID +
-                                 " does not exist in topology");
-                       return;
-               }
-               MemberFaultEvent memberFaultEvent =
-                                                   new 
MemberFaultEvent(member.getClusterId(),
-                                                                        
member.getMemberId(),
-                                                                        
member.getPartitionId(), 0);
-               memberFaultEventMessageMap.put("message", memberFaultEvent);
-
-               healthStatPublisher = 
EventPublisherPool.getPublisher(HEALTH_STAT_MEMBER_FAULT_EVENT);
-               healthStatPublisher.publish(MemberFaultEventMap, true);
-
-               if (log.isDebugEnabled()) {
-                       log.debug("Published MemberFault event for [member-id] 
" + memberID);
-               }
-       }
-
-       @Override
-       public void run() {
-               try {
-                       threadBarrier.pass();
-                       loadFromTopology();
-                       Iterator it = memberTimeStampMap.entrySet().iterator();
-
-                       while (it.hasNext()) {
-                               Map.Entry pair = (Map.Entry) it.next();
-                               long currentTime = System.currentTimeMillis();
-                               Long eventTimeStamp = (Long) pair.getValue();
-
-                               if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                                       log.info("Faulty member detected 
[member-id] " + pair.getKey() +
-                                                " with [last time-stamp] " + 
eventTimeStamp + " [time-out] " +
-                                                TIME_OUT + " milliseconds");
-                                       it.remove();
-                                       publishMemberFault((String) 
pair.getKey());
-                               }
-                       }
-                       if (log.isDebugEnabled()) {
-                               log.debug("Fault handling processor iteration 
completed with [time-stamp map length] " +
-                                         memberTimeStampMap.size() +
-                                         " [activated member-count] " +
-                                         memberIdMap.size());
-                       }
-                       eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-               } catch (Throwable t) {
-                       log.error(t.getMessage(), t);
-               }
-       }
-
-       @Override
-       protected Object[] currentState() {
-               return new Object[] { window.currentState() };
-       }
-
-       @Override
-       protected void restoreState(Object[] data) {
-               window.restoreState(data);
-               window.restoreState((Object[]) data[0]);
-               window.reSchedule();
-       }
-
-       @Override
-       protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
-                           AbstractDefinition streamDefinition, String 
elementId, boolean async,
-                           SiddhiContext siddhiContext) {
-               if (parameters[0] instanceof IntConstant) {
-                       timeToKeep = ((IntConstant) parameters[0]).getValue();
-               } else {
-                       timeToKeep = ((LongConstant) parameters[0]).getValue();
-               }
-
-               memberID = ((Variable) parameters[1]).getAttributeName();
-
-               String subjectedAttr = ((Variable) 
parameters[1]).getAttributeName();
-               subjectedAttrIndex = 
streamDefinition.getAttributePosition(subjectedAttr);
-
-               if (this.siddhiContext.isDistributedProcessingEnabled()) {
-                       window =
-                                new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext,
-                                                                          
this.async);
-               } else {
-                       window = new SchedulerSiddhiQueue<StreamEvent>(this);
-               }
-               
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
-                                       memberFaultEventMessageMap);
-               this.topologyEventReceiver = new TopologyEventReceiver();
-               Thread thread = new Thread(topologyEventReceiver);
-               thread.start();
-               log.info("WSO2 CEP topology receiver thread started");
-
-               // Ordinary scheduling
-               window.schedule();
-
-       }
-
-       @Override
-       public void schedule() {
-               eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-       }
-
-       @Override
-       public void scheduleNow() {
-               eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-       }
-
-       @Override
-       public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
-               this.eventRemoverScheduler = scheduledExecutorService;
-       }
-
-       @Override
-       public void setThreadBarrier(ThreadBarrier threadBarrier) {
-               this.threadBarrier = threadBarrier;
-       }
-
-       @Override
-       public void destroy() {
-               this.topologyEventReceiver.terminate();
-               window = null;
-       }
+public class FaultHandlingWindowProcessor extends WindowProcessor implements 
RunnableWindowProcessor {
+
+    private static final int TIME_OUT = 60 * 1000;
+    static final Logger log = 
Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private ScheduledExecutorService faultHandleScheduler;
+    private ThreadBarrier threadBarrier;
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher = 
EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
+    private Map<String, Object> MemberFaultEventMap = new HashMap<String, 
Object>();
+    private Map<String, Object> memberFaultEventMessageMap = new 
HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new 
ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new 
CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        if (StringUtils.isNotEmpty(id)) {
+            memberTimeStampMap.put(id, event.getTimeStamp());
+        } else {
+            log.warn("NULL member id found in the event received. Event 
rejected.");
+        }
+        if (log.isDebugEnabled()){
+            log.debug("Event received from [member-id] " + id + " [time-stamp] 
" + event.getTimeStamp());
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) 
window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     *  Retrieve the current activated members from the topology and 
initialize the time stamp map.
+     *  This will allow the system to recover from a restart
+     *
+     *  @param topology Topology model object
+     */
+    boolean loadTimeStampMapFromTopology(Topology topology){
+
+        long currentTimeStamp = System.currentTimeMillis();
+        if (topology == null || topology.getServices() == null){
+            return false;
+        }
+        // TODO make this efficient by adding APIs to messaging component
+        for (Service service : topology.getServices()) {
+            if (service.getClusters() != null) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() != null) {
+                        for (Member member : cluster.getMembers()) {
+                            // we are checking faulty status only in 
previously activated members
+                            if (member != null && 
MemberStatus.Activated.equals(member.getStatus())) {
+                                // Initialize the member time stamp map from 
the topology at the beginning
+                                
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if  (log.isInfoEnabled()){
+            log.info("Member time stamp map was successfully loaded from the 
topology.");
+        }
+        if (log.isDebugEnabled()){
+            log.debug("Member TimeStamp Map: " + memberTimeStampMap);
+        }
+        return true;
+    }
+
+    private Member getMemberFromId(String memberId){
+        if (StringUtils.isEmpty(memberId)){
+            return null;
+        }
+        if (TopologyManager.getTopology().isInitialized()){
+            TopologyManager.acquireReadLock();
+            if (TopologyManager.getTopology().getServices() == null){
+                return null;
+            }
+
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : 
TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() != null) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (cluster.getMembers() != null) {
+                            for (Member member : cluster.getMembers()){
+                                if (memberId.equals(member.getMemberId())){
+                                    return member;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+    private void publishMemberFault(String memberId){
+        Member member = getMemberFromId(memberId);
+        if (member == null){
+            log.error("Failed to publish member fault event. Member having 
[member-id] " + memberId +
+                    " does not exist in topology");
+            return;
+        }
+        if (log.isInfoEnabled()){
+            log.info("Publishing member fault event for [member-id] " + 
memberId);
+        }
+        MemberFaultEvent memberFaultEvent = new 
MemberFaultEvent(member.getClusterId(), member.getMemberId(),
+                member.getPartitionId(), 0);
+        memberFaultEventMessageMap.put("message", memberFaultEvent);
+        Properties headers = new Properties();
+        headers.put(Constants.EVENT_CLASS_NAME, 
memberFaultEvent.getClass().getName());
+        healthStatPublisher.publish(MemberFaultEventMap, headers, true);
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+
+            for (Object o : memberTimeStampMap.entrySet()) {
+                Map.Entry pair = (Map.Entry) o;
+                long currentTime = System.currentTimeMillis();
+                Long eventTimeStamp = (Long) pair.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + 
pair.getKey() + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " 
milliseconds");
+                    publishMemberFault((String) pair.getKey());
+                }
+            }
+            if (log.isDebugEnabled()){
+                log.debug("Fault handling processor iteration completed with 
[time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + 
memberTimeStampMap);
+            }
+        } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+        } finally {
+            faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement 
nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, 
boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String memberIdAttrName = ((Variable) 
parameters[1]).getAttributeName();
+        memberIdAttrIndex = 
streamDefinition.getAttributePosition(memberIdAttrName);
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, 
this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
 memberFaultEventMessageMap);
+
+        Thread topologyTopicSubscriberThread = new 
Thread(cepTopologyEventReceiver);
+        topologyTopicSubscriberThread.start();
+        if (log.isDebugEnabled()) {
+            log.debug("CEP topology receiver thread started");
+        }
+
+        //Ordinary scheduling
+        window.schedule();
+
+        if (log.isDebugEnabled()){
+            log.debug("Fault handling window processor initialized with 
[timeToKeep] " + timeToKeep +
+                    ", [memberIdAttrName] " + memberIdAttrName + ", 
[memberIdAttrIndex] " + memberIdAttrIndex +
+                    ", [distributed-enabled] " + 
this.siddhiContext.isDistributedProcessingEnabled());
+        }
+    }
+
+    @Override
+    public void schedule() {
+        faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void scheduleNow() {
+        faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
+        this.faultHandleScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy(){
+        // terminate topology listener thread
+        cepTopologyEventReceiver.terminate();
+        window = null;
+    }
+
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+        return memberTimeStampMap;
+    }
 }

Reply via email to