http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java deleted file mode 100644 index 0aa01ed..0000000 --- a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cep.extension; - -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; -import org.apache.stratos.common.threading.StratosThreadPool; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.util.MessagingUtil; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * CEP window processor to handle faulty member instances. This window processor is responsible for - * publishing MemberFault event if health stats are not received within a given time window. - */ -@SiddhiExtension(namespace = "stratos", function = "faultHandling") -public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - - private static final int TIME_OUT = 60 * 1000; - public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; - public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; - - private ExecutorService executorService; - private ScheduledExecutorService faultHandleScheduler; - private ScheduledFuture<?> lastSchedule; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private EventPublisher healthStatPublisher = - EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); - private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - - // Map of member id's to their last received health event time stamp - private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - - // Event receiver to receive topology events published by cloud-controller - private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); - - // Stratos member id attribute index in stream execution plan - private int memberIdAttrIndex; - - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } - - @Override - protected void processEvent(InListEvent listEvent) { - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) listEvent.getEvent(i)); - } - } - - /** - * Add new entry to time stamp map from the received event. - * - * @param event Event received by Siddhi. - */ - protected void addDataToMap(InEvent event) { - String id = (String) event.getData()[memberIdAttrIndex]; - //checking whether this member is the topology. - //sometimes there can be a delay between publishing member terminated events - //and actually terminating instances. Hence CEP might get events for already terminated members - //so we are checking the topology for the member existence - Member member = getMemberFromId(id); - if (null == member) { - log.debug("Member not found in the topology. Event rejected"); - return; - } - if (StringUtils.isNotEmpty(id)) { - memberTimeStampMap.put(id, event.getTimeStamp()); - } else { - log.warn("NULL member id found in the event received. Event rejected."); - } - if (log.isDebugEnabled()){ - log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - /** - * Retrieve the current activated members from the topology and initialize the timestamp map. - * This will allow the system to recover from a restart - * - * @param topology Topology model object - */ - boolean loadTimeStampMapFromTopology(Topology topology){ - - long currentTimeStamp = System.currentTimeMillis(); - if (topology == null || topology.getServices() == null){ - return false; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : topology.getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()) { - // we are checking faulty status only in previously activated members - if (member != null && MemberStatus.Active.equals(member.getStatus())) { - // Initialize the member time stamp map from the topology at the beginning - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - } - } - } - } - } - } - - if (log.isDebugEnabled()){ - log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + - memberTimeStampMap); - } - return true; - } - - private Member getMemberFromId(String memberId){ - if (StringUtils.isEmpty(memberId)){ - return null; - } - if (TopologyManager.getTopology().isInitialized()){ - try { - TopologyManager.acquireReadLock(); - if (TopologyManager.getTopology().getServices() == null){ - return null; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : TopologyManager.getTopology().getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()){ - if (memberId.equals(member.getMemberId())){ - return member; - } - } - } - } - } - } - } catch (Exception e) { - log.error("Error while reading topology" + e); - } finally { - TopologyManager.releaseReadLock(); - } - } - return null; - } - - private void publishMemberFault(String memberId){ - Member member = getMemberFromId(memberId); - if (member == null){ - log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + - " does not exist in topology"); - return; - } - log.info("Publishing member fault event for [member-id] " + memberId); - - MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), - member.getMemberId(), member.getPartitionId(), - member.getNetworkPartitionId(), 0); - - memberFaultEventMessageMap.put("message", memberFaultEvent); - healthStatPublisher.publish(MemberFaultEventMap, true); - } - - @Override - public void run() { - try { - threadBarrier.pass(); - - for (Object o : memberTimeStampMap.entrySet()) { - Map.Entry pair = (Map.Entry) o; - long currentTime = System.currentTimeMillis(); - Long eventTimeStamp = (Long) pair.getValue(); - - if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + - eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - publishMemberFault((String) pair.getKey()); - } - } - if (log.isDebugEnabled()){ - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + - memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } finally { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState()}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); - memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - - executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, - CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); - cepTopologyEventReceiver.execute(); - - //Ordinary scheduling - window.schedule(); - if (log.isDebugEnabled()){ - log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + - ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + - ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); - } - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - @Override - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.faultHandleScheduler = scheduledExecutorService; - } - - @Override - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy(){ - // terminate topology listener thread - cepTopologyEventReceiver.terminate(); - window = null; - - // Shutdown executor service - if(executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down cep extension executor service", e); - } - } - } - - public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { - return memberTimeStampMap; - } -}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java deleted file mode 100644 index dff0f79..0000000 --- a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cep.extension; - -import org.apache.log4j.Logger; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.event.remove.RemoveEvent; -import org.wso2.siddhi.core.event.remove.RemoveListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -@SiddhiExtension(namespace = "stratos", function = "gradient") -public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); - private ScheduledExecutorService eventRemoverScheduler; - private ScheduledFuture<?> lastSchedule; - private long timeToKeep; - private int subjectedAttrIndex; - private Attribute.Type subjectedAttrType; - private List<InEvent> newEventList; - private List<RemoveEvent> oldEventList; - private ThreadBarrier threadBarrier; - private ISchedulerSiddhiQueue<StreamEvent> window; - - @Override - protected void processEvent(InEvent event) { - acquireLock(); - try { - newEventList.add(event); - } finally { - releaseLock(); - } - } - - @Override - protected void processEvent(InListEvent listEvent) { - acquireLock(); - try { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - newEventList.add((InEvent) listEvent.getEvent(i)); - } - } finally { - releaseLock(); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - - @Override - public void run() { - acquireLock(); - try { - long scheduledTime = System.currentTimeMillis(); - try { - oldEventList.clear(); - while (true) { - threadBarrier.pass(); - RemoveEvent removeEvent = (RemoveEvent) window.poll(); - if (removeEvent == null) { - if (oldEventList.size() > 0) { - nextProcessor.process(new RemoveListEvent( - oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); - oldEventList.clear(); - } - - if (newEventList.size() > 0) { - InEvent[] inEvents = - newEventList.toArray(new InEvent[newEventList.size()]); - for (InEvent inEvent : inEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - - InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]); - - for (InEvent inEvent : gradientEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - nextProcessor.process(new InListEvent(gradientEvents)); - - newEventList.clear(); - } - - long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); - if (diff > 0) { - try { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { - log.warn("scheduling cannot be accepted for execution: elementID " + - elementId); - } - break; - } - scheduledTime = System.currentTimeMillis(); - } else { - oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); - } - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } finally { - releaseLock(); - } - } - - - /** - * This function will calculate the linear gradient (per second) of the events received during - * a specified time period. - */ - private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) { - double firstVal = 0.0, lastVal = 0.0; - // FIXME I'm not sure whether there's some other good way to do correct casting, - // based on the type. - if (Type.DOUBLE.equals(subjectedAttrType)) { - firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.INT.equals(subjectedAttrType)) { - firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.LONG.equals(subjectedAttrType)) { - firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.FLOAT.equals(subjectedAttrType)) { - firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; - } - - long t1 = firstInEvent.getTimeStamp(); - long t2 = lastInEvent.getTimeStamp(); - long millisecondsForASecond = 1000; - long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; - double gradient = 0.0; - if (tGap > 0) { - gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; - } - if (log.isDebugEnabled()) { - log.debug("Gradient: " + gradient + " Last val: " + lastVal + - " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ - t2+" hash: "+this.hashCode()); - } - Object[] data = firstInEvent.getData().clone(); - data[subjectedAttrIndex] = gradient; - InEvent gradientEvent = - new InEvent(firstInEvent.getStreamId(), (t1+t2)/2, - data); - InEvent[] output = new InEvent[1]; - output[0] = gradientEvent; - return output; - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState(), oldEventList, newEventList}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - oldEventList = ((ArrayList<RemoveEvent>) data[1]); - newEventList = ((ArrayList<InEvent>) data[2]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); - subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); - - oldEventList = new ArrayList<RemoveEvent>(); - if (this.siddhiContext.isDistributedProcessingEnabled()) { - newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); - } else { - newEventList = new ArrayList<InEvent>(); - } - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - //Ordinary scheduling - window.schedule(); - - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; - } - - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy(){ - oldEventList = null; - newEventList = null; - window = null; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java deleted file mode 100755 index 0dc24bd..0000000 --- a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cep.extension; - -/** - * Member Request Handling Capability Window Processor - */ - -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.executor.function.FunctionExecutor; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -@SiddhiExtension(namespace = "stratos", function = "divider") -public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { - - Attribute.Type returnType = Attribute.Type.DOUBLE; - - @Override - public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { - } - - @Override - protected Object process(Object obj) { - - double[] value = new double[2]; - if (obj instanceof Object[]) { - int i=0; - for (Object aObj : (Object[]) obj) { - value[i]= Double.parseDouble(String.valueOf(aObj)); - i++; - } - }//to do avoid deviding zero number of active instances won't be zero cz there is min - Double unit = (value[0] / value[1]); - if(!unit.isNaN() && !unit.isInfinite()) - return unit; - else - return 0.0; - - } - - @Override - public void destroy() { - - } - - @Override - public Attribute.Type getReturnType() { - return returnType; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java deleted file mode 100644 index 96cff22..0000000 --- a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cep.extension; - -import org.apache.log4j.Logger; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.event.remove.RemoveEvent; -import org.wso2.siddhi.core.event.remove.RemoveListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -@SiddhiExtension(namespace = "stratos", function = "secondDerivative") -public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class); - private ScheduledExecutorService eventRemoverScheduler; - private ScheduledFuture<?> lastSchedule; - private long timeToKeep; - private int subjectedAttrIndex; - private Attribute.Type subjectedAttrType; - private List<InEvent> newEventList; - private List<RemoveEvent> oldEventList; - private ThreadBarrier threadBarrier; - private ISchedulerSiddhiQueue<StreamEvent> window; - - @Override - protected void processEvent(InEvent event) { - acquireLock(); - try { - newEventList.add(event); - } finally { - releaseLock(); - } - } - - @Override - protected void processEvent(InListEvent listEvent) { - acquireLock(); - try { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - newEventList.add((InEvent) listEvent.getEvent(i)); - } - } finally { - releaseLock(); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - - @Override - public void run() { - acquireLock(); - try { - long scheduledTime = System.currentTimeMillis(); - try { - oldEventList.clear(); - while (true) { - threadBarrier.pass(); - RemoveEvent removeEvent = (RemoveEvent) window.poll(); - if (removeEvent == null) { - if (oldEventList.size() > 0) { - nextProcessor.process(new RemoveListEvent( - oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); - oldEventList.clear(); - } - - if (newEventList.size() > 0) { - InEvent[] inEvents = - newEventList.toArray(new InEvent[newEventList.size()]); - for (InEvent inEvent : inEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - - // in order to find second derivative, we need at least 3 events. - if (newEventList.size() > 2) { - - InEvent firstDerivative1 = - gradient(inEvents[0], - inEvents[(newEventList.size() / 2) - 1], - null)[0]; - InEvent firstDerivative2 = - gradient(inEvents[newEventList.size() / 2], - inEvents[newEventList.size() - 1], - null)[0]; - InEvent[] secondDerivative = - gradient(firstDerivative1, - firstDerivative2, Type.DOUBLE); - - for (InEvent inEvent : secondDerivative) { - window.put(new RemoveEvent(inEvent, -1)); - } - nextProcessor.process(new InListEvent(secondDerivative)); - } else { - log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " + - newEventList.size()); - } - - newEventList.clear(); - } - - long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); - if (diff > 0) { - try { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { - log.warn("scheduling cannot be accepted for execution: elementID " + - elementId); - } - break; - } - scheduledTime = System.currentTimeMillis(); - } else { - oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); - } - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } finally { - releaseLock(); - } - } - - - /** - * This function will calculate the linear gradient (per second) of the events received during - * a specified time period. - */ - private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) { - Type attrType = type == null ? subjectedAttrType : type; - double firstVal = 0.0, lastVal = 0.0; - // FIXME I'm not sure whether there's some other good way to do correct casting, - // based on the type. - if (Type.DOUBLE.equals(attrType)) { - firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.INT.equals(attrType)) { - firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.LONG.equals(attrType)) { - firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.FLOAT.equals(attrType)) { - firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; - } - - long t1 = firstInEvent.getTimeStamp(); - long t2 = lastInEvent.getTimeStamp(); - long millisecondsForASecond = 1000; - long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; - double gradient = 0.0; - if (tGap > 0) { - gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; - } - if (log.isDebugEnabled()) { - log.debug("Gradient: " + gradient + " Last val: " + lastVal + - " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ - t2+" hash: "+this.hashCode()); - } - Object[] data = firstInEvent.getData().clone(); - data[subjectedAttrIndex] = gradient; - InEvent gradientEvent = - new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2), - data); - InEvent[] output = new InEvent[1]; - output[0] = gradientEvent; - return output; - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState(), oldEventList, newEventList}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - oldEventList = ((ArrayList<RemoveEvent>) data[1]); - newEventList = ((ArrayList<InEvent>) data[2]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); - subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); - - oldEventList = new ArrayList<RemoveEvent>(); - if (this.siddhiContext.isDistributedProcessingEnabled()) { - newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); - } else { - newEventList = new ArrayList<InEvent>(); - } - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - //Ordinary scheduling - window.schedule(); - - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; - } - - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy(){ - oldEventList = null; - newEventList = null; - window = null; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/pom.xml b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/pom.xml new file mode 100644 index 0000000..1b2bc7a --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, + # software distributed under the License is distributed on an + # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + # KIND, either express or implied. See the License for the + # specific language governing permissions and limitations + # under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.stratos</groupId> + <artifactId>cep-extensions</artifactId> + <version>4.1.3-SNAPSHOT</version> + <relativePath>../../../pom.xml</relativePath> + </parent> + + <artifactId>org.apache.stratos.cep300.extension</artifactId> + <name>Apache Stratos - CEP Extensions</name> + <description>Apache Stratos CEP Extensions</description> + + <repositories> + <repository> + <id>wso2-maven2-repository</id> + <name>WSO2 Maven2 Repository</name> + <url>http://dist.wso2.org/maven2</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.wso2.siddhi</groupId> + <artifactId>siddhi-core</artifactId> + <version>2.0.0-wso2v5</version> + </dependency> + <dependency> + <groupId>org.apache.stratos</groupId> + <artifactId>org.apache.stratos.messaging</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java new file mode 100644 index 0000000..59c70c5 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * CEP Topology Receiver for Fault Handling Window Processor. + */ +public class CEPTopologyEventReceiver extends TopologyEventReceiver { + + private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); + + private FaultHandlingWindowProcessor faultHandler; + + public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { + this.faultHandler = faultHandler; + addEventListeners(); + } + + @Override + public void execute() { + super.execute(); + log.info("CEP topology event receiver thread started"); + } + + private void addEventListeners() { + // Load member time stamp map from the topology as a one time task + addEventListener(new CompleteTopologyEventListener() { + private boolean initialized; + + @Override + protected void onEvent(Event event) { + if (!initialized) { + try { + TopologyManager.acquireReadLock(); + log.debug("Complete topology event received to fault handling window processor."); + CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; + initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); + } catch (Exception e) { + log.error("Error loading member time stamp map from complete topology event.", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + } + }); + + // Remove member from the time stamp map when MemberTerminated event is received. + addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); + log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); + } + }); + + // Add member to time stamp map whenever member is activated + addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + // do not put this member if we have already received a health event + faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), + System.currentTimeMillis()); + log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java new file mode 100644 index 0000000..699f036 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cep.extension; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "concat") +public class ConcatWindowProcessor extends FunctionExecutor { + Attribute.Type returnType = Attribute.Type.STRING; + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + if (obj instanceof Object[]) { + StringBuffer sb=new StringBuffer(); + for (Object aObj : (Object[]) obj) { + sb.append(aObj); + } + return sb.toString(); + } else { + return obj.toString(); + } + + } + + @Override + public void destroy() { + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java new file mode 100644 index 0000000..0aa01ed --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.MessagingUtil; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * CEP window processor to handle faulty member instances. This window processor is responsible for + * publishing MemberFault event if health stats are not received within a given time window. + */ +@SiddhiExtension(namespace = "stratos", function = "faultHandling") +public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + + private static final int TIME_OUT = 60 * 1000; + public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; + public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; + + private ExecutorService executorService; + private ScheduledExecutorService faultHandleScheduler; + private ScheduledFuture<?> lastSchedule; + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private EventPublisher healthStatPublisher = + EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); + private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + + // Map of member id's to their last received health event time stamp + private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + + // Event receiver to receive topology events published by cloud-controller + private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); + + // Stratos member id attribute index in stream execution plan + private int memberIdAttrIndex; + + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } + + @Override + protected void processEvent(InListEvent listEvent) { + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } + + /** + * Add new entry to time stamp map from the received event. + * + * @param event Event received by Siddhi. + */ + protected void addDataToMap(InEvent event) { + String id = (String) event.getData()[memberIdAttrIndex]; + //checking whether this member is the topology. + //sometimes there can be a delay between publishing member terminated events + //and actually terminating instances. Hence CEP might get events for already terminated members + //so we are checking the topology for the member existence + Member member = getMemberFromId(id); + if (null == member) { + log.debug("Member not found in the topology. Event rejected"); + return; + } + if (StringUtils.isNotEmpty(id)) { + memberTimeStampMap.put(id, event.getTimeStamp()); + } else { + log.warn("NULL member id found in the event received. Event rejected."); + } + if (log.isDebugEnabled()){ + log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + /** + * Retrieve the current activated members from the topology and initialize the timestamp map. + * This will allow the system to recover from a restart + * + * @param topology Topology model object + */ + boolean loadTimeStampMapFromTopology(Topology topology){ + + long currentTimeStamp = System.currentTimeMillis(); + if (topology == null || topology.getServices() == null){ + return false; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : topology.getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()) { + // we are checking faulty status only in previously activated members + if (member != null && MemberStatus.Active.equals(member.getStatus())) { + // Initialize the member time stamp map from the topology at the beginning + memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); + } + } + } + } + } + } + + if (log.isDebugEnabled()){ + log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + + memberTimeStampMap); + } + return true; + } + + private Member getMemberFromId(String memberId){ + if (StringUtils.isEmpty(memberId)){ + return null; + } + if (TopologyManager.getTopology().isInitialized()){ + try { + TopologyManager.acquireReadLock(); + if (TopologyManager.getTopology().getServices() == null){ + return null; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : TopologyManager.getTopology().getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()){ + if (memberId.equals(member.getMemberId())){ + return member; + } + } + } + } + } + } + } catch (Exception e) { + log.error("Error while reading topology" + e); + } finally { + TopologyManager.releaseReadLock(); + } + } + return null; + } + + private void publishMemberFault(String memberId){ + Member member = getMemberFromId(memberId); + if (member == null){ + log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + + " does not exist in topology"); + return; + } + log.info("Publishing member fault event for [member-id] " + memberId); + + MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), + member.getMemberId(), member.getPartitionId(), + member.getNetworkPartitionId(), 0); + + memberFaultEventMessageMap.put("message", memberFaultEvent); + healthStatPublisher.publish(MemberFaultEventMap, true); + } + + @Override + public void run() { + try { + threadBarrier.pass(); + + for (Object o : memberTimeStampMap.entrySet()) { + Map.Entry pair = (Map.Entry) o; + long currentTime = System.currentTimeMillis(); + Long eventTimeStamp = (Long) pair.getValue(); + + if ((currentTime - eventTimeStamp) > TIME_OUT) { + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); + publishMemberFault((String) pair.getKey()); + } + } + if (log.isDebugEnabled()){ + log.debug("Fault handling processor iteration completed with [time-stamp map length] " + + memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } finally { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState()}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); + memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); + + executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, + CEP_EXTENSION_THREAD_POOL_SIZE); + cepTopologyEventReceiver.setExecutorService(executorService); + cepTopologyEventReceiver.execute(); + + //Ordinary scheduling + window.schedule(); + if (log.isDebugEnabled()){ + log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); + } + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + @Override + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.faultHandleScheduler = scheduledExecutorService; + } + + @Override + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + // terminate topology listener thread + cepTopologyEventReceiver.terminate(); + window = null; + + // Shutdown executor service + if(executorService != null) { + try { + executorService.shutdownNow(); + } catch (Exception e) { + log.warn("An error occurred while shutting down cep extension executor service", e); + } + } + } + + public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { + return memberTimeStampMap; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java new file mode 100644 index 0000000..f354ca7 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java @@ -0,0 +1,277 @@ +/* + * Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.event.remove.RemoveEvent; +import org.wso2.siddhi.core.event.remove.RemoveListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "gradient") +public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; + private long timeToKeep; + private int subjectedAttrIndex; + private Type subjectedAttrType; + private List<InEvent> newEventList; + private List<RemoveEvent> oldEventList; + private ThreadBarrier threadBarrier; + private ISchedulerSiddhiQueue<StreamEvent> window; + + @Override + protected void processEvent(InEvent event) { + acquireLock(); + try { + newEventList.add(event); + } finally { + releaseLock(); + } + } + + @Override + protected void processEvent(InListEvent listEvent) { + acquireLock(); + try { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + newEventList.add((InEvent) listEvent.getEvent(i)); + } + } finally { + releaseLock(); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + + @Override + public void run() { + acquireLock(); + try { + long scheduledTime = System.currentTimeMillis(); + try { + oldEventList.clear(); + while (true) { + threadBarrier.pass(); + RemoveEvent removeEvent = (RemoveEvent) window.poll(); + if (removeEvent == null) { + if (oldEventList.size() > 0) { + nextProcessor.process(new RemoveListEvent( + oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); + oldEventList.clear(); + } + + if (newEventList.size() > 0) { + InEvent[] inEvents = + newEventList.toArray(new InEvent[newEventList.size()]); + for (InEvent inEvent : inEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + + InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]); + + for (InEvent inEvent : gradientEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + nextProcessor.process(new InListEvent(gradientEvents)); + + newEventList.clear(); + } + + long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); + if (diff > 0) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + log.warn("scheduling cannot be accepted for execution: elementID " + + elementId); + } + break; + } + scheduledTime = System.currentTimeMillis(); + } else { + oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } finally { + releaseLock(); + } + } + + + /** + * This function will calculate the linear gradient (per second) of the events received during + * a specified time period. + */ + private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) { + double firstVal = 0.0, lastVal = 0.0; + // FIXME I'm not sure whether there's some other good way to do correct casting, + // based on the type. + if (Type.DOUBLE.equals(subjectedAttrType)) { + firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.INT.equals(subjectedAttrType)) { + firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.LONG.equals(subjectedAttrType)) { + firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.FLOAT.equals(subjectedAttrType)) { + firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; + } + + long t1 = firstInEvent.getTimeStamp(); + long t2 = lastInEvent.getTimeStamp(); + long millisecondsForASecond = 1000; + long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; + double gradient = 0.0; + if (tGap > 0) { + gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; + } + if (log.isDebugEnabled()) { + log.debug("Gradient: " + gradient + " Last val: " + lastVal + + " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ + t2+" hash: "+this.hashCode()); + } + Object[] data = firstInEvent.getData().clone(); + data[subjectedAttrIndex] = gradient; + InEvent gradientEvent = + new InEvent(firstInEvent.getStreamId(), (t1+t2)/2, + data); + InEvent[] output = new InEvent[1]; + output[0] = gradientEvent; + return output; + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState(), oldEventList, newEventList}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + oldEventList = ((ArrayList<RemoveEvent>) data[1]); + newEventList = ((ArrayList<InEvent>) data[2]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); + + oldEventList = new ArrayList<RemoveEvent>(); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); + } else { + newEventList = new ArrayList<InEvent>(); + } + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + oldEventList = null; + newEventList = null; + window = null; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java new file mode 100755 index 0000000..0dc24bd --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +/** + * Member Request Handling Capability Window Processor + */ + +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "divider") +public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.DOUBLE; + + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + + double[] value = new double[2]; + if (obj instanceof Object[]) { + int i=0; + for (Object aObj : (Object[]) obj) { + value[i]= Double.parseDouble(String.valueOf(aObj)); + i++; + } + }//to do avoid deviding zero number of active instances won't be zero cz there is min + Double unit = (value[0] / value[1]); + if(!unit.isNaN() && !unit.isInfinite()) + return unit; + else + return 0.0; + + } + + @Override + public void destroy() { + + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +}
