Improved fault-handling logic
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ce18e63e Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ce18e63e Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ce18e63e Branch: refs/heads/master Commit: ce18e63ec0f384d4cee3b0d0b9ab596d7da87cad Parents: 28c24dd Author: Akila Perera <[email protected]> Authored: Mon Sep 29 20:24:07 2014 +0530 Committer: Akila Perera <[email protected]> Committed: Tue Sep 30 16:58:51 2014 +0530 ---------------------------------------------------------------------- .../cep/extension/CEPTopologyEventReceiver.java | 133 +++++ .../extension/FaultHandlingWindowProcessor.java | 517 ++++++++++--------- 2 files changed, 413 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/ce18e63e/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java new file mode 100644 index 0000000..70c5679 --- /dev/null +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * CEP Topology Receiver for Fault Handling Window Processor. 
+ */
+public class CEPTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+    /** Delay, in milliseconds, between the start of {@link #run()} and the topology subscription. */
+    private static final long STARTUP_DELAY_MS = 15000;
+
+    /** Polling interval, in milliseconds, of the keep-alive loop in {@link #run()}. */
+    private static final long KEEP_ALIVE_INTERVAL_MS = 1000;
+
+    private final TopologyEventReceiver topologyEventReceiver;
+    private final FaultHandlingWindowProcessor faultHandler;
+
+    // Written by terminate() and polled by the thread executing run(), hence volatile.
+    private volatile boolean terminated;
+
+    /**
+     * Creates the receiver and registers its topology event listeners.
+     *
+     * @param faultHandler window processor whose member time stamp map is kept in sync with the topology
+     */
+    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        this.faultHandler = faultHandler;
+        addEventListeners();
+    }
+
+    /**
+     * Registers the listeners that initialize the time stamp map from the complete topology and
+     * keep it up to date on member activation and termination.
+     */
+    private void addEventListeners() {
+        // Load member time stamp map from the topology as a one time task
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            private boolean initialized;
+
+            @Override
+            protected void onEvent(Event event) {
+                if (initialized) {
+                    return;
+                }
+                // Acquire outside the try block so that the lock is only released when it is actually held.
+                TopologyManager.acquireReadLock();
+                try {
+                    if (log.isInfoEnabled()) {
+                        log.info("Complete topology event received to fault handling window processor.");
+                    }
+                    CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
+                    initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+                } catch (Exception e) {
+                    // initialized stays false, so the next complete topology event retries the load
+                    log.error("Error loading member time stamp map from complete topology event.", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        // Remove member from the time stamp map when MemberTerminated event is received.
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
+                if (log.isInfoEnabled()) {
+                    log.info("Member [member id] " + memberTerminatedEvent.getMemberId() +
+                            " was removed from the time stamp map.");
+                }
+            }
+        });
+
+        // Add member to time stamp map whenever member is activated
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                String memberId = memberActivatedEvent.getMemberId();
+
+                // do not put this member if we have already received a health event
+                Long previousTimeStamp =
+                        faultHandler.getMemberTimeStampMap().putIfAbsent(memberId, System.currentTimeMillis());
+                // Only report the addition when an entry was really created
+                if (previousTimeStamp == null && log.isInfoEnabled()) {
+                    log.info("Member [member id] " + memberId + " was added to the time stamp map.");
+                }
+            }
+        });
+    }
+
+    /**
+     * Waits for the start-up delay, starts the topology event receiver on its own thread and then
+     * stays alive until {@link #terminate()} is called or this thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            Thread.sleep(STARTUP_DELAY_MS);
+        } catch (InterruptedException e) {
+            // Treat an interrupt as a stop request and keep the flag visible to the owner of this thread
+            Thread.currentThread().interrupt();
+            log.warn("CEP topology receiver thread was interrupted before the topology receiver was started");
+            return;
+        }
+        if (terminated) {
+            // terminate() was called during the start-up delay; do not start a receiver nobody will stop
+            return;
+        }
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("CEP topology receiver thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(KEEP_ALIVE_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Re-sleeping with the flag set would spin, so restore the flag and leave the loop
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("CEP topology receiver thread terminated");
+        }
+    }
+
+    /**
+     * Terminate CEP topology receiver thread.
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ce18e63e/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0205566..3414b8a 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -1,40 +1,39 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
*/ package org.apache.stratos.cep.extension; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - +import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.MemberStatus; -import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.Constants; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.event.StreamEvent; import org.wso2.siddhi.core.event.in.InEvent; @@ -53,224 +52,268 @@ import org.wso2.siddhi.query.api.expression.constant.IntConstant; import org.wso2.siddhi.query.api.expression.constant.LongConstant; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; +import 
java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CEP window processor to handle faulty member instances. This window processor is responsible for + * publishing MemberFault event if health stats are not received within a given time window. + */ @SiddhiExtension(namespace = "stratos", function = "faultHandling") -public class FaultHandlingWindowProcessor extends WindowProcessor implements - RunnableWindowProcessor { - - private static final String HEALTH_STAT_MEMBER_FAULT_EVENT = "health/stat/MemberFaultEvent"; - private static final int TIME_OUT = 60 * 1000; - static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - private ScheduledExecutorService eventRemoverScheduler; - private int subjectedAttrIndex; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private final ConcurrentHashMap<String, Long> memberTimeStampMap = - new ConcurrentHashMap<String, Long>(); - private final ConcurrentHashMap<String, Member> memberIdMap = - new ConcurrentHashMap<String, Member>(); - - EventPublisher healthStatPublisher = null; - Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - private TopologyEventReceiver topologyEventReceiver; - private String memberID; - - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } - - @Override - protected void processEvent(InListEvent listEvent) { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) listEvent.getEvent(i)); - } - } - - protected void addDataToMap(InEvent event) { - if (memberID != null) { - String id = (String) event.getData()[subjectedAttrIndex]; - memberTimeStampMap.put(id, event.getTimeStamp()); - 
log.debug("Event received from [member-id] " + id); - } else { - log.error("NULL member ID in the event received"); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - /* - * Retrieve the current activated member list from the topology and put them - * into the - * memberTimeStampMap if not already exists. This will allow the system to - * recover - * from any inconsistent state caused by MB/CEP failures. - */ - private void loadFromTopology() { - if (TopologyManager.getTopology().isInitialized()) { - TopologyManager.acquireReadLock(); - memberIdMap.clear(); - long currentTimeStamp = System.currentTimeMillis(); - Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator(); - while (servicesItr.hasNext()) { - Service service = servicesItr.next(); - Iterator<Cluster> clusterItr = service.getClusters().iterator(); - while (clusterItr.hasNext()) { - Cluster cluster = clusterItr.next(); - Iterator<Member> memberItr = cluster.getMembers().iterator(); - while (memberItr.hasNext()) { - Member member = memberItr.next(); - if (member.getStatus().equals(MemberStatus.Activated)) { - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - memberIdMap.put(member.getMemberId(), member); - } - } - } - } - TopologyManager.releaseReadLock(); - } - if (log.isDebugEnabled()) { - log.debug("Member TimeStamp Map: " + memberTimeStampMap); - log.debug("Member ID Map: " + memberIdMap); - } - } - - private void publishMemberFault(String memberID) { - Member member = memberIdMap.get(memberID); - if (member == null) { - log.error("Failed to publish MemberFault event. 
Member having [member-id] " + memberID + - " does not exist in topology"); - return; - } - MemberFaultEvent memberFaultEvent = - new MemberFaultEvent(member.getClusterId(), - member.getMemberId(), - member.getPartitionId(), 0); - memberFaultEventMessageMap.put("message", memberFaultEvent); - - healthStatPublisher = EventPublisherPool.getPublisher(HEALTH_STAT_MEMBER_FAULT_EVENT); - healthStatPublisher.publish(MemberFaultEventMap, true); - - if (log.isDebugEnabled()) { - log.debug("Published MemberFault event for [member-id] " + memberID); - } - } - - @Override - public void run() { - try { - threadBarrier.pass(); - loadFromTopology(); - Iterator it = memberTimeStampMap.entrySet().iterator(); - - while (it.hasNext()) { - Map.Entry pair = (Map.Entry) it.next(); - long currentTime = System.currentTimeMillis(); - Long eventTimeStamp = (Long) pair.getValue(); - - if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + - " with [last time-stamp] " + eventTimeStamp + " [time-out] " + - TIME_OUT + " milliseconds"); - it.remove(); - publishMemberFault((String) pair.getKey()); - } - } - if (log.isDebugEnabled()) { - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + - memberTimeStampMap.size() + - " [activated member-count] " + - memberIdMap.size()); - } - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } - - @Override - protected Object[] currentState() { - return new Object[] { window.currentState() }; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, - SiddhiContext siddhiContext) { - if (parameters[0] 
instanceof IntConstant) {
-            timeToKeep = ((IntConstant) parameters[0]).getValue();
-        } else {
-            timeToKeep = ((LongConstant) parameters[0]).getValue();
-        }
-
-        memberID = ((Variable) parameters[1]).getAttributeName();
-
-        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
-        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
-
-        if (this.siddhiContext.isDistributedProcessingEnabled()) {
-            window =
-                    new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext,
-                            this.async);
-        } else {
-            window = new SchedulerSiddhiQueue<StreamEvent>(this);
-        }
-        MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
-                memberFaultEventMessageMap);
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        Thread thread = new Thread(topologyEventReceiver);
-        thread.start();
-        log.info("WSO2 CEP topology receiver thread started");
-
-        // Ordinary scheduling
-        window.schedule();
-
-    }
-
-    @Override
-    public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.eventRemoverScheduler = scheduledExecutorService;
-    }
-
-    @Override
-    public void setThreadBarrier(ThreadBarrier threadBarrier) {
-        this.threadBarrier = threadBarrier;
-    }
-
-    @Override
-    public void destroy() {
-        this.topologyEventReceiver.terminate();
-        window = null;
-    }
+public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    // Maximum silence, in milliseconds, tolerated from a member before it is reported as faulty
+    private static final int TIME_OUT = 60 * 1000;
+    static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private ScheduledExecutorService faultHandleScheduler;
+    private ThreadBarrier threadBarrier;
+    // Delay, in milliseconds, between two fault detection runs; taken from the first query parameter
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private final EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
+    // Payload handed to the publisher: event class name mapped to the message map below
+    private final Map<String, Object> memberFaultEventMap = new HashMap<String, Object>();
+    // Holds the current MemberFaultEvent under the "message" key; re-used for every published event
+    private final Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private final ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private final CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event. Events whose member id attribute
+     * is null or empty are rejected with a warning.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        if (StringUtils.isEmpty(id)) {
+            log.warn("NULL member id found in the event received. Event rejected.");
+            return;
+        }
+        memberTimeStampMap.put(id, event.getTimeStamp());
+        // Only events that were actually recorded are reported as received
+        if (log.isDebugEnabled()) {
+            log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Retrieve the current activated members from the topology and initialize the time stamp map.
+     * This will allow the system to recover from a restart. Members that already have an entry
+     * keep their existing time stamp.
+     *
+     * @param topology Topology model object
+     * @return false if the topology or its service collection is null, true otherwise
+     */
+    boolean loadTimeStampMapFromTopology(Topology topology) {
+
+        long currentTimeStamp = System.currentTimeMillis();
+        if (topology == null || topology.getServices() == null) {
+            return false;
+        }
+        // TODO make this efficient by adding APIs to messaging component
+        for (Service service : topology.getServices()) {
+            if (service.getClusters() != null) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() != null) {
+                        for (Member member : cluster.getMembers()) {
+                            // we are checking faulty status only in previously activated members
+                            if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
+                                // Initialize the member time stamp map from the topology at the beginning
+                                memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Member time stamp map was successfully loaded from the topology.");
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Member TimeStamp Map: " + memberTimeStampMap);
+        }
+        return true;
+    }
+
+    /**
+     * Looks up a member in the current topology by its id.
+     *
+     * @param memberId id of the member to find
+     * @return the matching member, or null if the id is empty, the topology is not initialized
+     *         or no member with that id exists
+     */
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
+            return null;
+        }
+        if (!TopologyManager.getTopology().isInitialized()) {
+            return null;
+        }
+        TopologyManager.acquireReadLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+            if (topology.getServices() == null) {
+                return null;
+            }
+
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : topology.getServices()) {
+                if (service.getClusters() != null) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (cluster.getMembers() != null) {
+                            for (Member member : cluster.getMembers()) {
+                                if (member != null && memberId.equals(member.getMemberId())) {
+                                    return member;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            return null;
+        } finally {
+            // Release on every exit path; returning from inside the search must not leak the read lock
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Publishes a MemberFault event for the given member to the health stat topic. Nothing is
+     * published, and an error is logged, when the member cannot be found in the topology.
+     *
+     * @param memberId id of the member detected as faulty
+     */
+    private void publishMemberFault(String memberId) {
+        Member member = getMemberFromId(memberId);
+        if (member == null) {
+            log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
+                    " does not exist in topology");
+            return;
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Publishing member fault event for [member-id] " + memberId);
+        }
+        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(),
+                member.getPartitionId(), 0);
+        memberFaultEventMessageMap.put("message", memberFaultEvent);
+        Properties headers = new Properties();
+        headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName());
+        healthStatPublisher.publish(memberFaultEventMap, headers, true);
+    }
+
+
+    /**
+     * One fault detection pass: publishes a MemberFault event for every member whose last time
+     * stamp is older than TIME_OUT, then re-schedules itself after timeToKeep milliseconds.
+     * Entries are not removed here, so a silent member is reported again on every pass until its
+     * entry is removed or refreshed.
+     */
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+
+            for (Map.Entry<String, Long> entry : memberTimeStampMap.entrySet()) {
+                long currentTime = System.currentTimeMillis();
+                Long eventTimeStamp = entry.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + entry.getKey() + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                    publishMemberFault(entry.getKey());
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+            }
+        } catch (Throwable t) {
+            // Deliberately broad: a failing pass must not prevent the re-scheduling below
+            log.error(t.getMessage(), t);
+        } finally {
+            faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        // NOTE(review): the first call passes the wrapper array built by currentState(); confirm that
+        // both restoreState calls are intended
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the scheduling delay (parameter 0, int or long constant) and the member id attribute
+     * (parameter 1), creates the window queue, starts the topology receiver thread and schedules
+     * the first fault detection pass.
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+        memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        memberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+
+        Thread topologyTopicSubscriberThread = new Thread(cepTopologyEventReceiver);
+        topologyTopicSubscriberThread.start();
+        if (log.isDebugEnabled()) {
+            log.debug("CEP topology receiver thread started");
+        }
+
+        //Ordinary scheduling
+        window.schedule();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        }
+    }
+
+    @Override
+    public void schedule() {
+        faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void scheduleNow() {
+        faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.faultHandleScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy() {
+        // terminate topology listener thread
+        cepTopologyEventReceiver.terminate();
+        window = null;
+    }
+
+    /**
+     * Returns the live map of member ids to the time stamp of their last health event; the
+     * topology receiver adds and removes entries through it.
+     */
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+        return memberTimeStampMap;
+    }
 }
