Repository: stratos
Updated Branches:
  refs/heads/master 35db118b2 -> 73d5dce00


OOM fix for CEP/Stratos Window Processors


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/73d5dce0
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/73d5dce0
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/73d5dce0

Branch: refs/heads/master
Commit: 73d5dce00bc5b8f0ab0f58328a10ae4b9dc3dbc5
Parents: 35db118
Author: lasinducharith <[email protected]>
Authored: Mon Jun 22 19:09:43 2015 +0530
Committer: lasinducharith <[email protected]>
Committed: Mon Jun 22 19:09:43 2015 +0530

----------------------------------------------------------------------
 .../extension/FaultHandlingWindowProcessor.java | 21 ++++++++++++------
 .../GradientFinderWindowProcessor.java          | 21 +++++++++++++-----
 .../SecondDerivativeFinderWindowProcessor.java  | 23 +++++++++++++++-----
 3 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index e22c00b..c029c34 100644
--- 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -48,10 +48,7 @@ import 
org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * CEP window processor to handle faulty member instances. This window 
processor is responsible for
@@ -68,6 +65,7 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
+    private ScheduledFuture<?> lastSchedule;
        private ThreadBarrier threadBarrier;
        private long timeToKeep;
        private ISchedulerSiddhiQueue<StreamEvent> window;
@@ -246,7 +244,10 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
         } finally {
-            faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+            if (lastSchedule != null) {
+                lastSchedule.cancel(false);
+            }
+            lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
         }
     }
 
@@ -298,12 +299,18 @@ public class FaultHandlingWindowProcessor extends 
WindowProcessor implements Run
 
     @Override
     public void schedule() {
-        faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void scheduleNow() {
-        faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, 0, 
TimeUnit.MILLISECONDS);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
index 9f7ea9a..dff0f79 100644
--- 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @SiddhiExtension(namespace = "stratos", function = "gradient")
@@ -53,6 +54,7 @@ public class GradientFinderWindowProcessor extends 
WindowProcessor implements Ru
 
     static final Logger log = 
Logger.getLogger(GradientFinderWindowProcessor.class);
     private ScheduledExecutorService eventRemoverScheduler;
+    private ScheduledFuture<?> lastSchedule;
     private long timeToKeep;
     private int subjectedAttrIndex;
     private Attribute.Type subjectedAttrType;
@@ -135,9 +137,12 @@ public class GradientFinderWindowProcessor extends 
WindowProcessor implements Ru
 
                                                long diff = timeToKeep - 
(System.currentTimeMillis() - scheduledTime);
                                                if (diff > 0) {
-                                                       try {
-                                                               
eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
-                                                       } catch 
(RejectedExecutionException ex) {
+                            try {
+                                if (lastSchedule != null) {
+                                    lastSchedule.cancel(false);
+                                }
+                                lastSchedule = 
eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                            } catch (RejectedExecutionException ex) {
                                                                
log.warn("scheduling cannot be accepted for execution: elementID " +
                                                                         
elementId);
                                                        }
@@ -247,11 +252,17 @@ public class GradientFinderWindowProcessor extends 
WindowProcessor implements Ru
 
     @Override
     public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+            }
+        lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
     }
 
     public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = eventRemoverScheduler.schedule(this, 0, 
TimeUnit.MILLISECONDS);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
index 252f490..96cff22 100644
--- 
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
+++ 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @SiddhiExtension(namespace = "stratos", function = "secondDerivative")
@@ -53,6 +54,7 @@ public class SecondDerivativeFinderWindowProcessor extends 
WindowProcessor imple
 
     static final Logger log = 
Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
     private ScheduledExecutorService eventRemoverScheduler;
+       private ScheduledFuture<?> lastSchedule;
     private long timeToKeep;
     private int subjectedAttrIndex;
     private Attribute.Type subjectedAttrType;
@@ -153,7 +155,10 @@ public class SecondDerivativeFinderWindowProcessor extends 
WindowProcessor imple
                                                long diff = timeToKeep - 
(System.currentTimeMillis() - scheduledTime);
                                                if (diff > 0) {
                                                        try {
-                                                               
eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                                                               if 
(lastSchedule != null) {
+                                                                       
lastSchedule.cancel(false);
+                                                               }
+                                                               lastSchedule = 
eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
                                                        } catch 
(RejectedExecutionException ex) {
                                                                
log.warn("scheduling cannot be accepted for execution: elementID " +
                                                                         
elementId);
@@ -265,12 +270,18 @@ public class SecondDerivativeFinderWindowProcessor 
extends WindowProcessor imple
 
     @Override
     public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
-    }
+               if (lastSchedule != null) {
+                       lastSchedule.cancel(false);
+               }
+               lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, 
TimeUnit.MILLISECONDS);
+       }
 
-    public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-    }
+       public void scheduleNow() {
+               if (lastSchedule != null) {
+                       lastSchedule.cancel(false);
+               }
+               lastSchedule = eventRemoverScheduler.schedule(this, 0, 
TimeUnit.MILLISECONDS);
+       }
 
     @Override
     public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {

Reply via email to