http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java new file mode 100644 index 0000000..261259a --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java @@ -0,0 +1,295 @@ +/* + * Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.event.remove.RemoveEvent; +import org.wso2.siddhi.core.event.remove.RemoveListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "secondDerivative") +public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; + private long timeToKeep; + private int subjectedAttrIndex; + private Type subjectedAttrType; + private List<InEvent> newEventList; + private List<RemoveEvent> oldEventList; + private ThreadBarrier threadBarrier; + private ISchedulerSiddhiQueue<StreamEvent> window; + + @Override + protected void processEvent(InEvent event) { + acquireLock(); + try { + newEventList.add(event); + } finally { + releaseLock(); + } + } + + @Override + protected void processEvent(InListEvent listEvent) { + acquireLock(); + try { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + newEventList.add((InEvent) listEvent.getEvent(i)); + } + } finally { + releaseLock(); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + + @Override + public void run() { + acquireLock(); + try { + long scheduledTime = System.currentTimeMillis(); + try { + oldEventList.clear(); + while (true) { + threadBarrier.pass(); + RemoveEvent removeEvent = (RemoveEvent) window.poll(); + if (removeEvent == null) { + if (oldEventList.size() > 0) { + nextProcessor.process(new RemoveListEvent( + oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); + oldEventList.clear(); + } + + if (newEventList.size() > 0) { + InEvent[] inEvents = + newEventList.toArray(new InEvent[newEventList.size()]); + for (InEvent inEvent : inEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + + // in order to find second derivative, we need at least 3 events. + if (newEventList.size() > 2) { + + InEvent firstDerivative1 = + gradient(inEvents[0], + inEvents[(newEventList.size() / 2) - 1], + null)[0]; + InEvent firstDerivative2 = + gradient(inEvents[newEventList.size() / 2], + inEvents[newEventList.size() - 1], + null)[0]; + InEvent[] secondDerivative = + gradient(firstDerivative1, + firstDerivative2, Type.DOUBLE); + + for (InEvent inEvent : secondDerivative) { + window.put(new RemoveEvent(inEvent, -1)); + } + nextProcessor.process(new InListEvent(secondDerivative)); + } else { + log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " + + newEventList.size()); + } + + newEventList.clear(); + } + + long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); + if (diff > 0) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + log.warn("scheduling cannot be accepted for execution: elementID " + + elementId); + } + break; + } + scheduledTime = System.currentTimeMillis(); + } else { + oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } finally { + releaseLock(); + } + } + + + /** + * This function will calculate the linear gradient (per second) of the events received during + * a specified time period. + */ + private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) { + Type attrType = type == null ? subjectedAttrType : type; + double firstVal = 0.0, lastVal = 0.0; + // FIXME I'm not sure whether there's some other good way to do correct casting, + // based on the type. + if (Type.DOUBLE.equals(attrType)) { + firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.INT.equals(attrType)) { + firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.LONG.equals(attrType)) { + firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.FLOAT.equals(attrType)) { + firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; + } + + long t1 = firstInEvent.getTimeStamp(); + long t2 = lastInEvent.getTimeStamp(); + long millisecondsForASecond = 1000; + long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; + double gradient = 0.0; + if (tGap > 0) { + gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; + } + if (log.isDebugEnabled()) { + log.debug("Gradient: " + gradient + " Last val: " + lastVal + + " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ + t2+" hash: "+this.hashCode()); + } + Object[] data = firstInEvent.getData().clone(); + data[subjectedAttrIndex] = gradient; + InEvent gradientEvent = + new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2), + data); + InEvent[] output = new InEvent[1]; + output[0] = gradientEvent; + return output; + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState(), oldEventList, newEventList}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + oldEventList = ((ArrayList<RemoveEvent>) data[1]); + newEventList = ((ArrayList<InEvent>) data[2]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); + + oldEventList = new ArrayList<RemoveEvent>(); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); + } else { + newEventList = new ArrayList<InEvent>(); + } + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + oldEventList = null; + newEventList = null; + window = null; + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/pom.xml b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/pom.xml new file mode 100644 index 0000000..f9d7f98 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/pom.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + ~ Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.stratos</groupId> + <artifactId>cep-extensions</artifactId> + <version>4.1.3-SNAPSHOT</version> + <relativePath>../../../pom.xml</relativePath> + </parent> + + <artifactId>org.apache.stratos.cep.310.extension</artifactId> + <name>Apache Stratos - CEP Extensions</name> + <description>Apache Stratos CEP Extensions</description> + + <repositories> + <repository> + <id>wso2-maven2-repository</id> + <name>WSO2 Maven2 Repository</name> + <url>http://dist.wso2.org/maven2</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.wso2.siddhi</groupId> + <artifactId>siddhi-core</artifactId> + <version>2.1.0-wso2v1</version> + </dependency> + <dependency> + <groupId>org.apache.stratos</groupId> + <artifactId>org.apache.stratos.messaging</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java new file mode 100644 index 0000000..59c70c5 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * CEP Topology Receiver for Fault Handling Window Processor. + */ +public class CEPTopologyEventReceiver extends TopologyEventReceiver { + + private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); + + private FaultHandlingWindowProcessor faultHandler; + + public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { + this.faultHandler = faultHandler; + addEventListeners(); + } + + @Override + public void execute() { + super.execute(); + log.info("CEP topology event receiver thread started"); + } + + private void addEventListeners() { + // Load member time stamp map from the topology as a one time task + addEventListener(new CompleteTopologyEventListener() { + private boolean initialized; + + @Override + protected void onEvent(Event event) { + if (!initialized) { + try { + TopologyManager.acquireReadLock(); + log.debug("Complete topology event received to fault handling window processor."); + CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; + initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); + } catch (Exception e) { + log.error("Error loading member time stamp map from complete topology event.", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + } + }); + + // Remove member from the time stamp map when MemberTerminated event is received. + addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); + log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); + } + }); + + // Add member to time stamp map whenever member is activated + addEventListener(new MemberActivatedEventListener() { + @Override + protected void onEvent(Event event) { + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + + // do not put this member if we have already received a health event + faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), + System.currentTimeMillis()); + log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java new file mode 100644 index 0000000..699f036 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cep.extension; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "concat") +public class ConcatWindowProcessor extends FunctionExecutor { + Attribute.Type returnType = Attribute.Type.STRING; + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + if (obj instanceof Object[]) { + StringBuffer sb=new StringBuffer(); + for (Object aObj : (Object[]) obj) { + sb.append(aObj); + } + return sb.toString(); + } else { + return obj.toString(); + } + + } + + @Override + public void destroy() { + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java new file mode 100644 index 0000000..55b7572 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -0,0 +1,343 @@ +/* + * Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.MessagingUtil; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.snapshot.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * CEP window processor to handle faulty member instances. This window processor is responsible for + * publishing MemberFault event if health stats are not received within a given time window. + */ +@SiddhiExtension(namespace = "stratos", function = "faultHandling") +public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + + private static final int TIME_OUT = 60 * 1000; + public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; + public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; + + private ExecutorService executorService; + private ScheduledExecutorService faultHandleScheduler; + private ScheduledFuture<?> lastSchedule; + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private EventPublisher healthStatPublisher = + EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); + private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + + // Map of member id's to their last received health event time stamp + private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); + + // Event receiver to receive topology events published by cloud-controller + private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); + + // Stratos member id attribute index in stream execution plan + private int memberIdAttrIndex; + + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } + + @Override + protected void processEvent(InListEvent listEvent) { + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } + + /** + * Add new entry to time stamp map from the received event. + * + * @param event Event received by Siddhi. + */ + protected void addDataToMap(InEvent event) { + String id = (String) event.getData()[memberIdAttrIndex]; + //checking whether this member is the topology. + //sometimes there can be a delay between publishing member terminated events + //and actually terminating instances. Hence CEP might get events for already terminated members + //so we are checking the topology for the member existence + Member member = getMemberFromId(id); + if (null == member) { + log.debug("Member not found in the topology. Event rejected"); + return; + } + if (StringUtils.isNotEmpty(id)) { + memberTimeStampMap.put(id, event.getTimeStamp()); + } else { + log.warn("NULL member id found in the event received. Event rejected."); + } + if (log.isDebugEnabled()){ + log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + /** + * Retrieve the current activated members from the topology and initialize the timestamp map. + * This will allow the system to recover from a restart + * + * @param topology Topology model object + */ + boolean loadTimeStampMapFromTopology(Topology topology){ + + long currentTimeStamp = System.currentTimeMillis(); + if (topology == null || topology.getServices() == null){ + return false; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : topology.getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()) { + // we are checking faulty status only in previously activated members + if (member != null && MemberStatus.Active.equals(member.getStatus())) { + // Initialize the member time stamp map from the topology at the beginning + memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); + } + } + } + } + } + } + + if (log.isDebugEnabled()){ + log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + + memberTimeStampMap); + } + return true; + } + + private Member getMemberFromId(String memberId){ + if (StringUtils.isEmpty(memberId)){ + return null; + } + if (TopologyManager.getTopology().isInitialized()){ + try { + TopologyManager.acquireReadLock(); + if (TopologyManager.getTopology().getServices() == null){ + return null; + } + // TODO make this efficient by adding APIs to messaging component + for (Service service : TopologyManager.getTopology().getServices()) { + if (service.getClusters() != null) { + for (Cluster cluster : service.getClusters()) { + if (cluster.getMembers() != null) { + for (Member member : cluster.getMembers()){ + if (memberId.equals(member.getMemberId())){ + return member; + } + } + } + } + } + } + } catch (Exception e) { + log.error("Error while reading topology" + e); + } finally { + TopologyManager.releaseReadLock(); + } + } + return null; + } + + private void publishMemberFault(String memberId){ + Member member = getMemberFromId(memberId); + if (member == null){ + log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + + " does not exist in topology"); + return; + } + log.info("Publishing member fault event for [member-id] " + memberId); + + MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), + member.getMemberId(), member.getPartitionId(), + member.getNetworkPartitionId(), 0); + + memberFaultEventMessageMap.put("message", memberFaultEvent); + healthStatPublisher.publish(MemberFaultEventMap, true); + } + + @Override + public void run() { + try { + threadBarrier.pass(); + + for (Object o : memberTimeStampMap.entrySet()) { + Map.Entry pair = (Map.Entry) o; + long currentTime = System.currentTimeMillis(); + Long eventTimeStamp = (Long) pair.getValue(); + + if ((currentTime - eventTimeStamp) > TIME_OUT) { + log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); + publishMemberFault((String) pair.getKey()); + } + } + if (log.isDebugEnabled()){ + log.debug("Fault handling processor iteration completed with [time-stamp map length] " + + memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } finally { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState()}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, + AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); + memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); + + executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, + CEP_EXTENSION_THREAD_POOL_SIZE); + cepTopologyEventReceiver.setExecutorService(executorService); + cepTopologyEventReceiver.execute(); + + //Ordinary scheduling + window.schedule(); + if (log.isDebugEnabled()){ + log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); + } + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + @Override + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.faultHandleScheduler = scheduledExecutorService; + } + + @Override + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + // terminate topology listener thread + cepTopologyEventReceiver.terminate(); + window = null; + + // Shutdown executor service + if(executorService != null) { + try { + executorService.shutdownNow(); + } catch (Exception e) { + log.warn("An error occurred while shutting down cep extension executor service", e); + } + } + } + + public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { + return memberTimeStampMap; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java new file mode 100644 index 0000000..3d13533 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java @@ -0,0 +1,277 @@ +/* + * Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.event.remove.RemoveEvent; +import org.wso2.siddhi.core.event.remove.RemoveListEvent; +import org.wso2.siddhi.core.snapshot.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "gradient") +public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; + private long timeToKeep; + private int subjectedAttrIndex; + private Attribute.Type subjectedAttrType; + private List<InEvent> newEventList; + private List<RemoveEvent> oldEventList; + private ThreadBarrier threadBarrier; + private ISchedulerSiddhiQueue<StreamEvent> window; + + @Override + protected void processEvent(InEvent event) { + acquireLock(); + try { + newEventList.add(event); + } finally { + releaseLock(); + } + } + + @Override + protected void processEvent(InListEvent listEvent) { + acquireLock(); + try { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + newEventList.add((InEvent) listEvent.getEvent(i)); + } + } finally { + releaseLock(); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + + @Override + public void run() { + acquireLock(); + try { + long scheduledTime = System.currentTimeMillis(); + try { + oldEventList.clear(); + while (true) { + threadBarrier.pass(); + RemoveEvent removeEvent = (RemoveEvent) window.poll(); + if (removeEvent == null) { + if (oldEventList.size() > 0) { + nextProcessor.process(new RemoveListEvent( + oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); + oldEventList.clear(); + } + + if (newEventList.size() > 0) { + InEvent[] inEvents = + newEventList.toArray(new InEvent[newEventList.size()]); + for (InEvent inEvent : inEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + + InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]); + + for (InEvent inEvent : gradientEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + nextProcessor.process(new InListEvent(gradientEvents)); + + newEventList.clear(); + } + + long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); + if (diff > 0) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + log.warn("scheduling cannot be accepted for execution: elementID " + + elementId); + } + break; + } + scheduledTime = System.currentTimeMillis(); + } else { + oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } finally { + releaseLock(); + } + } + + + /** + * This function will calculate the linear gradient (per second) of the events received during + * a specified time period. + */ + private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) { + double firstVal = 0.0, lastVal = 0.0; + // FIXME I'm not sure whether there's some other good way to do correct casting, + // based on the type. + if (Type.DOUBLE.equals(subjectedAttrType)) { + firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.INT.equals(subjectedAttrType)) { + firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.LONG.equals(subjectedAttrType)) { + firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.FLOAT.equals(subjectedAttrType)) { + firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; + } + + long t1 = firstInEvent.getTimeStamp(); + long t2 = lastInEvent.getTimeStamp(); + long millisecondsForASecond = 1000; + long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; + double gradient = 0.0; + if (tGap > 0) { + gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; + } + if (log.isDebugEnabled()) { + log.debug("Gradient: " + gradient + " Last val: " + lastVal + + " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ + t2+" hash: "+this.hashCode()); + } + Object[] data = firstInEvent.getData().clone(); + data[subjectedAttrIndex] = gradient; + InEvent gradientEvent = + new InEvent(firstInEvent.getStreamId(), (t1+t2)/2, + data); + InEvent[] output = new InEvent[1]; + output[0] = gradientEvent; + return output; + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState(), oldEventList, newEventList}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + oldEventList = ((ArrayList<RemoveEvent>) data[1]); + newEventList = ((ArrayList<InEvent>) data[2]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); + + oldEventList = new ArrayList<RemoveEvent>(); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); + } else { + newEventList = new ArrayList<InEvent>(); + } + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + oldEventList = null; + newEventList = null; + window = null; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java new file mode 100755 index 0000000..0dc24bd --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +/** + * Member Request Handling Capability Window Processor + */ + +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "divider") +public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.DOUBLE; + + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + + double[] value = new double[2]; + if (obj instanceof Object[]) { + int i=0; + for (Object aObj : (Object[]) obj) { + value[i]= Double.parseDouble(String.valueOf(aObj)); + i++; + } + }//to do avoid deviding zero number of active instances won't be zero cz there is min + Double unit = (value[0] / value[1]); + if(!unit.isNaN() && !unit.isInfinite()) + return unit; + else + return 0.0; + + } + + @Override + public void destroy() { + + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java new file mode 100644 index 0000000..b04c084 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java @@ -0,0 +1,295 @@ +/* + * Copyright 2005-2009 WSO2, Inc. (http://wso2.com) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.event.remove.RemoveEvent; +import org.wso2.siddhi.core.event.remove.RemoveListEvent; +import org.wso2.siddhi.core.snapshot.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "secondDerivative") +public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; + private long timeToKeep; + private int subjectedAttrIndex; + private Attribute.Type subjectedAttrType; + private List<InEvent> newEventList; + private List<RemoveEvent> oldEventList; + private ThreadBarrier threadBarrier; + private ISchedulerSiddhiQueue<StreamEvent> window; + + @Override + protected void processEvent(InEvent event) { + acquireLock(); + try { + newEventList.add(event); + } finally { + releaseLock(); + } + } + + @Override + protected void processEvent(InListEvent listEvent) { + acquireLock(); + try { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + newEventList.add((InEvent) listEvent.getEvent(i)); + } + } finally { + releaseLock(); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + + @Override + public void run() { + acquireLock(); + try { + long scheduledTime = System.currentTimeMillis(); + try { + oldEventList.clear(); + while (true) { + threadBarrier.pass(); + RemoveEvent removeEvent = (RemoveEvent) window.poll(); + if (removeEvent == null) { + if (oldEventList.size() > 0) { + nextProcessor.process(new RemoveListEvent( + oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); + oldEventList.clear(); + } + + if (newEventList.size() > 0) { + InEvent[] inEvents = + newEventList.toArray(new InEvent[newEventList.size()]); + for (InEvent inEvent : inEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + + // in order to find second derivative, we need at least 3 events. + if (newEventList.size() > 2) { + + InEvent firstDerivative1 = + gradient(inEvents[0], + inEvents[(newEventList.size() / 2) - 1], + null)[0]; + InEvent firstDerivative2 = + gradient(inEvents[newEventList.size() / 2], + inEvents[newEventList.size() - 1], + null)[0]; + InEvent[] secondDerivative = + gradient(firstDerivative1, + firstDerivative2, Type.DOUBLE); + + for (InEvent inEvent : secondDerivative) { + window.put(new RemoveEvent(inEvent, -1)); + } + nextProcessor.process(new InListEvent(secondDerivative)); + } else { + log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " + + newEventList.size()); + } + + newEventList.clear(); + } + + long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); + if (diff > 0) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + log.warn("scheduling cannot be accepted for execution: elementID " + + elementId); + } + break; + } + scheduledTime = System.currentTimeMillis(); + } else { + oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } finally { + releaseLock(); + } + } + + + /** + * This function will calculate the linear gradient (per second) of the events received during + * a specified time period. + */ + private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) { + Type attrType = type == null ? subjectedAttrType : type; + double firstVal = 0.0, lastVal = 0.0; + // FIXME I'm not sure whether there's some other good way to do correct casting, + // based on the type. + if (Type.DOUBLE.equals(attrType)) { + firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.INT.equals(attrType)) { + firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.LONG.equals(attrType)) { + firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.FLOAT.equals(attrType)) { + firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; + } + + long t1 = firstInEvent.getTimeStamp(); + long t2 = lastInEvent.getTimeStamp(); + long millisecondsForASecond = 1000; + long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; + double gradient = 0.0; + if (tGap > 0) { + gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; + } + if (log.isDebugEnabled()) { + log.debug("Gradient: " + gradient + " Last val: " + lastVal + + " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ + t2+" hash: "+this.hashCode()); + } + Object[] data = firstInEvent.getData().clone(); + data[subjectedAttrIndex] = gradient; + InEvent gradientEvent = + new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2), + data); + InEvent[] output = new InEvent[1]; + output[0] = gradientEvent; + return output; + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState(), oldEventList, newEventList}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + oldEventList = ((ArrayList<RemoveEvent>) data[1]); + newEventList = ((ArrayList<InEvent>) data[2]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); + + oldEventList = new ArrayList<RemoveEvent>(); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); + } else { + newEventList = new ArrayList<InEvent>(); + } + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + oldEventList = null; + newEventList = null; + window = null; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/extensions/cep/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/pom.xml b/extensions/cep/pom.xml index 3c42bcb..9393a04 100644 --- a/extensions/cep/pom.xml +++ b/extensions/cep/pom.xml @@ -33,6 +33,7 @@ <modules> <module>modules/distribution</module> - <module>modules/stratos-cep-extension</module> + <module>modules/stratos-cep-extension/wso2cep-3.0.0</module> + <module>modules/stratos-cep-extension/wso2cep-3.1.0</module> </modules> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/products/stratos/modules/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/pom.xml b/products/stratos/modules/distribution/pom.xml index 73348ab..9ef940a 100755 --- a/products/stratos/modules/distribution/pom.xml +++ b/products/stratos/modules/distribution/pom.xml @@ -71,7 +71,7 @@ </dependency> <dependency> <groupId>org.apache.stratos</groupId> - <artifactId>org.apache.stratos.cep.extension</artifactId> + <artifactId>org.apache.stratos.cep300.extension</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/stratos/blob/8ad1f6e7/products/stratos/modules/distribution/src/assembly/bin.xml ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/assembly/bin.xml b/products/stratos/modules/distribution/src/assembly/bin.xml index c60bd09..54a1a56 100755 --- a/products/stratos/modules/distribution/src/assembly/bin.xml +++ b/products/stratos/modules/distribution/src/assembly/bin.xml @@ -429,34 +429,34 @@ <!-- cep --> <!--creating an empty input event adaptors directory--> <fileSet> - <directory>../../../../extensions/cep/modules/artifacts/inputeventadaptors</directory> + <directory>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/inputeventadaptors</directory> <outputDirectory> ${stratos.distribution.name}-${project.version}/repository/deployment/server/inputeventadaptors </outputDirectory> </fileSet> <!--creating an empty output event adaptors directory--> <fileSet> - <directory>../../../../extensions/cep/modules/artifacts/outputeventadaptors</directory> + <directory>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/outputeventadaptors</directory> <outputDirectory> ${stratos.distribution.name}-${project.version}/repository/deployment/server/outputeventadaptors </outputDirectory> </fileSet> <!--creating an empty event builders directory--> <fileSet> - <directory>../../../../extensions/cep/modules/artifacts/eventbuilders</directory> + <directory>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/eventbuilders</directory> <outputDirectory>${stratos.distribution.name}-${project.version}/repository/deployment/server/eventbuilders </outputDirectory> </fileSet> <!--creating an empty event formatters directory--> <fileSet> - <directory>../../../../extensions/cep/modules/artifacts/eventformatters</directory> + <directory>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/eventformatters</directory> <outputDirectory> ${stratos.distribution.name}-${project.version}/repository/deployment/server/eventformatters </outputDirectory> </fileSet> <!--creating an empty execution plans directory--> <fileSet> - <directory>../../../../extensions/cep/modules/artifacts/executionplans</directory> + <directory>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/executionplans</directory> <outputDirectory>${stratos.distribution.name}-${project.version}/repository/deployment/server/executionplans </outputDirectory> </fileSet> @@ -603,7 +603,7 @@ <filtered>true</filtered> </file> <file> - <source>../../../../extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml</source> + <source>../../../../extensions/cep/modules/artifacts/wso2cep-3.0.0/streamdefinitions/stream-manager-config.xml</source> <outputDirectory>${stratos.distribution.name}-${project.version}/repository/conf/</outputDirectory> <filtered>true</filtered> </file>
