http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java new file mode 100644 index 0000000..96cff22 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Siddhi window extension registered as {@code stratos:secondDerivative}.
+ *
+ * <p>Incoming events are buffered in {@code newEventList}. Each time the scheduled {@link #run()}
+ * fires, the buffered batch is split into a lower and an upper half, a per-second gradient of the
+ * subjected attribute is computed for each half, and the gradient between those two gradients
+ * (the second derivative) is forwarded to the next processor as an {@link InListEvent}.
+ *
+ * <p>Parameters read by {@code init}: {@code parameters[0]} is the batch period in milliseconds
+ * (int or long constant) and {@code parameters[1]} is the attribute whose second derivative is
+ * calculated.
+ */
+@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
+public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
+
+    /** Gradients are expressed per second; event timestamps are in milliseconds. */
+    private static final long MILLISECONDS_PER_SECOND = 1000;
+
+    private ScheduledExecutorService eventRemoverScheduler;
+    private ScheduledFuture<?> lastSchedule;
+    private long timeToKeep;
+    private int subjectedAttrIndex;
+    private Attribute.Type subjectedAttrType;
+    private List<InEvent> newEventList;
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    /**
+     * Buffers a single incoming event until the next scheduled {@link #run()}.
+     *
+     * @param event the event received from the stream
+     */
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Buffers every active event of a list event until the next scheduled {@link #run()}.
+     *
+     * @param listEvent the batch of events received from the stream
+     */
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            // Diagnostics go through the logger; this path previously printed to stdout.
+            if (log.isDebugEnabled()) {
+                log.debug("Received list event: " + listEvent);
+            }
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window; the predicate is only applied when distributed
+     * processing is enabled, otherwise the plain iterator is returned.
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Scheduled batch step. Drains the window, forwards the drained events as expired, then
+     * converts the buffered events into a second-derivative event and re-schedules itself for
+     * the remainder of {@code timeToKeep}.
+     */
+    @Override
+    public void run() {
+        acquireLock();
+        try {
+            long scheduledTime = System.currentTimeMillis();
+            try {
+                oldEventList.clear();
+                while (true) {
+                    threadBarrier.pass();
+                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
+                    if (removeEvent != null) {
+                        // Still draining the previous batch out of the window.
+                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+                        continue;
+                    }
+
+                    // Window is empty: publish everything drained so far as expired.
+                    if (!oldEventList.isEmpty()) {
+                        nextProcessor.process(new RemoveListEvent(
+                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+                        oldEventList.clear();
+                    }
+
+                    if (!newEventList.isEmpty()) {
+                        InEvent[] inEvents = newEventList.toArray(new InEvent[newEventList.size()]);
+                        for (InEvent inEvent : inEvents) {
+                            window.put(new RemoveEvent(inEvent, -1));
+                        }
+
+                        // Index with the snapshot length, not newEventList.size(), so the
+                        // indices always agree with the array that is actually read.
+                        int eventCount = inEvents.length;
+
+                        // in order to find second derivative, we need at least 3 events.
+                        if (eventCount > 2) {
+                            // Lower half: [0 .. n/2 - 1], upper half: [n/2 .. n - 1].
+                            // With exactly 3 events the lower half is the single event at
+                            // index 0, so its gradient is 0.
+                            InEvent firstDerivative1 =
+                                    gradient(inEvents[0], inEvents[(eventCount / 2) - 1], null)[0];
+                            InEvent firstDerivative2 =
+                                    gradient(inEvents[eventCount / 2], inEvents[eventCount - 1], null)[0];
+                            // The first-derivative events carry a Double in the subjected slot.
+                            InEvent[] secondDerivative =
+                                    gradient(firstDerivative1, firstDerivative2, Type.DOUBLE);
+
+                            for (InEvent inEvent : secondDerivative) {
+                                window.put(new RemoveEvent(inEvent, -1));
+                            }
+                            nextProcessor.process(new InListEvent(secondDerivative));
+                        } else if (log.isDebugEnabled()) {
+                            log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
+                                    eventCount);
+                        }
+
+                        newEventList.clear();
+                    }
+
+                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+                    if (diff > 0) {
+                        try {
+                            scheduleAfter(diff);
+                        } catch (RejectedExecutionException ex) {
+                            log.warn("scheduling cannot be accepted for execution: elementID " +
+                                    elementId, ex);
+                        }
+                        break;
+                    }
+                    // The period already elapsed while processing; start the next round now.
+                    scheduledTime = System.currentTimeMillis();
+                }
+            } catch (Throwable t) {
+                // Scheduler-thread boundary: log and swallow so the lock is always released.
+                log.error(t.getMessage(), t);
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Calculates the linear gradient (per second) of the subjected attribute between two events.
+     *
+     * <p>The time gap is clamped to at least one second, so the divisor is never zero. The
+     * resulting event copies the data of {@code firstInEvent}, replaces the subjected attribute
+     * with the gradient, and is stamped with the midpoint of the two timestamps.
+     *
+     * @param firstInEvent earlier event of the interval
+     * @param lastInEvent  later event of the interval
+     * @param type         type of the value stored in the subjected slot, or {@code null} to use
+     *                     the type declared by the stream definition
+     * @return a single-element array holding the gradient event
+     */
+    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) {
+        Type attrType = type == null ? subjectedAttrType : type;
+        double firstVal = numericValue(firstInEvent.getData()[subjectedAttrIndex], attrType);
+        double lastVal = numericValue(lastInEvent.getData()[subjectedAttrIndex], attrType);
+
+        long t1 = firstInEvent.getTimeStamp();
+        long t2 = lastInEvent.getTimeStamp();
+        long tGap = Math.max(t2 - t1, MILLISECONDS_PER_SECOND);
+        double gradient = ((lastVal - firstVal) * MILLISECONDS_PER_SECOND) / tGap;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+                    " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+
+                    t2+" hash: "+this.hashCode());
+        }
+
+        Object[] data = firstInEvent.getData().clone();
+        data[subjectedAttrIndex] = gradient;
+        InEvent gradientEvent = new InEvent(firstInEvent.getStreamId(), t1 + ((t2 - t1) / 2), data);
+        return new InEvent[]{gradientEvent};
+    }
+
+    /**
+     * Reads an attribute value as a double.
+     *
+     * @param raw      the attribute value taken from the event data
+     * @param attrType the attribute type that governs how {@code raw} is interpreted
+     * @return the numeric value for DOUBLE, INT, LONG and FLOAT attributes; {@code 0.0} for any
+     *         other attribute type
+     */
+    private static double numericValue(Object raw, Type attrType) {
+        boolean numeric = Type.DOUBLE.equals(attrType) || Type.INT.equals(attrType)
+                || Type.LONG.equals(attrType) || Type.FLOAT.equals(attrType);
+        // Number covers all four boxed types, so no per-type cast is required.
+        return numeric ? ((Number) raw).doubleValue() : 0.0;
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    /**
+     * Restores the state produced by {@link #currentState()}.
+     *
+     * @param data element 0 is the window state, element 1 the expired-event list and element 2
+     *             the buffered-event list
+     */
+    @Override
+    @SuppressWarnings("unchecked") // element types are fixed by currentState()
+    protected void restoreState(Object[] data) {
+        // NOTE(review): the window is restored twice, first from the whole state array and then
+        // from its own slot; the first call looks redundant - confirm against the queue
+        // implementation before removing it.
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        // Cast to List rather than ArrayList: init() may install a non-ArrayList implementation.
+        oldEventList = (List<RemoveEvent>) data[1];
+        newEventList = (List<InEvent>) data[2];
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the batch period and the subjected attribute from the query parameters, creates the
+     * event buffers and the window queue, and starts the schedule.
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, boolean async,
+                        SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            newEventList = new ArrayList<InEvent>();
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+    }
+
+    /** Schedules the next {@link #run()} after the full {@code timeToKeep} period. */
+    @Override
+    public void schedule() {
+        scheduleAfter(timeToKeep);
+    }
+
+    /** Schedules {@link #run()} to execute immediately. */
+    @Override
+    public void scheduleNow() {
+        scheduleAfter(0);
+    }
+
+    /**
+     * Cancels the pending run, if any, and schedules a new one.
+     *
+     * @param delayMillis delay before the next run, in milliseconds
+     */
+    private void scheduleAfter(long delayMillis) {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = eventRemoverScheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /** Cancels the pending run and releases the event buffers and the window. */
+    @Override
+    public void destroy() {
+        // Cancel first so no scheduled run fires against the nulled-out fields.
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        oldEventList = null;
+        newEventList = null;
+        window = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/pom.xml b/extensions/cep/pom.xml new file mode 100644 index 0000000..7c82bdf --- /dev/null +++ b/extensions/cep/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.stratos</groupId> + <artifactId>stratos-parent</artifactId> + <version>4.1.2</version> + </parent> + + <artifactId>cep-extensions</artifactId> + <packaging>pom</packaging> + <name>Apache Stratos - CEP Extensions</name> + <description>Apache Stratos CEP Extensions</description> + + <modules> + <module>modules/distribution</module> + <module>modules/stratos-cep-extension</module> + </modules> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/pom.xml b/extensions/cep/stratos-cep-extension/pom.xml deleted file mode 100644 index e34c79e..0000000 --- a/extensions/cep/stratos-cep-extension/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- - # Licensed to the Apache Software Foundation (ASF) under one - # or more contributor license agreements. See the NOTICE file - # distributed with this work for additional information - # regarding copyright ownership. The ASF licenses this file - # to you under the Apache License, Version 2.0 (the - # "License"); you may not use this file except in compliance - # with the License. You may obtain a copy of the License at - # - # http://www.apache.org/licenses/LICENSE-2.0 - # - # Unless required by applicable law or agreed to in writing, - # software distributed under the License is distributed on an - # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - # KIND, either express or implied. See the License for the - # specific language governing permissions and limitations - # under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.stratos</groupId> - <artifactId>stratos-extensions</artifactId> - <version>4.1.2</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>org.apache.stratos.cep.extension</artifactId> - <name>Apache Stratos - CEP Extensions</name> - <description>Apache Stratos CEP Extensions</description> - - <repositories> - <repository> - <id>wso2-maven2-repository</id> - <name>WSO2 Maven2 Repository</name> - <url>http://dist.wso2.org/maven2</url> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>org.wso2.siddhi</groupId> - <artifactId>siddhi-core</artifactId> - <version>2.0.0-wso2v5</version> - </dependency> - <dependency> - <groupId>org.apache.stratos</groupId> - <artifactId>org.apache.stratos.messaging</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java deleted file mode 100644 index 59c70c5..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ 
/dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cep.extension; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; -import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; -import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; -import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -/** - * CEP Topology Receiver for Fault Handling Window Processor. 
- */ -public class CEPTopologyEventReceiver extends TopologyEventReceiver { - - private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); - - private FaultHandlingWindowProcessor faultHandler; - - public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { - this.faultHandler = faultHandler; - addEventListeners(); - } - - @Override - public void execute() { - super.execute(); - log.info("CEP topology event receiver thread started"); - } - - private void addEventListeners() { - // Load member time stamp map from the topology as a one time task - addEventListener(new CompleteTopologyEventListener() { - private boolean initialized; - - @Override - protected void onEvent(Event event) { - if (!initialized) { - try { - TopologyManager.acquireReadLock(); - log.debug("Complete topology event received to fault handling window processor."); - CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event; - initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology()); - } catch (Exception e) { - log.error("Error loading member time stamp map from complete topology event.", e); - } finally { - TopologyManager.releaseReadLock(); - } - } - } - }); - - // Remove member from the time stamp map when MemberTerminated event is received. 
- addEventListener(new MemberTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; - faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId()); - log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId()); - } - }); - - // Add member to time stamp map whenever member is activated - addEventListener(new MemberActivatedEventListener() { - @Override - protected void onEvent(Event event) { - MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - - // do not put this member if we have already received a health event - faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), - System.currentTimeMillis()); - log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId()); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java deleted file mode 100644 index 699f036..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cep.extension; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.executor.function.FunctionExecutor; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -@SiddhiExtension(namespace = "stratos", function = "concat") -public class ConcatWindowProcessor extends FunctionExecutor { - Attribute.Type returnType = Attribute.Type.STRING; - @Override - public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { - } - - @Override - protected Object process(Object obj) { - if (obj instanceof Object[]) { - StringBuffer sb=new StringBuffer(); - for (Object aObj : (Object[]) obj) { - sb.append(aObj); - } - return sb.toString(); - } else { - return obj.toString(); - } - - } - - @Override - public void destroy() { - } - - @Override - public Attribute.Type getReturnType() { - return returnType; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java 
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java deleted file mode 100644 index 0aa01ed..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.cep.extension; - -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Logger; -import org.apache.stratos.common.threading.StratosThreadPool; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.util.MessagingUtil; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import 
java.util.concurrent.TimeUnit; - -/** - * CEP window processor to handle faulty member instances. This window processor is responsible for - * publishing MemberFault event if health stats are not received within a given time window. - */ -@SiddhiExtension(namespace = "stratos", function = "faultHandling") -public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - - private static final int TIME_OUT = 60 * 1000; - public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool"; - public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10; - - private ExecutorService executorService; - private ScheduledExecutorService faultHandleScheduler; - private ScheduledFuture<?> lastSchedule; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private EventPublisher healthStatPublisher = - EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName()); - private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - - // Map of member id's to their last received health event time stamp - private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - - // Event receiver to receive topology events published by cloud-controller - private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this); - - // Stratos member id attribute index in stream execution plan - private int memberIdAttrIndex; - - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } - - @Override - protected void processEvent(InListEvent listEvent) { - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) 
listEvent.getEvent(i)); - } - } - - /** - * Add new entry to time stamp map from the received event. - * - * @param event Event received by Siddhi. - */ - protected void addDataToMap(InEvent event) { - String id = (String) event.getData()[memberIdAttrIndex]; - //checking whether this member is the topology. - //sometimes there can be a delay between publishing member terminated events - //and actually terminating instances. Hence CEP might get events for already terminated members - //so we are checking the topology for the member existence - Member member = getMemberFromId(id); - if (null == member) { - log.debug("Member not found in the topology. Event rejected"); - return; - } - if (StringUtils.isNotEmpty(id)) { - memberTimeStampMap.put(id, event.getTimeStamp()); - } else { - log.warn("NULL member id found in the event received. Event rejected."); - } - if (log.isDebugEnabled()){ - log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - /** - * Retrieve the current activated members from the topology and initialize the timestamp map. 
- * This will allow the system to recover from a restart - * - * @param topology Topology model object - */ - boolean loadTimeStampMapFromTopology(Topology topology){ - - long currentTimeStamp = System.currentTimeMillis(); - if (topology == null || topology.getServices() == null){ - return false; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : topology.getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()) { - // we are checking faulty status only in previously activated members - if (member != null && MemberStatus.Active.equals(member.getStatus())) { - // Initialize the member time stamp map from the topology at the beginning - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - } - } - } - } - } - } - - if (log.isDebugEnabled()){ - log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + - memberTimeStampMap); - } - return true; - } - - private Member getMemberFromId(String memberId){ - if (StringUtils.isEmpty(memberId)){ - return null; - } - if (TopologyManager.getTopology().isInitialized()){ - try { - TopologyManager.acquireReadLock(); - if (TopologyManager.getTopology().getServices() == null){ - return null; - } - // TODO make this efficient by adding APIs to messaging component - for (Service service : TopologyManager.getTopology().getServices()) { - if (service.getClusters() != null) { - for (Cluster cluster : service.getClusters()) { - if (cluster.getMembers() != null) { - for (Member member : cluster.getMembers()){ - if (memberId.equals(member.getMemberId())){ - return member; - } - } - } - } - } - } - } catch (Exception e) { - log.error("Error while reading topology" + e); - } finally { - TopologyManager.releaseReadLock(); - } - } - return null; - } - - private void publishMemberFault(String memberId){ - Member member = 
getMemberFromId(memberId); - if (member == null){ - log.warn("Failed to publish member fault event. Member having [member-id] " + memberId + - " does not exist in topology"); - return; - } - log.info("Publishing member fault event for [member-id] " + memberId); - - MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), - member.getMemberId(), member.getPartitionId(), - member.getNetworkPartitionId(), 0); - - memberFaultEventMessageMap.put("message", memberFaultEvent); - healthStatPublisher.publish(MemberFaultEventMap, true); - } - - @Override - public void run() { - try { - threadBarrier.pass(); - - for (Object o : memberTimeStampMap.entrySet()) { - Map.Entry pair = (Map.Entry) o; - long currentTime = System.currentTimeMillis(); - Long eventTimeStamp = (Long) pair.getValue(); - - if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + - eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - publishMemberFault((String) pair.getKey()); - } - } - if (log.isDebugEnabled()){ - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + - memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap); - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } finally { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState()}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, - AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext 
siddhiContext) { - - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); - memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName); - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - - executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, - CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); - cepTopologyEventReceiver.execute(); - - //Ordinary scheduling - window.schedule(); - if (log.isDebugEnabled()){ - log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep + - ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex + - ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()); - } - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - @Override - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.faultHandleScheduler = scheduledExecutorService; - } - - @Override - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void 
destroy(){ - // terminate topology listener thread - cepTopologyEventReceiver.terminate(); - window = null; - - // Shutdown executor service - if(executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down cep extension executor service", e); - } - } - } - - public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { - return memberTimeStampMap; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java deleted file mode 100644 index dff0f79..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.cep.extension; - -import org.apache.log4j.Logger; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.event.remove.RemoveEvent; -import org.wso2.siddhi.core.event.remove.RemoveListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -@SiddhiExtension(namespace = "stratos", function = "gradient") -public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); - private ScheduledExecutorService 
eventRemoverScheduler; - private ScheduledFuture<?> lastSchedule; - private long timeToKeep; - private int subjectedAttrIndex; - private Attribute.Type subjectedAttrType; - private List<InEvent> newEventList; - private List<RemoveEvent> oldEventList; - private ThreadBarrier threadBarrier; - private ISchedulerSiddhiQueue<StreamEvent> window; - - @Override - protected void processEvent(InEvent event) { - acquireLock(); - try { - newEventList.add(event); - } finally { - releaseLock(); - } - } - - @Override - protected void processEvent(InListEvent listEvent) { - acquireLock(); - try { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - newEventList.add((InEvent) listEvent.getEvent(i)); - } - } finally { - releaseLock(); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - - @Override - public void run() { - acquireLock(); - try { - long scheduledTime = System.currentTimeMillis(); - try { - oldEventList.clear(); - while (true) { - threadBarrier.pass(); - RemoveEvent removeEvent = (RemoveEvent) window.poll(); - if (removeEvent == null) { - if (oldEventList.size() > 0) { - nextProcessor.process(new RemoveListEvent( - oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); - oldEventList.clear(); - } - - if (newEventList.size() > 0) { - InEvent[] inEvents = - newEventList.toArray(new InEvent[newEventList.size()]); - for (InEvent inEvent : inEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - - InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]); - - for (InEvent inEvent : gradientEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - nextProcessor.process(new 
InListEvent(gradientEvents)); - - newEventList.clear(); - } - - long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); - if (diff > 0) { - try { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { - log.warn("scheduling cannot be accepted for execution: elementID " + - elementId); - } - break; - } - scheduledTime = System.currentTimeMillis(); - } else { - oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); - } - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } finally { - releaseLock(); - } - } - - - /** - * This function will calculate the linear gradient (per second) of the events received during - * a specified time period. - */ - private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) { - double firstVal = 0.0, lastVal = 0.0; - // FIXME I'm not sure whether there's some other good way to do correct casting, - // based on the type. - if (Type.DOUBLE.equals(subjectedAttrType)) { - firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.INT.equals(subjectedAttrType)) { - firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.LONG.equals(subjectedAttrType)) { - firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.FLOAT.equals(subjectedAttrType)) { - firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; - } - - long t1 = firstInEvent.getTimeStamp(); - long t2 = lastInEvent.getTimeStamp(); - long millisecondsForASecond = 1000; - long tGap = t2 - t1 > millisecondsForASecond ? 
t2 - t1 : millisecondsForASecond; - double gradient = 0.0; - if (tGap > 0) { - gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; - } - if (log.isDebugEnabled()) { - log.debug("Gradient: " + gradient + " Last val: " + lastVal + - " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ - t2+" hash: "+this.hashCode()); - } - Object[] data = firstInEvent.getData().clone(); - data[subjectedAttrIndex] = gradient; - InEvent gradientEvent = - new InEvent(firstInEvent.getStreamId(), (t1+t2)/2, - data); - InEvent[] output = new InEvent[1]; - output[0] = gradientEvent; - return output; - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState(), oldEventList, newEventList}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - oldEventList = ((ArrayList<RemoveEvent>) data[1]); - newEventList = ((ArrayList<InEvent>) data[2]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); - subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); - - oldEventList = new ArrayList<RemoveEvent>(); - if (this.siddhiContext.isDistributedProcessingEnabled()) { - newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); - } else { - newEventList = new ArrayList<InEvent>(); - } - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - //Ordinary scheduling - window.schedule(); - - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; - } - - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy(){ - oldEventList = null; - newEventList = null; - window = null; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java deleted file mode 100755 index 0dc24bd..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cep.extension; - -/** - * Member Request Handling Capability Window Processor - */ - -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.executor.function.FunctionExecutor; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -@SiddhiExtension(namespace = "stratos", function = "divider") -public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { - - Attribute.Type returnType = Attribute.Type.DOUBLE; - - @Override - public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { - } - - @Override - protected Object process(Object obj) { - - double[] value = new double[2]; - if (obj instanceof Object[]) { - int i=0; - for (Object aObj : (Object[]) obj) { - value[i]= Double.parseDouble(String.valueOf(aObj)); - i++; - } - }//to do avoid deviding zero number of active instances won't be zero cz there is min - Double unit = (value[0] / value[1]); - if(!unit.isNaN() && !unit.isInfinite()) - return unit; - else - return 0.0; - - } - - @Override - public void destroy() { - - } - - @Override - public Attribute.Type getReturnType() { - return returnType; - } -} 
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java deleted file mode 100644 index 96cff22..0000000 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.cep.extension; - -import org.apache.log4j.Logger; -import org.wso2.siddhi.core.config.SiddhiContext; -import org.wso2.siddhi.core.event.StreamEvent; -import org.wso2.siddhi.core.event.in.InEvent; -import org.wso2.siddhi.core.event.in.InListEvent; -import org.wso2.siddhi.core.event.remove.RemoveEvent; -import org.wso2.siddhi.core.event.remove.RemoveListEvent; -import org.wso2.siddhi.core.persistence.ThreadBarrier; -import org.wso2.siddhi.core.query.QueryPostProcessingElement; -import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; -import org.wso2.siddhi.core.query.processor.window.WindowProcessor; -import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; -import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; -import org.wso2.siddhi.query.api.definition.Attribute.Type; -import org.wso2.siddhi.query.api.expression.Expression; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.IntConstant; -import org.wso2.siddhi.query.api.expression.constant.LongConstant; -import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -@SiddhiExtension(namespace = "stratos", function = "secondDerivative") -public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { - - static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class); - private 
ScheduledExecutorService eventRemoverScheduler; - private ScheduledFuture<?> lastSchedule; - private long timeToKeep; - private int subjectedAttrIndex; - private Attribute.Type subjectedAttrType; - private List<InEvent> newEventList; - private List<RemoveEvent> oldEventList; - private ThreadBarrier threadBarrier; - private ISchedulerSiddhiQueue<StreamEvent> window; - - @Override - protected void processEvent(InEvent event) { - acquireLock(); - try { - newEventList.add(event); - } finally { - releaseLock(); - } - } - - @Override - protected void processEvent(InListEvent listEvent) { - acquireLock(); - try { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - newEventList.add((InEvent) listEvent.getEvent(i)); - } - } finally { - releaseLock(); - } - } - - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } - - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } - - - @Override - public void run() { - acquireLock(); - try { - long scheduledTime = System.currentTimeMillis(); - try { - oldEventList.clear(); - while (true) { - threadBarrier.pass(); - RemoveEvent removeEvent = (RemoveEvent) window.poll(); - if (removeEvent == null) { - if (oldEventList.size() > 0) { - nextProcessor.process(new RemoveListEvent( - oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); - oldEventList.clear(); - } - - if (newEventList.size() > 0) { - InEvent[] inEvents = - newEventList.toArray(new InEvent[newEventList.size()]); - for (InEvent inEvent : inEvents) { - window.put(new RemoveEvent(inEvent, -1)); - } - - // in order to find second derivative, we need at least 3 events. 
- if (newEventList.size() > 2) { - - InEvent firstDerivative1 = - gradient(inEvents[0], - inEvents[(newEventList.size() / 2) - 1], - null)[0]; - InEvent firstDerivative2 = - gradient(inEvents[newEventList.size() / 2], - inEvents[newEventList.size() - 1], - null)[0]; - InEvent[] secondDerivative = - gradient(firstDerivative1, - firstDerivative2, Type.DOUBLE); - - for (InEvent inEvent : secondDerivative) { - window.put(new RemoveEvent(inEvent, -1)); - } - nextProcessor.process(new InListEvent(secondDerivative)); - } else { - log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " + - newEventList.size()); - } - - newEventList.clear(); - } - - long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); - if (diff > 0) { - try { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ex) { - log.warn("scheduling cannot be accepted for execution: elementID " + - elementId); - } - break; - } - scheduledTime = System.currentTimeMillis(); - } else { - oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); - } - } - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } finally { - releaseLock(); - } - } - - - /** - * This function will calculate the linear gradient (per second) of the events received during - * a specified time period. - */ - private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) { - Type attrType = type == null ? subjectedAttrType : type; - double firstVal = 0.0, lastVal = 0.0; - // FIXME I'm not sure whether there's some other good way to do correct casting, - // based on the type. 
- if (Type.DOUBLE.equals(attrType)) { - firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.INT.equals(attrType)) { - firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.LONG.equals(attrType)) { - firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; - } else if (Type.FLOAT.equals(attrType)) { - firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; - lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; - } - - long t1 = firstInEvent.getTimeStamp(); - long t2 = lastInEvent.getTimeStamp(); - long millisecondsForASecond = 1000; - long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond; - double gradient = 0.0; - if (tGap > 0) { - gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; - } - if (log.isDebugEnabled()) { - log.debug("Gradient: " + gradient + " Last val: " + lastVal + - " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ - t2+" hash: "+this.hashCode()); - } - Object[] data = firstInEvent.getData().clone(); - data[subjectedAttrIndex] = gradient; - InEvent gradientEvent = - new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2), - data); - InEvent[] output = new InEvent[1]; - output[0] = gradientEvent; - return output; - } - - @Override - protected Object[] currentState() { - return new Object[]{window.currentState(), oldEventList, newEventList}; - } - - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - oldEventList = ((ArrayList<RemoveEvent>) data[1]); - newEventList = ((ArrayList<InEvent>) data[2]); - window.reSchedule(); - } - - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition 
streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } - - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); - subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); - - oldEventList = new ArrayList<RemoveEvent>(); - if (this.siddhiContext.isDistributedProcessingEnabled()) { - newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); - } else { - newEventList = new ArrayList<InEvent>(); - } - - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - //Ordinary scheduling - window.schedule(); - - } - - @Override - public void schedule() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } - - public void scheduleNow() { - if (lastSchedule != null) { - lastSchedule.cancel(false); - } - lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; - } - - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } - - @Override - public void destroy(){ - oldEventList = null; - newEventList = null; - window = null; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/INSTALL.md ---------------------------------------------------------------------- diff --git 
a/extensions/load-balancer/haproxy-extension/INSTALL.md b/extensions/load-balancer/haproxy-extension/INSTALL.md deleted file mode 100644 index 826419f..0000000 --- a/extensions/load-balancer/haproxy-extension/INSTALL.md +++ /dev/null @@ -1,32 +0,0 @@ -# Installing Apache Stratos HAProxy Extension - -Apache Stratos HAProxy Extension could be used for integrating HAProxy load balancer with Apache Stratos. Please follow -below steps to proceed with the installation: - -1. Download and extract HAProxy binary distribution to a desired location: <haproxy-home>. - -2. Extract org.apache.stratos.haproxy.extension-<version>.zip to a desired location: <haproxy-extension-home>. - -3. Open <haproxy-extension-home>/bin/haproxy-extension.sh file in a text editor and update following system properties: - ``` - # Define haproxy host private ip address: - -Dhaproxy.private.ip=127.0.0.1 - - # Define the haproxy executable file path: - -Dexecutable.file.path=<haproxy-home>/haproxy - - # Enable/disable cep statistics publisher: - -Dcep.stats.publisher.enabled=false - - # If cep statistics publisher is enabled define the following properties: - -Dthrift.receiver.ip=127.0.0.1 - -Dthrift.receiver.port=7615 - -Dnetwork.partition.id=network-partition-1 - ``` - -4. Open <haproxy-extension-home>/conf/jndi.properties file in a text editor and update message broker information: - ``` - java.naming.provider.url=tcp://localhost:61616 - ``` -5. Run <haproxy-extension-home>/bin/haproxy-extension.sh as the root user. 
- http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/README.md ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/README.md b/extensions/load-balancer/haproxy-extension/README.md deleted file mode 100644 index 50a49a1..0000000 --- a/extensions/load-balancer/haproxy-extension/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# Apache Stratos HAProxy Extension - -Apache Stratos HAProxy extension is a load balancer extension for HAProxy. It is an executable program -which can manage the life-cycle of a HAProxy instance according to the topology, composite application model, -tenant application signups and domain mapping information received from Stratos via the message broker. - -## How it works -1. Wait for the complete topology event message to initialize the topology. -2. Configure and start an instance of HAProxy. -3. Listen to topology, application, application signup, domain mapping events. -4. Reload HAProxy instance with the new topology configuration. -5. Periodically publish statistics to Complex Event Processor (CEP). - -## Installation -Please refer INSTALL.md for information on the installation process. 
- -Thanks to Vaadin for HAProxyController implementation: -https://vaadin.com/license -http://dev.vaadin.com/browser/svn/incubator/Arvue/ArvueMaster/src/org/vaadin/arvue/arvuemaster/HAProxyController.java - http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/pom.xml b/extensions/load-balancer/haproxy-extension/pom.xml deleted file mode 100644 index d90641b..0000000 --- a/extensions/load-balancer/haproxy-extension/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.stratos</groupId> - <artifactId>stratos-load-balancer-extensions</artifactId> - <version>4.1.2</version> - </parent> - - <artifactId>apache-stratos-haproxy-extension</artifactId> - <name>Apache Stratos - HAProxy Extension</name> - <description>Apache Stratos HAProxy Extension for Load Balancing</description> - - <dependencies> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.5</version> - </dependency> - <dependency> - <groupId>org.apache.stratos</groupId> - <artifactId>org.apache.stratos.common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.stratos</groupId> - <artifactId>org.apache.stratos.messaging</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.stratos</groupId> - <artifactId>org.apache.stratos.load.balancer.extension.api</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <version>2.0</version> - </dependency> - <dependency> - <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> - <version>1.7</version> - </dependency> - <dependency> - <groupId>org.wso2.andes.wso2</groupId> - <artifactId>andes-client</artifactId> - <version>0.13.wso2v8</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <archive> - <manifest> - <mainClass>org.apache.stratos.haproxy.extension.Main</mainClass> - </manifest> - </archive> - </configuration> - </plugin> - <plugin> - 
<artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptors> - <descriptor>src/main/assembly/bin.xml</descriptor> - </descriptors> - <archiverConfig> - <fileMode>420</fileMode> - <directoryMode>493</directoryMode> - <defaultDirectoryMode>493</defaultDirectoryMode> - </archiverConfig> - <appendAssemblyId>false</appendAssemblyId> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>attached</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/src/main/assembly/bin.xml ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/assembly/bin.xml b/extensions/load-balancer/haproxy-extension/src/main/assembly/bin.xml deleted file mode 100644 index 5bfa02e..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/assembly/bin.xml +++ /dev/null @@ -1,106 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. 
- --> - -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>bin</id> - <formats> - <format>zip</format> - </formats> - <fileSets> - <fileSet> - <directory>${project.basedir}/src/main/bin</directory> - <outputDirectory>/bin</outputDirectory> - <fileMode>0755</fileMode> - <includes> - <include>haproxy-extension.sh</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/conf</directory> - <outputDirectory>/conf</outputDirectory> - <fileMode>0600</fileMode> - <includes> - <include>jndi.properties</include> - <include>log4j.properties</include> - <include>thrift-client-config.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/security</directory> - <outputDirectory>/security</outputDirectory> - <fileMode>0600</fileMode> - <includes> - <include>client-truststore.jks</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/templates</directory> - <outputDirectory>/templates</outputDirectory> - <fileMode>0600</fileMode> - <includes> - <include>haproxy.cfg.template</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/scripts</directory> - <outputDirectory>/scripts</outputDirectory> - <fileMode>0755</fileMode> - <includes> - <include>get-weight.sh</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}</directory> - <outputDirectory>/</outputDirectory> - <fileMode>0600</fileMode> - <includes> - <include>DISCLAIMER</include> - <include>README*</include> - <include>LICENSE*</include> - <include>INSTALL*</include> - </includes> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/license</directory> - <outputDirectory>/</outputDirectory> - 
<fileMode>0600</fileMode> - </fileSet> - <fileSet> - <directory>${project.basedir}/src/main/notice</directory> - <outputDirectory>/</outputDirectory> - <fileMode>0600</fileMode> - </fileSet> - </fileSets> - <dependencySets> - <dependencySet> - <outputDirectory>/lib</outputDirectory> - <excludes> - <exclude>*:icu4j*</exclude> - <exclude>*:jaxen*</exclude> - <exclude>*:jboss-transaction-api*</exclude> - <exclude>*:wrapper*</exclude> - <exclude>*:xom*</exclude> - </excludes> - <useProjectArtifact>true</useProjectArtifact> - <scope>runtime</scope> - </dependencySet> - </dependencySets> -</assembly> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh deleted file mode 100755 index bc17399..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash -# -------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# -# -------------------------------------------------------------- - -echo "Starting haproxy extension..." -script_path="$( cd -P "$( dirname "$SOURCE" )" && pwd )/`dirname $0`" -lib_path=${script_path}/../lib/ -class_path=`echo ${lib_path}/*.jar | tr ' ' ':'` -properties="-Dhaproxy.private.ip=127.0.0.1 - -Dexecutable.file.path=haproxy - -Djndi.properties.dir=${script_path}/../conf - -Dtemplates.path=${script_path}/../templates - -Dtemplates.name=haproxy.cfg.template - -Dscripts.path=${script_path}/../scripts - -Dconf.file.path=/tmp/haproxy.cfg - -Dstats.socket.file.path=/tmp/haproxy-stats.socket - -Dlog4j.properties.file.path=${script_path}/../conf/log4j.properties - -Djavax.net.ssl.trustStore=${script_path}/../security/client-truststore.jks - -Djavax.net.ssl.trustStorePassword=wso2carbon - -Dthrift.client.config.file.path=${script_path}/../conf/thrift-client-config.xml - -Dcep.stats.publisher.enabled=false - -Dthrift.receiver.ip=127.0.0.1 - -Dthrift.receiver.port=7615 - -Dnetwork.partition.id=network-partition-1 - -Dcluster.id=cluster-1 - -Dservice.name=service-1" - - -# Uncomment below line to enable remote debugging -#debug="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" - -java -cp "${class_path}" ${properties} ${debug} org.apache.stratos.haproxy.extension.Main $* http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/src/main/conf/jndi.properties ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/conf/jndi.properties b/extensions/load-balancer/haproxy-extension/src/main/conf/jndi.properties deleted file mode 100644 index 21d7420..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/conf/jndi.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) 
under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -connectionfactoryName=TopicConnectionFactory -java.naming.provider.url=tcp://localhost:61616 -java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/src/main/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/conf/log4j.properties b/extensions/load-balancer/haproxy-extension/src/main/conf/log4j.properties deleted file mode 100644 index ec45878..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/conf/log4j.properties +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Set root logger level and appenders -log4j.rootLogger=INFO, CONSOLE_APPENDER, FILE_APPENDER - -# CONSOLE_APPENDER is set to be a ConsoleAppender. -log4j.appender.CONSOLE_APPENDER=org.apache.log4j.ConsoleAppender - -# The standard error log where all the warnings, errors and fatal errors will be logged -log4j.appender.FILE_APPENDER=org.apache.log4j.FileAppender -log4j.appender.FILE_APPENDER.File=logs/haproxy-extension.log -log4j.appender.FILE_APPENDER.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE_APPENDER.layout.ConversionPattern=%d{ISO8601} [%X{ip}-%X{host}] [%t] %5p %c{1} %m%n -log4j.appender.FILE_APPENDER.threshold=DEBUG - -# CONSOLE_APPENDER uses PatternLayout. 
-log4j.appender.CONSOLE_APPENDER.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE_APPENDER.layout.ConversionPattern=[%d{ISO8601}] %5p - [%c{1}] %m%n - -log4j.logger.org.apache.stratos.haproxy.extension=INFO -log4j.logger.org.apache.stratos.load.balancer.extension.api=INFO -log4j.logger.org.apache.stratos.messaging=INFO -log4j.logger.org.wso2.andes.client=ERROR \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/load-balancer/haproxy-extension/src/main/conf/thrift-client-config.xml ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/conf/thrift-client-config.xml b/extensions/load-balancer/haproxy-extension/src/main/conf/thrift-client-config.xml deleted file mode 100644 index 5cacada..0000000 --- a/extensions/load-balancer/haproxy-extension/src/main/conf/thrift-client-config.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. 
- --> - -<!-- Apache thrift client configuration for publishing statistics to WSO2 CEP --> -<thriftClientConfiguration> - <username>admin</username> - <password>admin</password> - <ip>localhost</ip> - <port>7611</port> -</thriftClientConfiguration> \ No newline at end of file
