Repository: stratos Updated Branches: refs/heads/master 35db118b2 -> 73d5dce00
OOM fix for CEP/Stratos Window Processors Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/73d5dce0 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/73d5dce0 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/73d5dce0 Branch: refs/heads/master Commit: 73d5dce00bc5b8f0ab0f58328a10ae4b9dc3dbc5 Parents: 35db118 Author: lasinducharith <[email protected]> Authored: Mon Jun 22 19:09:43 2015 +0530 Committer: lasinducharith <[email protected]> Committed: Mon Jun 22 19:09:43 2015 +0530 ---------------------------------------------------------------------- .../extension/FaultHandlingWindowProcessor.java | 21 ++++++++++++------ .../GradientFinderWindowProcessor.java | 21 +++++++++++++----- .../SecondDerivativeFinderWindowProcessor.java | 23 +++++++++++++++----- 3 files changed, 47 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index e22c00b..c029c34 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -48,10 +48,7 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * CEP window processor to handle faulty member instances. This window processor is responsible for @@ -68,6 +65,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run private ExecutorService executorService; private ScheduledExecutorService faultHandleScheduler; + private ScheduledFuture<?> lastSchedule; private ThreadBarrier threadBarrier; private long timeToKeep; private ISchedulerSiddhiQueue<StreamEvent> window; @@ -246,7 +244,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run } catch (Throwable t) { log.error(t.getMessage(), t); } finally { - faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } } @@ -298,12 +299,18 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void schedule() { - faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } @Override public void scheduleNow() { - faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); } @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java index 9f7ea9a..dff0f79 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java @@ -46,6 +46,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @SiddhiExtension(namespace = "stratos", function = "gradient") @@ -53,6 +54,7 @@ public class GradientFinderWindowProcessor extends WindowProcessor implements Ru static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; private long timeToKeep; private int subjectedAttrIndex; private Attribute.Type subjectedAttrType; @@ -135,9 +137,12 @@ public class GradientFinderWindowProcessor extends WindowProcessor implements Ru long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); if (diff > 0) { - try { - eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { log.warn("scheduling cannot be accepted for execution: elementID " + elementId); } @@ -247,11 +252,17 @@ public class GradientFinderWindowProcessor extends WindowProcessor implements Ru @Override public void schedule() { - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); } public void scheduleNow() { - eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); } @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/73d5dce0/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java index 252f490..96cff22 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java @@ -46,6 +46,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @SiddhiExtension(namespace = "stratos", function = "secondDerivative") @@ -53,6 +54,7 @@ public class SecondDerivativeFinderWindowProcessor extends WindowProcessor imple static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class); private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; private long timeToKeep; private int subjectedAttrIndex; private Attribute.Type subjectedAttrType; @@ -153,7 +155,10 @@ public class SecondDerivativeFinderWindowProcessor extends WindowProcessor imple long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); if (diff > 0) { try { - eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { log.warn("scheduling cannot be accepted for execution: elementID " + elementId); @@ -265,12 +270,18 @@ public class SecondDerivativeFinderWindowProcessor extends WindowProcessor imple @Override public void schedule() { - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } - public void scheduleNow() { - eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } @Override public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
