When a member is terminated while the MB is not responding, the auto-scaler won't spin up another instance
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/05e1ddc2 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/05e1ddc2 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/05e1ddc2 Branch: refs/heads/4.0.0-grouping Commit: 05e1ddc20a871b73b721487a13a2547cf9b8768d Parents: 0de28d8 Author: Udara Liyanage <[email protected]> Authored: Mon Aug 18 15:07:25 2014 +0530 Committer: Udara Liyanage <[email protected]> Committed: Mon Aug 18 15:07:25 2014 +0530 ---------------------------------------------------------------------- extensions/cep/stratos-cep-extension/pom.xml | 5 + .../extension/FaultHandlingWindowProcessor.java | 126 ++++++++++++++----- 2 files changed, 102 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/05e1ddc2/extensions/cep/stratos-cep-extension/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/pom.xml b/extensions/cep/stratos-cep-extension/pom.xml index 510325f..6c5442f 100644 --- a/extensions/cep/stratos-cep-extension/pom.xml +++ b/extensions/cep/stratos-cep-extension/pom.xml @@ -46,6 +46,11 @@ <artifactId>siddhi-core</artifactId> <version>2.0.0-wso2v4</version> </dependency> + <dependency> + <groupId>org.apache.stratos</groupId> + <artifactId>org.apache.stratos.messaging</artifactId> + <version>4.0.0-SNAPSHOT</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/stratos/blob/05e1ddc2/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 3cb9b9c..80174f4 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -19,6 +19,16 @@ package org.apache.stratos.cep.extension; import org.apache.log4j.Logger; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.Constants; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.event.StreamEvent; import org.wso2.siddhi.core.event.in.InEvent; @@ -45,16 +55,19 @@ import java.util.concurrent.TimeUnit; @SiddhiExtension(namespace = "stratos", function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - private static final int MILI_TO_MINUTE = 1000; - private static final int TIME_OUT = 60; - + private static final int TIME_OUT = 60 * 1000; static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); private ScheduledExecutorService eventRemoverScheduler; private int subjectedAttrIndex; private ThreadBarrier threadBarrier; private long timeToKeep; private ISchedulerSiddhiQueue<StreamEvent> window; - private 
ConcurrentHashMap<String, InEvent> timeStampMap = new ConcurrentHashMap<String, InEvent>(); + private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + private ConcurrentHashMap<String, Member> memberIdMap = new ConcurrentHashMap<String, Member>(); + EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC); + Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + private TopologyEventReceiver topologyEventReceiver; private String memberID; @Override @@ -73,12 +86,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run protected void addDataToMap(InEvent event) { if (memberID != null) { String id = (String)event.getData()[subjectedAttrIndex]; - timeStampMap.put(id, event); - log.debug("Add member : " + id); + memberTimeStampMap.put(id, event.getTimeStamp()); + log.debug("Event received from [member-id] " + id); } else { - System.out.println("Member ID null"); - log.error("NULL Member ID"); + log.error("NULL member ID in the event received"); } } @@ -96,32 +108,80 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } + /* + * Retrieve the current activated member list from the topology and put them into the + * memberTimeStampMap if not already exists. This will allow the system to recover + * from any inconsistent state caused by MB/CEP failures. 
+ */ + private void loadFromTopology(){ + if (TopologyManager.getTopology().isInitialized()){ + TopologyManager.acquireReadLock(); + memberIdMap.clear(); + long currentTimeStamp = System.currentTimeMillis(); + Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator(); + while(servicesItr.hasNext()){ + Service service = servicesItr.next(); + Iterator<Cluster> clusterItr = service.getClusters().iterator(); + while(clusterItr.hasNext()){ + Cluster cluster = clusterItr.next(); + Iterator<Member> memberItr = cluster.getMembers().iterator(); + while(memberItr.hasNext()){ + Member member = memberItr.next(); + if (member.getStatus().equals(MemberStatus.Activated)){ + memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); + memberIdMap.put(member.getMemberId(), member); + } + } + } + } + TopologyManager.releaseReadLock(); + } + if (log.isDebugEnabled()){ + log.debug("Member TimeStamp Map: " + memberTimeStampMap); + log.debug("Member ID Map: " + memberIdMap); + } + } + + private void publishMemberFault(String memberID){ + Member member = memberIdMap.get(memberID); + if (member == null){ + log.error("Failed to publish MemberFault event. 
Member having [member-id] " + memberID + " does not exist in topology"); + return; + } + MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), member.getPartitionId(), 0); + memberFaultEventMessageMap.put("message", memberFaultEvent); + Properties headers = new Properties(); + headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName()); + healthStatPublisher.publish(MemberFaultEventMap, headers, true); + + if (log.isDebugEnabled()){ + log.debug("Published MemberFault event for [member-id] " + memberID); + } + } + + @Override public void run() { try { - while (true) { - threadBarrier.pass(); - Iterator it = timeStampMap.entrySet().iterator(); - - while ( it.hasNext() ) { - Map.Entry pair = (Map.Entry)it.next(); - long currentTime = System.currentTimeMillis(); - InEvent event = (InEvent)pair.getValue(); - - if ((currentTime - event.getTimeStamp()) / MILI_TO_MINUTE > TIME_OUT) { - log.info("Member Inactive : " + pair.getKey() + " : " + "for " + TIME_OUT + " seconds"); - it.remove(); - log.debug("Inactive member : " + pair.getKey() + " : " + "for " + TIME_OUT + " seconds"); - nextProcessor.process(event); - } - } - - // to avoid cpu spinning when there're no entries in map - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { + threadBarrier.pass(); + loadFromTopology(); + Iterator it = memberTimeStampMap.entrySet().iterator(); + + while ( it.hasNext() ) { + Map.Entry pair = (Map.Entry)it.next(); + long currentTime = System.currentTimeMillis(); + Long eventTimeStamp = (Long) pair.getValue(); + + if ((currentTime - eventTimeStamp) > TIME_OUT) { + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); + it.remove(); + publishMemberFault((String) pair.getKey()); } } + if (log.isDebugEnabled()){ + log.debug("Fault handling processor iteration completed with [time-stamp map length] " + 
memberTimeStampMap.size() + " [activated member-count] " + memberIdMap.size()); + } + eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } catch (Throwable t) { log.error(t.getMessage(), t); } @@ -157,6 +217,11 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } else { window = new SchedulerSiddhiQueue<StreamEvent>(this); } + MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); + this.topologyEventReceiver = new TopologyEventReceiver(); + Thread thread = new Thread(topologyEventReceiver); + thread.start(); + log.info("WSO2 CEP topology receiver thread started"); //Ordinary scheduling window.schedule(); @@ -168,6 +233,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } + @Override public void scheduleNow() { eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); } @@ -177,12 +243,14 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run this.eventRemoverScheduler = scheduledExecutorService; } + @Override public void setThreadBarrier(ThreadBarrier threadBarrier) { this.threadBarrier = threadBarrier; } @Override public void destroy(){ + this.topologyEventReceiver.terminate(); window = null; } }
