Fixing the CEP topology-listening issue by merging with master
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7a5797df Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7a5797df Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7a5797df Branch: refs/heads/4.0.0-grouping Commit: 7a5797df08bbf92483cd5993e6ce9ca576686e3d Parents: bea691b Author: reka <[email protected]> Authored: Mon Oct 27 14:10:15 2014 +0530 Committer: reka <[email protected]> Committed: Mon Oct 27 14:14:22 2014 +0530 ---------------------------------------------------------------------- .../cep/extension/CEPTopologyEventReceiver.java | 125 +++++++++++ .../extension/FaultHandlingWindowProcessor.java | 217 ++++++++++++------- 2 files changed, 267 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java new file mode 100644 index 0000000..90c67f0 --- /dev/null +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * CEP Topology Receiver for Fault Handling Window Processor. 
+ */ +public class CEPTopologyEventReceiver implements Runnable { + + private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); + + private TopologyEventReceiver topologyEventReceiver; + private boolean terminated; + private FaultHandlingWindowProcessor faultHandler; + + public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { + this.topologyEventReceiver = new TopologyEventReceiver(); + this.faultHandler = faultHandler; + addEventListeners(); + } + + private void addEventListeners() { + // Load member time stamp map from the topology as a one time task + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { + private boolean initialized; + + @Override + protected void onEvent(Event event) { + if (!initialized) { + try { + TopologyManager.acquireReadLock(); + log.info("Complete topology event received to fault handling window processor."); + CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; + initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); + } catch (Exception e) { + log.error("Error loading member time stamp map from complete topology event.", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + } + }); + + // Remove member from the time stamp map when MemberTerminated event is received. 
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); + log.info("Member [member id] " + memberTerminatedEvent.getMemberId() + + " was removed from the time stamp map."); + } + }); + + // Add member to time stamp map whenever member is activated + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + // do not put this member if we have already received a health event + faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), System.currentTimeMillis()); + log.info("Member [member id] " + memberActivatedEvent.getMemberId() + + " was added to the time stamp map."); + } + }); + } + + @Override + public void run() { + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + Thread thread = new Thread(topologyEventReceiver); + thread.start(); + log.info("CEP topology receiver thread started"); + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + log.info("CEP topology receiver thread terminated"); + } + + /** + * Terminate CEP topology receiver thread. 
+ */ + public void terminate() { + topologyEventReceiver.terminate(); + terminated = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 80174f4..0104a03 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -18,14 +18,19 @@ */ package org.apache.stratos.cep.extension; +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.MemberStatus; -import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.stratos.messaging.util.Constants; @@ -52,23 +57,31 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +/** + * CEP window processor to handle faulty member instances. This window processor is responsible for + * publishing MemberFault event if health stats are not received within a given time window. + */ @SiddhiExtension(namespace = "stratos", function = "faultHandling") public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { private static final int TIME_OUT = 60 * 1000; static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - private ScheduledExecutorService eventRemoverScheduler; - private int subjectedAttrIndex; + private ScheduledExecutorService faultHandleScheduler; private ThreadBarrier threadBarrier; private long timeToKeep; private ISchedulerSiddhiQueue<StreamEvent> window; + private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC); + private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + + // Map of member id's to their last received health event time stamp private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - private ConcurrentHashMap<String, Member> memberIdMap = new ConcurrentHashMap<String, Member>(); - EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC); - Map<String, Object> MemberFaultEventMap 
= new HashMap<String, Object>(); - Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - private TopologyEventReceiver topologyEventReceiver; - private String memberID; + + // Event receiver to receive topology events published by cloud-controller + private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); + + // Stratos member id attribute index in stream execution plan + private int memberIdAttrIndex; @Override protected void processEvent(InEvent event) { @@ -77,20 +90,34 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override protected void processEvent(InListEvent listEvent) { - System.out.println(listEvent); for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { addDataToMap((InEvent) listEvent.getEvent(i)); } } + /** + * Add new entry to time stamp map from the received event. + * + * @param event Event received by Siddhi. + */ protected void addDataToMap(InEvent event) { - if (memberID != null) { - String id = (String)event.getData()[subjectedAttrIndex]; + String id = (String) event.getData()[memberIdAttrIndex]; + //checking whether this member is the topology. + //sometimes there can be a delay between publishing member terminated events + //and actually terminating instances. Hence CEP might get events for already terminated members + //so we are checking the topology for the member existence + Member member = getMemberFromId(id); + if (null == member) { + log.debug("Member not found in the toplogy. Event rejected"); + return; + } + if (StringUtils.isNotEmpty(id)) { memberTimeStampMap.put(id, event.getTimeStamp()); - log.debug("Event received from [member-id] " + id); + } else { + log.warn("NULL member id found in the event received. 
Event rejected."); } - else { - log.error("NULL member ID in the event received"); + if (log.isDebugEnabled()){ + log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); } } @@ -108,55 +135,88 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } - /* - * Retrieve the current activated member list from the topology and put them into the - * memberTimeStampMap if not already exists. This will allow the system to recover - * from any inconsistent state caused by MB/CEP failures. - */ - private void loadFromTopology(){ - if (TopologyManager.getTopology().isInitialized()){ - TopologyManager.acquireReadLock(); - memberIdMap.clear(); - long currentTimeStamp = System.currentTimeMillis(); - Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator(); - while(servicesItr.hasNext()){ - Service service = servicesItr.next(); - Iterator<Cluster> clusterItr = service.getClusters().iterator(); - while(clusterItr.hasNext()){ - Cluster cluster = clusterItr.next(); - Iterator<Member> memberItr = cluster.getMembers().iterator(); - while(memberItr.hasNext()){ - Member member = memberItr.next(); - if (member.getStatus().equals(MemberStatus.Activated)){ - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - memberIdMap.put(member.getMemberId(), member); + /** + * Retrieve the current activated members from the topology and initialize the time stamp map. 
+ * This will allow the system to recover from a restart + * + * @param topology Topology model object + */ + boolean loadTimeStampMapFromTopology(Topology topology){ + + long currentTimeStamp = System.currentTimeMillis(); + if (topology == null || topology.getServices() == null){ + return false; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : topology.getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()) { + // we are checking faulty status only in previously activated members + if (member != null && MemberStatus.Activated.equals(member.getStatus())) { + // Initialize the member time stamp map from the topology at the beginning + memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); + } } } } } - TopologyManager.releaseReadLock(); } + + log.info("Member time stamp map was successfully loaded from the topology."); if (log.isDebugEnabled()){ log.debug("Member TimeStamp Map: " + memberTimeStampMap); - log.debug("Member ID Map: " + memberIdMap); } + return true; } - private void publishMemberFault(String memberID){ - Member member = memberIdMap.get(memberID); + private Member getMemberFromId(String memberId){ + if (StringUtils.isEmpty(memberId)){ + return null; + } + if (TopologyManager.getTopology().isInitialized()){ + try { + TopologyManager.acquireReadLock(); + if (TopologyManager.getTopology().getServices() == null){ + return null; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : TopologyManager.getTopology().getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()){ + if (memberId.equals(member.getMemberId())){ + return member; + } + } + } + } + } + } + } catch (Exception e) { + 
log.error("Error while reading topology" + e); + } finally { + TopologyManager.releaseReadLock(); + } + } + return null; + } + + private void publishMemberFault(String memberId){ + Member member = getMemberFromId(memberId); if (member == null){ - log.error("Failed to publish MemberFault event. Member having [member-id] " + memberID + " does not exist in topology"); + log.error("Failed to publish member fault event. Member having [member-id] " + memberId + + " does not exist in topology"); return; } - MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), member.getPartitionId(), 0); - memberFaultEventMessageMap.put("message", memberFaultEvent); - Properties headers = new Properties(); - headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName()); - healthStatPublisher.publish(MemberFaultEventMap, headers, true); + log.info("Publishing member fault event for [member-id] " + memberId); - if (log.isDebugEnabled()){ - log.debug("Published MemberFault event for [member-id] " + memberID); - } + MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), + member.getPartitionId(), 0); + memberFaultEventMessageMap.put("message", memberFaultEvent); + healthStatPublisher.publish(MemberFaultEventMap, true); } @@ -164,26 +224,26 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run public void run() { try { threadBarrier.pass(); - loadFromTopology(); - Iterator it = memberTimeStampMap.entrySet().iterator(); - while ( it.hasNext() ) { - Map.Entry pair = (Map.Entry)it.next(); + for (Object o : memberTimeStampMap.entrySet()) { + Map.Entry pair = (Map.Entry) o; long currentTime = System.currentTimeMillis(); Long eventTimeStamp = (Long) pair.getValue(); if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + TIME_OUT + " 
milliseconds"); - it.remove(); + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); publishMemberFault((String) pair.getKey()); } } if (log.isDebugEnabled()){ - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [activated member-count] " + memberIdMap.size()); + log.debug("Fault handling processor iteration completed with [time-stamp map length] " + + memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); } - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } catch (Throwable t) { log.error(t.getMessage(), t); + } finally { + faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } } @@ -200,17 +260,16 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { if (parameters[0] instanceof IntConstant) { timeToKeep = ((IntConstant) parameters[0]).getValue(); } else { timeToKeep = ((LongConstant) parameters[0]).getValue(); } - memberID = ((Variable)parameters[1]).getAttributeName(); - - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); + memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); if (this.siddhiContext.isDistributedProcessingEnabled()) { window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, 
this.async); @@ -218,29 +277,32 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run window = new SchedulerSiddhiQueue<StreamEvent>(this); } MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - this.topologyEventReceiver = new TopologyEventReceiver(); - Thread thread = new Thread(topologyEventReceiver); - thread.start(); - log.info("WSO2 CEP topology receiver thread started"); + + Thread topologyTopicSubscriberThread = new Thread(cepTopologyEventReceiver); + topologyTopicSubscriberThread.start(); //Ordinary scheduling window.schedule(); - + if (log.isDebugEnabled()){ + log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); + } } @Override public void schedule() { - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } @Override public void scheduleNow() { - eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); } @Override public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; + this.faultHandleScheduler = scheduledExecutorService; } @Override @@ -250,7 +312,12 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void destroy(){ - this.topologyEventReceiver.terminate(); + // terminate topology listener thread + cepTopologyEventReceiver.terminate(); window = null; } + + public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { + return memberTimeStampMap; + } }
