Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 3a2acec71 -> b80bfc4f6
Fixing STRATOS-1594: Failed to publish member fault event log message printed repeatedly Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b80bfc4f Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b80bfc4f Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b80bfc4f Branch: refs/heads/stratos-4.1.x Commit: b80bfc4f6ae9a546b539676c3bc2a67fa56b106c Parents: 3a2acec Author: Akila Perera <[email protected]> Authored: Sun Nov 1 09:20:41 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Sun Nov 1 09:20:41 2015 +0530 ---------------------------------------------------------------------- .../extension/FaultHandlingWindowProcessor.java | 189 +++++++++--------- .../extension/FaultHandlingWindowProcessor.java | 191 +++++++++---------- 2 files changed, 189 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 0aa01ed..0526f6a 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -48,20 +48,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * CEP window processor to handle faulty member instances. This window processor is responsible for * publishing MemberFault event if health stats are not received within a given time window. */ -@SiddhiExtension(namespace = "stratos", function = "faultHandling") +@SiddhiExtension(namespace = "stratos", + function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); private static final int TIME_OUT = 60 * 1000; public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; @@ -70,57 +67,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run private ExecutorService executorService; private ScheduledExecutorService faultHandleScheduler; private ScheduledFuture<?> lastSchedule; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private EventPublisher healthStatPublisher = - EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); - private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - - // Map of member id's to their last received health event time stamp - private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - - // Event receiver to receive topology events published by cloud-controller - private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); - - // Stratos member id attribute index in stream execution plan - private int memberIdAttrIndex; - - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } - - @Override - protected void processEvent(InListEvent listEvent) { - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) listEvent.getEvent(i)); - } - } - - /** - * Add new entry to time stamp map from the received event. - * - * @param event Event received by Siddhi. - */ - protected void addDataToMap(InEvent event) { - String id = (String) event.getData()[memberIdAttrIndex]; - //checking whether this member is the topology. - //sometimes there can be a delay between publishing member terminated events - //and actually terminating instances. Hence CEP might get events for already terminated members - //so we are checking the topology for the member existence - Member member = getMemberFromId(id); - if (null == member) { - log.debug("Member not found in the topology. Event rejected"); - return; - } + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private EventPublisher healthStatPublisher = EventPublisherPool + .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); + private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + + // Map of member id's to their last received health event time stamp + private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + + // Event receiver to receive topology events published by cloud-controller + private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); + + // Stratos member id attribute index in stream execution plan + private int memberIdAttrIndex; + + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } + + @Override + protected void processEvent(InListEvent listEvent) { + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } + + /** + * Add new entry to time stamp map from the received event. + * + * @param event Event received by Siddhi. + */ + protected void addDataToMap(InEvent event) { + String id = (String) event.getData()[memberIdAttrIndex]; + //checking whether this member is the topology. + //sometimes there can be a delay between publishing member terminated events + //and actually terminating instances. Hence CEP might get events for already terminated members + //so we are checking the topology for the member existence + Member member = getMemberFromId(id); + if (null == member) { + log.warn(String.format("Member with [id] %s not found in the topology. Event rejected", id)); + return; + } if (StringUtils.isNotEmpty(id)) { memberTimeStampMap.put(id, event.getTimeStamp()); } else { log.warn("NULL member id found in the event received. Event rejected."); } - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); } } @@ -140,15 +137,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } /** - * Retrieve the current activated members from the topology and initialize the timestamp map. - * This will allow the system to recover from a restart + * Retrieve the current activated members from the topology and initialize the timestamp map. + * This will allow the system to recover from a restart * - * @param topology Topology model object + * @param topology Topology model object */ - boolean loadTimeStampMapFromTopology(Topology topology){ + boolean loadTimeStampMapFromTopology(Topology topology) { long currentTimeStamp = System.currentTimeMillis(); - if (topology == null || topology.getServices() == null){ + if (topology == null || topology.getServices() == null) { return false; } // TODO make this efficient by adding APIs to messaging component @@ -168,21 +165,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } - if (log.isDebugEnabled()){ - log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + - memberTimeStampMap); + if (log.isDebugEnabled()) { + log.debug( + "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap); } return true; } - private Member getMemberFromId(String memberId){ - if (StringUtils.isEmpty(memberId)){ + private Member getMemberFromId(String memberId) { + if (StringUtils.isEmpty(memberId)) { return null; } - if (TopologyManager.getTopology().isInitialized()){ - try { + if (TopologyManager.getTopology().isInitialized()) { + try { TopologyManager.acquireReadLock(); - if (TopologyManager.getTopology().getServices() == null){ + if (TopologyManager.getTopology().getServices() == null) { return null; } // TODO make this efficient by adding APIs to messaging component @@ -190,8 +187,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run if (service.getClusters() != null) { for (Cluster cluster : service.getClusters()) { if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()){ - if (memberId.equals(member.getMemberId())){ + for (Member member : cluster.getMembers()) { + if (memberId.equals(member.getMemberId())) { return member; } } @@ -199,27 +196,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } } - } catch (Exception e) { - log.error("Error while reading topology" + e); - } finally { - TopologyManager.releaseReadLock(); - } + } catch (Exception e) { + log.error("Error while reading topology" + e); + } finally { + TopologyManager.releaseReadLock(); + } } return null; } - private void publishMemberFault(String memberId){ - Member member = getMemberFromId(memberId); - if (member == null){ - log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + - " does not exist in topology"); + private void publishMemberFault(Member member) { + if (member == null) { + log.warn("Failed to publish member fault event. Member object is null"); return; } - log.info("Publishing member fault event for [member-id] " + memberId); + log.info("Publishing member fault event for [member-id] " + member.getMemberId()); MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), - member.getMemberId(), member.getPartitionId(), - member.getNetworkPartitionId(), 0); + member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0); memberFaultEventMessageMap.put("message", memberFaultEvent); healthStatPublisher.publish(MemberFaultEventMap, true); @@ -229,19 +223,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run public void run() { try { threadBarrier.pass(); - for (Object o : memberTimeStampMap.entrySet()) { Map.Entry pair = (Map.Entry) o; long currentTime = System.currentTimeMillis(); Long eventTimeStamp = (Long) pair.getValue(); if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + - eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - publishMemberFault((String) pair.getKey()); + String memberId = (String) pair.getKey(); + Member member = getMemberFromId(memberId); + if (member != null) { + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); + publishMemberFault(member); + } + memberTimeStampMap.remove(memberId); } } - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); } @@ -257,7 +255,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected Object[] currentState() { - return new Object[]{window.currentState()}; + return new Object[] { window.currentState() }; } @Override @@ -269,7 +267,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { if (parameters[0] instanceof IntConstant) { timeToKeep = ((IntConstant) parameters[0]).getValue(); @@ -285,16 +283,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } else { window = new SchedulerSiddhiQueue<StreamEvent>(this); } - MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); + MemberFaultEventMap + .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, - CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); + executorService = StratosThreadPool + .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); + cepTopologyEventReceiver.setExecutorService(executorService); cepTopologyEventReceiver.execute(); //Ordinary scheduling window.schedule(); - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); @@ -328,13 +327,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } @Override - public void destroy(){ + public void destroy() { // terminate topology listener thread cepTopologyEventReceiver.terminate(); window = null; // Shutdown executor service - if(executorService != null) { + if (executorService != null) { try { executorService.shutdownNow(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 305eda8..7d8a2a5 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -25,10 +25,10 @@ import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.event.StreamEvent; import org.wso2.siddhi.core.event.in.InEvent; import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.snapshot.ThreadBarrier; import org.wso2.siddhi.core.query.QueryPostProcessingElement; import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.snapshot.ThreadBarrier; import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; @@ -42,20 +42,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * CEP window processor to handle faulty member instances. This window processor is responsible for * publishing MemberFault event if health stats are not received within a given time window. */ -@SiddhiExtension(namespace = "stratos", function = "faultHandling") +@SiddhiExtension(namespace = "stratos", + function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); private static final int TIME_OUT = 60 * 1000; public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; @@ -64,57 +61,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run private ExecutorService executorService; private ScheduledExecutorService faultHandleScheduler; private ScheduledFuture<?> lastSchedule; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private EventPublisher healthStatPublisher = - EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); - private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - - // Map of member id's to their last received health event time stamp - private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - - // Event receiver to receive topology events published by cloud-controller - private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); - - // Stratos member id attribute index in stream execution plan - private int memberIdAttrIndex; - - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } - - @Override - protected void processEvent(InListEvent listEvent) { - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) listEvent.getEvent(i)); - } - } - - /** - * Add new entry to time stamp map from the received event. - * - * @param event Event received by Siddhi. - */ - protected void addDataToMap(InEvent event) { - String id = (String) event.getData()[memberIdAttrIndex]; - //checking whether this member is the topology. - //sometimes there can be a delay between publishing member terminated events - //and actually terminating instances. Hence CEP might get events for already terminated members - //so we are checking the topology for the member existence - Member member = getMemberFromId(id); - if (null == member) { - log.debug("Member not found in the topology. Event rejected"); - return; - } + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private EventPublisher healthStatPublisher = EventPublisherPool + .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); + private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + + // Map of member id's to their last received health event time stamp + private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + + // Event receiver to receive topology events published by cloud-controller + private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); + + // Stratos member id attribute index in stream execution plan + private int memberIdAttrIndex; + + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } + + @Override + protected void processEvent(InListEvent listEvent) { + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } + + /** + * Add new entry to time stamp map from the received event. + * + * @param event Event received by Siddhi. + */ + protected void addDataToMap(InEvent event) { + String id = (String) event.getData()[memberIdAttrIndex]; + //checking whether this member is the topology. + //sometimes there can be a delay between publishing member terminated events + //and actually terminating instances. Hence CEP might get events for already terminated members + //so we are checking the topology for the member existence + Member member = getMemberFromId(id); + if (null == member) { + log.warn("Member not found in the topology. Event rejected"); + return; + } if (StringUtils.isNotEmpty(id)) { memberTimeStampMap.put(id, event.getTimeStamp()); } else { log.warn("NULL member id found in the event received. Event rejected."); } - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); } } @@ -134,15 +131,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } /** - * Retrieve the current activated members from the topology and initialize the timestamp map. - * This will allow the system to recover from a restart + * Retrieve the current activated members from the topology and initialize the timestamp map. + * This will allow the system to recover from a restart * - * @param topology Topology model object + * @param topology Topology model object */ - boolean loadTimeStampMapFromTopology(Topology topology){ + boolean loadTimeStampMapFromTopology(Topology topology) { long currentTimeStamp = System.currentTimeMillis(); - if (topology == null || topology.getServices() == null){ + if (topology == null || topology.getServices() == null) { return false; } // TODO make this efficient by adding APIs to messaging component @@ -162,21 +159,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } - if (log.isDebugEnabled()){ - log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + - memberTimeStampMap); + if (log.isDebugEnabled()) { + log.debug( + "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap); } return true; } - private Member getMemberFromId(String memberId){ - if (StringUtils.isEmpty(memberId)){ + private Member getMemberFromId(String memberId) { + if (StringUtils.isEmpty(memberId)) { return null; } - if (TopologyManager.getTopology().isInitialized()){ - try { + if (TopologyManager.getTopology().isInitialized()) { + try { TopologyManager.acquireReadLock(); - if (TopologyManager.getTopology().getServices() == null){ + if (TopologyManager.getTopology().getServices() == null) { return null; } // TODO make this efficient by adding APIs to messaging component @@ -184,8 +181,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run if (service.getClusters() != null) { for (Cluster cluster : service.getClusters()) { if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()){ - if (memberId.equals(member.getMemberId())){ + for (Member member : cluster.getMembers()) { + if (memberId.equals(member.getMemberId())) { return member; } } @@ -193,27 +190,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } } - } catch (Exception e) { - log.error("Error while reading topology" + e); - } finally { - TopologyManager.releaseReadLock(); - } + } catch (Exception e) { + log.error("Error while reading topology" + e); + } finally { + TopologyManager.releaseReadLock(); + } } return null; } - private void publishMemberFault(String memberId){ - Member member = getMemberFromId(memberId); - if (member == null){ - log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + - " does not exist in topology"); + private void publishMemberFault(Member member) { + if (member == null) { + log.warn("Failed to publish member fault event. Member object is null"); return; } - log.info("Publishing member fault event for [member-id] " + memberId); + log.info("Publishing member fault event for [member-id] " + member.getMemberId()); MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), - member.getMemberId(), member.getPartitionId(), - member.getNetworkPartitionId(), 0); + member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0); memberFaultEventMessageMap.put("message", memberFaultEvent); healthStatPublisher.publish(MemberFaultEventMap, true); @@ -223,19 +217,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run public void run() { try { threadBarrier.pass(); - for (Object o : memberTimeStampMap.entrySet()) { Map.Entry pair = (Map.Entry) o; long currentTime = System.currentTimeMillis(); Long eventTimeStamp = (Long) pair.getValue(); if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + - eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - publishMemberFault((String) pair.getKey()); + String memberId = (String) pair.getKey(); + Member member = getMemberFromId(memberId); + if (member != null) { + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); + publishMemberFault(member); + } + memberTimeStampMap.remove(memberId); } } - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); } @@ -251,7 +249,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected Object[] currentState() { - return new Object[]{window.currentState()}; + return new Object[] { window.currentState() }; } @Override @@ -262,7 +260,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { if (parameters[0] instanceof IntConstant) { timeToKeep = ((IntConstant) parameters[0]).getValue(); @@ -278,16 +276,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } else { window = new SchedulerSiddhiQueue<StreamEvent>(this); } - MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); + MemberFaultEventMap + .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, - CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); + executorService = StratosThreadPool + .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); + cepTopologyEventReceiver.setExecutorService(executorService); cepTopologyEventReceiver.execute(); //Ordinary scheduling window.schedule(); - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); @@ -321,13 +320,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } @Override - public void destroy(){ + public void destroy() { // terminate topology listener thread cepTopologyEventReceiver.terminate(); window = null; // Shutdown executor service - if(executorService != null) { + if (executorService != null) { try { executorService.shutdownNow(); } catch (Exception e) {
