Repository: stratos
Updated Branches:
  refs/heads/master d2615a5dc -> 88172d313


Refining CEPTopologyEventReceiver and removing Thread.sleep()


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/88172d31
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/88172d31
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/88172d31

Branch: refs/heads/master
Commit: 88172d313b2891110f0402b69a8eecf39e6c0e87
Parents: d2615a5
Author: Imesh Gunaratne <[email protected]>
Authored: Sat Mar 28 03:03:05 2015 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sat Mar 28 03:03:05 2015 +0530

----------------------------------------------------------------------
 .../cep/extension/CEPTopologyEventReceiver.java | 68 +++++---------------
 .../extension/FaultHandlingWindowProcessor.java |  6 +-
 2 files changed, 18 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/88172d31/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index e384d95..59c70c5 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -31,29 +31,29 @@ import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListe
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
-public class CEPTopologyEventReceiver implements Runnable {
+public class CEPTopologyEventReceiver extends TopologyEventReceiver {
 
     private static final Log log = 
LogFactory.getLog(CEPTopologyEventReceiver.class);
 
-    private TopologyEventReceiver topologyEventReceiver;
-    private boolean terminated;
     private FaultHandlingWindowProcessor faultHandler;
-       private ExecutorService executorService;
 
     public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) 
{
-        this.topologyEventReceiver = new TopologyEventReceiver();
         this.faultHandler = faultHandler;
         addEventListeners();
     }
 
+    @Override
+    public void execute() {
+        super.execute();
+        log.info("CEP topology event receiver thread started");
+    }
+
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
-        topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
+        addEventListener(new CompleteTopologyEventListener() {
             private boolean initialized;
 
             @Override
@@ -61,7 +61,7 @@ public class CEPTopologyEventReceiver implements Runnable {
                 if (!initialized) {
                     try {
                         TopologyManager.acquireReadLock();
-                        log.info("Complete topology event received to fault 
handling window processor.");
+                        log.debug("Complete topology event received to fault 
handling window processor.");
                         CompleteTopologyEvent completeTopologyEvent = 
(CompleteTopologyEvent) event;
                         initialized = 
faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
                     } catch (Exception e) {
@@ -74,64 +74,26 @@ public class CEPTopologyEventReceiver implements Runnable {
         });
 
         // Remove member from the time stamp map when MemberTerminated event 
is received.
-        topologyEventReceiver.addEventListener(new 
MemberTerminatedEventListener() {
+        addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
                 
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
-                log.info("Member [member id] " + 
memberTerminatedEvent.getMemberId() +
-                            " was removed from the time stamp map.");
+                log.debug("Member was removed from the timestamp map: [member] 
" + memberTerminatedEvent.getMemberId());
             }
         });
 
         // Add member to time stamp map whenever member is activated
-        topologyEventReceiver.addEventListener(new 
MemberActivatedEventListener() {
+        addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
 
                 // do not put this member if we have already received a health 
event
-                
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
 System.currentTimeMillis());
-                log.info("Member [member id] " + 
memberActivatedEvent.getMemberId() +
-                        " was added to the time stamp map.");
+                
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
+                        System.currentTimeMillis());
+                log.debug("Member was added to the timestamp map: [member] " + 
memberActivatedEvent.getMemberId());
             }
         });
     }
-
-    @Override
-    public void run() {
-        try {
-            Thread.sleep(15000);
-        } catch (InterruptedException ignore) {
-        }
-        topologyEventReceiver.setExecutorService(executorService);
-           topologyEventReceiver.execute();
-        //   executorService.execute(topologyEventReceiver);
-        log.info("CEP topology receiver thread started");
-
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        log.info("CEP topology receiver thread terminated");
-    }
-
-    /**
-     * Terminate CEP topology receiver thread.
-     */
-    public void terminate() {
-        topologyEventReceiver.terminate();
-        terminated = true;
-    }
-
-       public ExecutorService getExecutorService() {
-               return executorService;
-       }
-
-       public void setExecutorService(ExecutorService executorService) {
-               this.executorService = executorService;
-       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/88172d31/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index b6113c8..7737101 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -166,9 +166,9 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
             }
         }
 
-        log.info("Member timestamps were successfully loaded from the 
topology");
         if (log.isDebugEnabled()){
-            log.debug("Member timestamp map: " + memberTimeStampMap);
+            log.debug("Member timestamps were successfully loaded from the 
topology: [timestamps] " +
+                    memberTimeStampMap);
         }
         return true;
     }
@@ -285,7 +285,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
            executorService = 
StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
                 CEP_EXTENSION_THREAD_POOL_SIZE);
            cepTopologyEventReceiver.setExecutorService(executorService);
-           executorService.execute(cepTopologyEventReceiver);
+        cepTopologyEventReceiver.execute();
 
         //Ordinary scheduling
         window.schedule();

Reply via email to