Repository: stratos
Updated Branches:
  refs/heads/master d2615a5dc -> 88172d313
Refining CEPTopologyEventReceiver and removing Thread.sleep() Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/88172d31 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/88172d31 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/88172d31 Branch: refs/heads/master Commit: 88172d313b2891110f0402b69a8eecf39e6c0e87 Parents: d2615a5 Author: Imesh Gunaratne <[email protected]> Authored: Sat Mar 28 03:03:05 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sat Mar 28 03:03:05 2015 +0530 ---------------------------------------------------------------------- .../cep/extension/CEPTopologyEventReceiver.java | 68 +++++--------------- .../extension/FaultHandlingWindowProcessor.java | 6 +- 2 files changed, 18 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/88172d31/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java index e384d95..59c70c5 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -31,29 +31,29 @@ import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListe import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import java.util.concurrent.ExecutorService; - /** * CEP 
Topology Receiver for Fault Handling Window Processor. */ -public class CEPTopologyEventReceiver implements Runnable { +public class CEPTopologyEventReceiver extends TopologyEventReceiver { private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); - private TopologyEventReceiver topologyEventReceiver; - private boolean terminated; private FaultHandlingWindowProcessor faultHandler; - private ExecutorService executorService; public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { - this.topologyEventReceiver = new TopologyEventReceiver(); this.faultHandler = faultHandler; addEventListeners(); } + @Override + public void execute() { + super.execute(); + log.info("CEP topology event receiver thread started"); + } + private void addEventListeners() { // Load member time stamp map from the topology as a one time task - topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { + addEventListener(new CompleteTopologyEventListener() { private boolean initialized; @Override @@ -61,7 +61,7 @@ public class CEPTopologyEventReceiver implements Runnable { if (!initialized) { try { TopologyManager.acquireReadLock(); - log.info("Complete topology event received to fault handling window processor."); + log.debug("Complete topology event received to fault handling window processor."); CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); } catch (Exception e) { @@ -74,64 +74,26 @@ public class CEPTopologyEventReceiver implements Runnable { }); // Remove member from the time stamp map when MemberTerminated event is received. 
- topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); - log.info("Member [member id] " + memberTerminatedEvent.getMemberId() + - " was removed from the time stamp map."); + log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); } }); // Add member to time stamp map whenever member is activated - topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { + addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; // do not put this member if we have already received a health event - faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), System.currentTimeMillis()); - log.info("Member [member id] " + memberActivatedEvent.getMemberId() + - " was added to the time stamp map."); + faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), + System.currentTimeMillis()); + log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); } }); } - - @Override - public void run() { - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); - // executorService.execute(topologyEventReceiver); - log.info("CEP topology receiver thread started"); - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - log.info("CEP topology receiver thread terminated"); - } - - /** - * Terminate CEP topology receiver thread. 
- */ - public void terminate() { - topologyEventReceiver.terminate(); - terminated = true; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/88172d31/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index b6113c8..7737101 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -166,9 +166,9 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } } - log.info("Member timestamps were successfully loaded from the topology"); if (log.isDebugEnabled()){ - log.debug("Member timestamp map: " + memberTimeStampMap); + log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + + memberTimeStampMap); } return true; } @@ -285,7 +285,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); cepTopologyEventReceiver.setExecutorService(executorService); - executorService.execute(cepTopologyEventReceiver); + cepTopologyEventReceiver.execute(); //Ordinary scheduling window.schedule();
