Updated Branches: refs/heads/master f02cc2bf9 -> ff900c43c
CEP extention to detect inactive health publishers Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ff900c43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ff900c43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ff900c43 Branch: refs/heads/master Commit: ff900c43c7cb050a624994a6d6e08e26247363f5 Parents: f02cc2b Author: Manula Thantriwatte <[email protected]> Authored: Thu Nov 21 16:11:35 2013 +0530 Committer: Manula Thantriwatte <[email protected]> Committed: Thu Nov 21 16:11:35 2013 +0530 ---------------------------------------------------------------------- .../extension/FaultHandlingWindowProcessor.java | 180 +++++++++++++++++++ .../src/main/bin/health-publisher.sh | 2 +- .../agent/health/publisher/HealthPublisher.java | 23 +-- .../health/publisher/HealthPublisherClient.java | 54 +++--- .../cartridge/agent/health/publisher/Main.java | 26 +-- 5 files changed, 234 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java new file mode 100644 index 0000000..a995331 --- /dev/null +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "faultHandling") +public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + private static final int MILI_TO_MINUTE = 1000; + private static final int TIME_OUT = 100; + + static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private int subjectedAttrIndex; + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private ConcurrentHashMap<String, InEvent> timeStampMap = new ConcurrentHashMap<String, InEvent>(); + private String memberID; + + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } + + @Override + protected void processEvent(InListEvent listEvent) { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } + + protected void addDataToMap(InEvent event) { + if (memberID != null) { + String id = (String)event.getData()[subjectedAttrIndex]; + timeStampMap.put(id, event); + } + else { + System.out.println("Member ID null"); + log.error("NULL Member ID"); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + @Override + public void run() { + try { + while (true) { + threadBarrier.pass(); + Iterator it = timeStampMap.entrySet().iterator(); + + while ( it.hasNext() ) { + Map.Entry pair = (Map.Entry)it.next(); + long currentTime = System.currentTimeMillis(); + InEvent event = (InEvent)pair.getValue(); + + if ((currentTime - event.getTimeStamp()) / MILI_TO_MINUTE > TIME_OUT) { + log.info("Member Inactive : " + pair.getKey()); + it.remove(); + nextProcessor.process(event); + } + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState()}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + memberID = ((Variable)parameters[1]).getAttributeName(); + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + window = null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh index 2573f32..f5c0104 100755 --- a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh +++ b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh @@ -30,5 +30,5 @@ current_path=`pwd` java -cp $class_path -Dmember.id=$1 -Dkey.file.path=$current_path/../security/client-truststore.jks -Dthrift.receiver.ip=$2 -Dthrift.receiver.port=$3 org.apache.stratos.cartridge.agent.health.publisher.Main $* -echo "Health publisher completed" +echo "Health publisher completed" http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java index 7d56d06..671aa85 100644 --- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java +++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java @@ -1,18 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -69,7 +69,8 @@ public class HealthPublisher implements Observer { " 'metaData':[]," + " 'payloadData':[" + " {'name':'health_description','type':'STRING'}," + - " {'name':'value','type':'INT'}" + + " {'name':'value','type':'DOUBLE'}," + + " {'name':'member_id','type':'STRING'}" + " ]" + "}"; asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION); @@ -83,23 +84,25 @@ public class HealthPublisher implements Observer { public void update(Observable arg0, Object arg1) { if (arg1 != null && arg1 instanceof Map<?, ?>) { - Map<String, Integer> stats = (Map<String, Integer>) arg1; + Map<String, Double> stats = (Map<String, Double>) arg1; publishEvents(stats); } } public void update(Object healthStatObj) { if (healthStatObj != null && healthStatObj instanceof Map<?, ?>) { - Map<String, Integer> stats = (Map<String, Integer>) healthStatObj; + Map<String, Double> stats = (Map<String, Double>) healthStatObj; publishEvents(stats); } } - private void publishEvents(Map<String, Integer> stats) { + private void publishEvents(Map<String, Double> stats) { + + String memberID = System.getProperty("member.id"); - for (Map.Entry<String, Integer> entry : stats.entrySet()) { + for (Map.Entry<String, Double> entry : stats.entrySet()) { - Object[] payload = new Object[]{entry.getKey(), entry.getValue()}; + Object[] payload = new Object[]{entry.getKey(), entry.getValue(), memberID}; Event event = eventObject(null, null, payload, new HashMap<String, String>()); try { asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java index ed270c4..580e499 100644 --- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java +++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java @@ -32,37 +32,37 @@ public class HealthPublisherClient { private static final int MB = 1024 * 1024; - public Object getHealthStats() { - - String memberID = System.getProperty("member.id"); + public Object getHealthStats() { Runtime runtime = Runtime.getRuntime(); - Map<String, Object> statsMap = new HashMap<String, Object>(); + Map<String, Double> statsMap = new HashMap<String, Double>(); //statsMap.put("Available Processors", (int)runtime.availableProcessors()); - statsMap.put("total_memory", (int)(runtime.totalMemory() / MB)); - statsMap.put("max_memory", (int)(runtime.maxMemory() / MB)); - statsMap.put("used_memory", (int)((runtime.totalMemory() - runtime.freeMemory()) / MB)); - statsMap.put("free_memory", (int)(runtime.freeMemory() / MB)); - statsMap.put("load_average", (int)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()); - statsMap.put("member_id", memberID); + statsMap.put("total_memory", (double)(runtime.totalMemory() / MB)); + //statsMap.put("max_memory", (int)(runtime.maxMemory() / MB)); + statsMap.put("used_memory", (double)((runtime.totalMemory() - runtime.freeMemory()) / MB)); + //statsMap.put("free_memory", (int)(runtime.freeMemory() / MB)); + statsMap.put("load_average", (double)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()); + //statsMap.put("member_id", Integer.parseInt(memberID)); + + Object statObj = (Object)statsMap; + + return statObj; + } + + public void run() { + try { + HealthPublisher publisher = new HealthPublisher(); + + while (true) { + Object healthStatObj = getHealthStats(); + publisher.update(healthStatObj); - return statsMap; - } - - public void run() { - try { - HealthPublisher publisher = new HealthPublisher(); - - while (true) { - Object healthStatObj = getHealthStats(); - publisher.update(healthStatObj); - - Thread.sleep(10000); - } - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } + Thread.sleep(10000); + } + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java index 13a8e67..98c6224 100644 --- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java +++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java @@ -27,27 +27,27 @@ import org.apache.commons.logging.LogFactory; */ public class Main { - private static final Log log = LogFactory.getLog(Main.class); - - public static void main (String args[]) { - try { + private static final Log log = LogFactory.getLog(Main.class); + + public static void main (String args[]) { + try { if (log.isInfoEnabled()) { log.info("Health publisher started"); } - + System.out.println("This is health stat publisher module"); - + HealthPublisherClient client = new HealthPublisherClient(); client.run(); - + System.exit(0); - - } catch (Exception e) { + + } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Could not publish health stats", e); } - } - - System.exit(-1); - } + } + + System.exit(-1); + } } \ No newline at end of file
