http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml new file mode 100644 index 0000000..0fa4a0a --- /dev/null +++ b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> + +<executionPlan name="SecondDerivativeOfHealthRequest" + statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventprocessor"> + <description>This will find the second derivative of health stats over a minute.</description> + <siddhiConfiguration> + <property name="siddhi.enable.distributed.processing">false</property> + <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property> + </siddhiConfiguration> + <importedStreams> + <stream as="health_second_der_request" name="cartridge_agent_health_stats" version="1.0.0"/> + </importedStreams> + <queryExpressions><![CDATA[ + from health_second_der_request + select member_id, cluster_id, cluster_instance_id, network_partition_id, health_description, value, + stratos:concat(cluster_id, '-' ,cluster_instance_id) as health_second_der_cluster_network + insert into health_second_der_concat; + define partition health_second_der_cluster_partition by health_second_der_concat.health_second_der_cluster_network; + from health_second_der_concat [health_description == 'memory_consumption'] + #window.stratos:secondDerivative(1 min, value) select cluster_id, cluster_instance_id, network_partition_id, value as second_derivative_memory_consumption + insert into second_derivative_memory_consumption_stats + partition by health_second_der_cluster_partition; + from health_second_der_concat [health_description == 'load_average'] + #window.stratos:secondDerivative(1 min, value) select cluster_id, cluster_instance_id, network_partition_id, value as second_derivative_load_average + insert into second_derivative_load_average_stats + partition by health_second_der_cluster_partition; + define partition health_second_der_member_partition by health_second_der_request.member_id; + from health_second_der_request [health_description == 'memory_consumption'] + #window.stratos:secondDerivative(1 min, value) + select member_id, cluster_id, cluster_instance_id, network_partition_id, value as 
member_second_derivative_memory_consumption + insert into member_second_derivative_memory_consumption_stats + partition by health_second_der_member_partition; + from health_second_der_request [health_description == 'load_average'] + #window.stratos:secondDerivative(1 min, value) + select member_id, cluster_id, cluster_instance_id, network_partition_id, value as member_second_derivative_load_average + insert into member_second_derivative_load_average_stats + partition by health_second_der_member_partition;]]></queryExpressions> + <exportedStreams> + <stream name="second_derivative_memory_consumption_stats" + valueOf="second_derivative_memory_consumption_stats" version="1.0.0"/> + <stream name="second_derivative_load_average_stats" + valueOf="second_derivative_load_average_stats" version="1.0.0"/> + <stream name="member_second_derivative_memory_consumption_stats" + valueOf="member_second_derivative_memory_consumption_stats" version="1.0.0"/> + <stream name="member_second_derivative_load_average_stats" + valueOf="member_second_derivative_load_average_stats" version="1.0.0"/> + </exportedStreams> +</executionPlan>
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml new file mode 100644 index 0000000..c8e4ed5 --- /dev/null +++ b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> + +<executionPlan name="SecondDerivativeOfRequestsInFlightFinder" + statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor"> + <description>This will find the second derivative of the number of requests in flight over a minute.</description> + <siddhiConfiguration> + <property name="siddhi.enable.distributed.processing">false</property> + <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property> + </siddhiConfiguration> + <importedStreams> + <stream as="second_der_rif" name="in_flight_requests" version="1.0.0"/> + </importedStreams> + <queryExpressions><![CDATA[ + from second_der_rif + select cluster_id, cluster_instance_id, network_partition_id, in_flight_request_count, + stratos:concat(cluster_id, '-' ,network_partition_id) as rif_second_der_cluster_network + insert into rif_second_der_concat; + define partition rif_second_der_cluster_partition by rif_second_der_concat.rif_second_der_cluster_network; + from rif_second_der_concat#window.stratos:secondDerivative(1 min, in_flight_request_count) + select cluster_id, cluster_instance_id, network_partition_id,in_flight_request_count as count + insert into second_derivative_in_flight_requests + partition by rif_second_der_cluster_partition; + ]]></queryExpressions> + <exportedStreams> + <stream name="second_derivative_in_flight_requests" + valueOf="second_derivative_in_flight_requests" version="1.0.0"/> + </exportedStreams> +</executionPlan> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml new file mode 100644 index 0000000..77af463 --- /dev/null +++ 
b/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<inputEventAdaptor name="DefaultWSO2EventInputAdaptor" + statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml new file mode 100644 index 0000000..4438d2c --- /dev/null +++ b/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor" + statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"> + <property name="username">admin</property> + <property name="receiverURL">tcp://localhost:7661</property> + <property name="password">admin</property> + <property name="authenticatorURL">ssl://localhost:7761</property> +</outputEventAdaptor> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml new file mode 100644 index 0000000..59c3653 --- /dev/null +++ b/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable" + trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager"> + <!--property name="java.naming.provider.url">CEP_HOME/repository/conf/jndi.properties</property--> + <property name="java.naming.provider.url">tcp://localhost:61616</property> + <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property> + <property name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</property> + <property name="transport.jms.DestinationType">topic</property> +</outputEventAdaptor> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml new file mode 100644 index 0000000..4c4c7e0 --- /dev/null +++ b/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml @@ -0,0 +1,309 @@ +<?xml version='1.0'?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<streamManagerConfiguration xmlns="http://wso2.org/carbon/streammanager"> + <!-- in-flight requests stream definitions start --> + <streamDefinition name="in_flight_requests" version="1.0.0"> + <description>in-flight request count</description> + <nickName>in-flight requests</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="in_flight_request_count" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="gradient_in_flight_requests" version="1.0.0"> + <description>gradient of in flight request count</description> + <nickName>gradient in flight requests</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="count" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="average_in_flight_requests" version="1.0.0"> + <description>average of in-flight request count</description> + <nickName>average in-flight requests</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property 
name="count" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="second_derivative_in_flight_requests" version="1.0.0"> + <description>second derivative of in-flight request count</description> + <nickName>second derivative in-flight requests</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="count" type="double"/> + </payloadData> + </streamDefinition> + <!-- in-flight requests stream definitions end --> + + <!-- cartridge agent health stats stream definitions start --> + <streamDefinition name="cartridge_agent_health_stats" version="1.0.0"> + <description>agent health stats</description> + <nickName>agent health stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_id" type="String" /> + <property name="partition_id" type="String" /> + <property name="health_description" type="String"/> + <property name="value" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="average_load_average_stats" version="1.0.0"> + <description>average load average stats</description> + <nickName>average load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="average_load_average" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="average_memory_consumption_stats" version="1.0.0"> + <description>average memory consumption 
stats</description> + <nickName>average memory consumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="average_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="gradient_load_average_stats" version="1.0.0"> + <description>gradient load average stats</description> + <nickName>gradient load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="gradient_load_average" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="gradient_memory_consumption_stats" version="1.0.0"> + <description>gradient memoryconsumption stats</description> + <nickName>gradient memoryconsumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="gradient_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="second_derivative_memory_consumption_stats" version="1.0.0"> + <description>second derivative memory consumption stats</description> + <nickName>second derivative memory consumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property 
name="second_derivative_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="second_derivative_load_average_stats" version="1.0.0"> + <description>second derivative load average stats</description> + <nickName>second derivative load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="second_derivative_load_average" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="fault_message" version="1.0.0"> + <description>fault message</description> + <nickName>fault message</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_id" type="String"/> + <property name="partition_id" type="String"/> + </payloadData> + </streamDefinition> + <!-- cartridge agent health stats stream definitions end --> + + <!-- This is for member_id wise grouping--> + <streamDefinition name="member_average_load_average_stats" version="1.0.0"> + <description>average load average stats</description> + <nickName>average load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" type="String" /> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_average_load_average" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="member_average_memory_consumption_stats" version="1.0.0"> + <description>average memory consumption 
stats</description> + <nickName>average memory consumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" type="String"/> + <property name="cluster_id" type="String"/> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_average_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="member_gradient_load_average_stats" version="1.0.0"> + <description>gradient load average stats</description> + <nickName>gradient load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" type="String" /> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_gradient_load_average" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="member_gradient_memory_consumption_stats" version="1.0.0"> + <description>gradient memoryconsumption stats</description> + <nickName>gradient memoryconsumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" type="String" /> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_gradient_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="member_second_derivative_memory_consumption_stats" version="1.0.0"> + <description>second derivative memory consumption stats</description> + <nickName>second derivative memory consumption stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" 
type="String" /> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_second_derivative_memory_consumption" type="double"/> + </payloadData> + </streamDefinition> + + <streamDefinition name="member_second_derivative_load_average_stats" version="1.0.0"> + <description>second derivative load average stats</description> + <nickName>second derivative load average stats</nickName> + <metaData> + </metaData> + <correlationData> + </correlationData> + <payloadData> + <property name="member_id" type="String" /> + <property name="cluster_id" type="String" /> + <property name="cluster_instance_id" type="String"/> + <property name="network_partition_id" type="String"/> + <property name="member_second_derivative_load_average" type="double"/> + </payloadData> + </streamDefinition> + +</streamManagerConfiguration> http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/README.md ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/distribution/README.md b/extensions/cep/modules/distribution/README.md new file mode 100755 index 0000000..ebf6bf2 --- /dev/null +++ b/extensions/cep/modules/distribution/README.md @@ -0,0 +1,12 @@ +# Apache Stratos CEP Extensions + +Apache Stratos Complex Event Processor (CEP) extensions include Window Processors for processing +health statistic events. These extensions are available in the Stratos binary distribution; in a +distributed deployment where CEP is run externally, these extensions need to be deployed manually. + +Please refer to the link below for more information on WSO2 CEP. +http://wso2.com/products/complex-event-processor/ + + +Thank you for using Apache Stratos! 
+The Stratos Team \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/distribution/pom.xml b/extensions/cep/modules/distribution/pom.xml new file mode 100644 index 0000000..1fe52f0 --- /dev/null +++ b/extensions/cep/modules/distribution/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.stratos</groupId> + <artifactId>cep-extensions</artifactId> + <version>4.1.2</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>apache-stratos-cep-artifacts</artifactId> + <packaging>pom</packaging> + <name>Apache Stratos CEP artifacts</name> + <description>Apache Stratos CEP artifacts</description> + + <profiles> + <profile> + <id>default</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>4-dist</id> + <phase>package</phase> + <goals> + <goal>attached</goal> + </goals> + <configuration> + <descriptors> + <descriptor>${basedir}/src/assembly/bin.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/assembly/bin.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/distribution/src/assembly/bin.xml b/extensions/cep/modules/distribution/src/assembly/bin.xml new file mode 100755 index 0000000..509a6e2 --- /dev/null +++ b/extensions/cep/modules/distribution/src/assembly/bin.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<assembly> + <id>bin</id> + <formats> + <format>zip</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>../artifacts/</directory> + <outputDirectory>${project.artifactId}-${project.version}</outputDirectory> + <includes> + <include>**/*.xml</include> + </includes> + <excludes> + <exclude>pom.xml</exclude> + <exclude>src/**</exclude> + </excludes> + </fileSet> + </fileSets> + <files> + <file> + <source>${project.basedir}/README.md</source> + <outputDirectory>${project.artifactId}-${project.version}</outputDirectory> + <filtered>true</filtered> + <fileMode>644</fileMode> + </file> + <file> + <source>src/main/notice/NOTICE</source> + <outputDirectory>${project.artifactId}-${project.version}</outputDirectory> + <filtered>true</filtered> + <fileMode>644</fileMode> + </file> + <file> + <source>src/main/license/LICENSE</source> + <outputDirectory>${project.artifactId}-${project.version}</outputDirectory> + <filtered>true</filtered> + <fileMode>644</fileMode> + </file> + </files> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/main/license/LICENSE ---------------------------------------------------------------------- diff 
--git a/extensions/cep/modules/distribution/src/main/license/LICENSE b/extensions/cep/modules/distribution/src/main/license/LICENSE new file mode 100644 index 0000000..25202d8 --- /dev/null +++ b/extensions/cep/modules/distribution/src/main/license/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +=================================================================================== http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/main/notice/NOTICE ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/distribution/src/main/notice/NOTICE b/extensions/cep/modules/distribution/src/main/notice/NOTICE new file mode 100644 index 0000000..46ddddd --- /dev/null +++ b/extensions/cep/modules/distribution/src/main/notice/NOTICE @@ -0,0 +1,7 @@ +Apache Stratos CEP Extensions +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +================================================================================ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/pom.xml b/extensions/cep/modules/stratos-cep-extension/pom.xml new file mode 100644 index 0000000..1905ed4 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/pom.xml @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, + # software distributed under the License is distributed on an + # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + # KIND, either express or implied. See the License for the + # specific language governing permissions and limitations + # under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.stratos</groupId> + <artifactId>cep-extensions</artifactId> + <version>4.1.2</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>org.apache.stratos.cep.extension</artifactId> + <name>Apache Stratos - CEP Extensions</name> + <description>Apache Stratos CEP Extensions</description> + + <repositories> + <repository> + <id>wso2-maven2-repository</id> + <name>WSO2 Maven2 Repository</name> + <url>http://dist.wso2.org/maven2</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.wso2.siddhi</groupId> + <artifactId>siddhi-core</artifactId> + <version>2.0.0-wso2v5</version> + </dependency> + <dependency> + <groupId>org.apache.stratos</groupId> + <artifactId>org.apache.stratos.messaging</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java new file mode 100644 index 0000000..59c70c5 --- /dev/null +++ 
b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/** + * CEP Topology Receiver for Fault Handling Window Processor. 
+ */
+public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+
+    private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+    // Window processor whose member time stamp map is updated by the listeners below
+    private final FaultHandlingWindowProcessor faultHandler;
+
+    /**
+     * Creates the receiver and registers its topology event listeners.
+     *
+     * @param faultHandler window processor that owns the member time stamp map
+     */
+    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+        this.faultHandler = faultHandler;
+        addEventListeners();
+    }
+
+    /**
+     * Delegates to the parent receiver and logs that the receiver has started.
+     */
+    @Override
+    public void execute() {
+        super.execute();
+        log.info("CEP topology event receiver thread started");
+    }
+
+    /**
+     * Registers the three listeners that keep the fault handler's time stamp map in sync with the
+     * topology: complete topology (one-time load), member terminated (remove) and member activated (add).
+     */
+    private void addEventListeners() {
+        // Load member time stamp map from the topology as a one time task
+        addEventListener(new CompleteTopologyEventListener() {
+            // Set once loadTimeStampMapFromTopology() reports success; later events are then ignored
+            private boolean initialized;
+
+            @Override
+            protected void onEvent(Event event) {
+                if (initialized) {
+                    return;
+                }
+                // Acquire before entering try so that finally never releases a lock that was not taken.
+                // NOTE(review): assumes acquireReadLock() returns only once the lock is held - confirm.
+                TopologyManager.acquireReadLock();
+                try {
+                    log.debug("Complete topology event received to fault handling window processor.");
+                    CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
+                    initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+                } catch (Exception e) {
+                    log.error("Error loading member time stamp map from complete topology event.", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        // Remove member from the time stamp map when MemberTerminated event is received.
+        addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                }
+            }
+        });
+
+        // Add member to time stamp map whenever member is activated
+        addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                // do not put this member if we have already received a health event
+                Long previous = faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
+                        System.currentTimeMillis());
+                // putIfAbsent returns null only when the entry was actually inserted
+                if (previous == null && log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java new file mode 100644 index 0000000..699f036 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cep.extension; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+/**
+ * Siddhi function extension registered as {@code stratos:concat}. It renders its argument(s) as a
+ * single string by appending their string forms in order, with no separator.
+ */
+@SiddhiExtension(namespace = "stratos", function = "concat")
+public class ConcatWindowProcessor extends FunctionExecutor {
+    // The function always produces a string
+    Attribute.Type returnType = Attribute.Type.STRING;
+
+    @Override
+    public void init(Attribute.Type[] types, SiddhiContext siddhiContext) {
+        // nothing to initialise
+    }
+
+    /**
+     * Concatenates the given argument(s).
+     *
+     * @param obj either an {@code Object[]} of values or a single value
+     * @return the concatenated string forms; a {@code null} value is rendered as "null", both as an
+     *         array element and as a single argument
+     */
+    @Override
+    protected Object process(Object obj) {
+        if (obj instanceof Object[]) {
+            // The buffer never leaves this method, so the unsynchronized builder is sufficient
+            StringBuilder joined = new StringBuilder();
+            for (Object part : (Object[]) obj) {
+                joined.append(part);
+            }
+            return joined.toString();
+        }
+        // String.valueOf keeps a single null argument consistent with null array elements
+        // instead of failing with a NullPointerException
+        return String.valueOf(obj);
+    }
+
+    @Override
+    public void destroy() {
+        // no resources to release
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java new file mode 100644 index 0000000..0aa01ed --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.cep.extension; + +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.MessagingUtil; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import 
java.util.concurrent.TimeUnit;
+
+/**
+ * CEP window processor to handle faulty member instances. This window processor is responsible for
+ * publishing MemberFault event if health stats are not received within a given time window.
+ */
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+
+    // A member is reported as faulty when its last time stamp is older than this many milliseconds
+    private static final int TIME_OUT = 60 * 1000;
+    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
+    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
+
+    private ExecutorService executorService;
+    private ScheduledExecutorService faultHandleScheduler;
+    private ScheduledFuture<?> lastSchedule;
+    private ThreadBarrier threadBarrier;
+    // Delay in milliseconds between two runs of the fault check, taken from the first query parameter
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher =
+            EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+    // Published payload: event class name -> message map (wired together in init())
+    private Map<String, Object> memberFaultEventMap = new HashMap<String, Object>();
+    // Inner payload: "message" -> the MemberFaultEvent being published
+    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event.
+     * Events without a member id, or for a member that is not in the topology, are rejected.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        // Check the id first: getMemberFromId() returns null for an empty id, which previously made
+        // this warning unreachable.
+        if (StringUtils.isEmpty(id)) {
+            log.warn("NULL member id found in the event received. Event rejected.");
+            return;
+        }
+        // Checking whether this member is in the topology.
+        // Sometimes there can be a delay between publishing member terminated events
+        // and actually terminating instances. Hence CEP might get events for already terminated members,
+        // so we are checking the topology for the member existence.
+        Member member = getMemberFromId(id);
+        if (null == member) {
+            log.debug("Member not found in the topology. Event rejected");
+            return;
+        }
+        memberTimeStampMap.put(id, event.getTimeStamp());
+        if (log.isDebugEnabled()) {
+            log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Retrieve the current activated members from the topology and initialize the timestamp map.
+     * This will allow the system to recover from a restart
+     *
+     * @param topology Topology model object
+     * @return false when the topology or its service collection is null, true once the map was loaded
+     */
+    boolean loadTimeStampMapFromTopology(Topology topology) {
+
+        long currentTimeStamp = System.currentTimeMillis();
+        if (topology == null || topology.getServices() == null) {
+            return false;
+        }
+        // TODO make this efficient by adding APIs to messaging component
+        for (Service service : topology.getServices()) {
+            if (service.getClusters() != null) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() != null) {
+                        for (Member member : cluster.getMembers()) {
+                            // we are checking faulty status only in previously activated members
+                            if (member != null && MemberStatus.Active.equals(member.getStatus())) {
+                                // Initialize the member time stamp map from the topology at the beginning
+                                memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    memberTimeStampMap);
+        }
+        return true;
+    }
+
+    /**
+     * Looks up a member in the current topology by walking services, clusters and members.
+     *
+     * @param memberId id of the member to find
+     * @return the matching member, or null when the id is empty, the topology is not initialized,
+     *         no member matches, or reading the topology failed
+     */
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
+            return null;
+        }
+        if (!TopologyManager.getTopology().isInitialized()) {
+            return null;
+        }
+        // Acquire before entering try so that finally never releases a lock that was not taken.
+        // NOTE(review): assumes acquireReadLock() returns only once the lock is held - confirm.
+        TopologyManager.acquireReadLock();
+        try {
+            if (TopologyManager.getTopology().getServices() == null) {
+                return null;
+            }
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() != null) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (cluster.getMembers() != null) {
+                            for (Member member : cluster.getMembers()) {
+                                // null-check mirrors loadTimeStampMapFromTopology()
+                                if (member != null && memberId.equals(member.getMemberId())) {
+                                    return member;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Pass the exception as the throwable so the stack trace is logged
+            log.error("Error while reading topology", e);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+    /**
+     * Publishes a MemberFaultEvent for the given member on the health stat topic.
+     * Nothing is published when the member is not found in the topology.
+     *
+     * @param memberId id of the member detected as faulty
+     */
+    private void publishMemberFault(String memberId) {
+        Member member = getMemberFromId(memberId);
+        if (member == null) {
+            log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
+                    " does not exist in topology");
+            return;
+        }
+        log.info("Publishing member fault event for [member-id] " + memberId);
+
+        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
+                member.getMemberId(), member.getPartitionId(),
+                member.getNetworkPartitionId(), 0);
+
+        // memberFaultEventMap references this message map (see init()), so updating it updates the payload
+        memberFaultEventMessageMap.put("message", memberFaultEvent);
+        healthStatPublisher.publish(memberFaultEventMap, true);
+    }
+
+    /**
+     * One iteration of the fault check: every member whose last time stamp is older than TIME_OUT
+     * gets a member fault event published. The next iteration is always re-scheduled, even when
+     * this one failed.
+     */
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+
+            for (Map.Entry<String, Long> entry : memberTimeStampMap.entrySet()) {
+                long currentTime = System.currentTimeMillis();
+                long eventTimeStamp = entry.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + entry.getKey() + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                    publishMemberFault(entry.getKey());
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+            }
+        } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+        } finally {
+            // Same cancel-and-reschedule sequence as schedule()
+            schedule();
+        }
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    /**
+     * Restores the window from a snapshot produced by {@link #currentState()}.
+     *
+     * @param data single-element array whose first element is the window's own state array
+     */
+    @Override
+    protected void restoreState(Object[] data) {
+        // currentState() wraps the window state in a one-element array, so only data[0] is the
+        // window's state; the wrapper itself must not be passed to the window.
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the query parameters (time to keep, member id attribute), creates the scheduler queue,
+     * wires the fault event payload maps, starts the topology event receiver and schedules the
+     * first run.
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+        memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        memberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+
+        executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
+                CEP_EXTENSION_THREAD_POOL_SIZE);
+        cepTopologyEventReceiver.setExecutorService(executorService);
+        cepTopologyEventReceiver.execute();
+
+        //Ordinary scheduling
+        window.schedule();
+        if (log.isDebugEnabled()) {
+            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        }
+    }
+
+    /**
+     * Cancels the pending run, if any, and schedules the next one after timeToKeep milliseconds.
+     */
+    @Override
+    public void schedule() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Cancels the pending run, if any, and schedules a run with no delay.
+     */
+    @Override
+    public void scheduleNow() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.faultHandleScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy() {
+        // terminate topology listener thread
+        cepTopologyEventReceiver.terminate();
+        window = null;
+
+        // Shutdown executor service
+        if (executorService != null) {
+            try {
+                executorService.shutdownNow();
+            } catch (Exception e) {
+                log.warn("An error occurred while shutting down cep extension executor service", e);
+            }
+        }
+    }
+
+    /**
+     * @return the live map of member ids to their last received health event time stamp
+     */
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+        return memberTimeStampMap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java new file mode 100644 index 0000000..dff0f79 --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.stratos.cep.extension; + +import org.apache.log4j.Logger; +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.event.StreamEvent; +import org.wso2.siddhi.core.event.in.InEvent; +import org.wso2.siddhi.core.event.in.InListEvent; +import org.wso2.siddhi.core.event.remove.RemoveEvent; +import org.wso2.siddhi.core.event.remove.RemoveListEvent; +import org.wso2.siddhi.core.persistence.ThreadBarrier; +import org.wso2.siddhi.core.query.QueryPostProcessingElement; +import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor; +import org.wso2.siddhi.core.query.processor.window.WindowProcessor; +import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue; +import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; +import org.wso2.siddhi.query.api.expression.Expression; +import org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@SiddhiExtension(namespace = "stratos", function = "gradient") +public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { + + static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class); + private ScheduledExecutorService 
eventRemoverScheduler; + private ScheduledFuture<?> lastSchedule; + private long timeToKeep; + private int subjectedAttrIndex; + private Attribute.Type subjectedAttrType; + private List<InEvent> newEventList; + private List<RemoveEvent> oldEventList; + private ThreadBarrier threadBarrier; + private ISchedulerSiddhiQueue<StreamEvent> window; + + @Override + protected void processEvent(InEvent event) { + acquireLock(); + try { + newEventList.add(event); + } finally { + releaseLock(); + } + } + + @Override + protected void processEvent(InListEvent listEvent) { + acquireLock(); + try { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + newEventList.add((InEvent) listEvent.getEvent(i)); + } + } finally { + releaseLock(); + } + } + + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } + + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } + + + @Override + public void run() { + acquireLock(); + try { + long scheduledTime = System.currentTimeMillis(); + try { + oldEventList.clear(); + while (true) { + threadBarrier.pass(); + RemoveEvent removeEvent = (RemoveEvent) window.poll(); + if (removeEvent == null) { + if (oldEventList.size() > 0) { + nextProcessor.process(new RemoveListEvent( + oldEventList.toArray(new RemoveEvent[oldEventList.size()]))); + oldEventList.clear(); + } + + if (newEventList.size() > 0) { + InEvent[] inEvents = + newEventList.toArray(new InEvent[newEventList.size()]); + for (InEvent inEvent : inEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + + InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]); + + for (InEvent inEvent : gradientEvents) { + window.put(new RemoveEvent(inEvent, -1)); + } + nextProcessor.process(new 
InListEvent(gradientEvents)); + + newEventList.clear(); + } + + long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime); + if (diff > 0) { + try { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + log.warn("scheduling cannot be accepted for execution: elementID " + + elementId); + } + break; + } + scheduledTime = System.currentTimeMillis(); + } else { + oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis())); + } + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } finally { + releaseLock(); + } + } + + + /** + * This function will calculate the linear gradient (per second) of the events received during + * a specified time period. + */ + private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) { + double firstVal = 0.0, lastVal = 0.0; + // FIXME I'm not sure whether there's some other good way to do correct casting, + // based on the type. + if (Type.DOUBLE.equals(subjectedAttrType)) { + firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.INT.equals(subjectedAttrType)) { + firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.LONG.equals(subjectedAttrType)) { + firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex]; + } else if (Type.FLOAT.equals(subjectedAttrType)) { + firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex]; + lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex]; + } + + long t1 = firstInEvent.getTimeStamp(); + long t2 = lastInEvent.getTimeStamp(); + long millisecondsForASecond = 1000; + long tGap = t2 - t1 > millisecondsForASecond ? 
t2 - t1 : millisecondsForASecond; + double gradient = 0.0; + if (tGap > 0) { + gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap; + } + if (log.isDebugEnabled()) { + log.debug("Gradient: " + gradient + " Last val: " + lastVal + + " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+ + t2+" hash: "+this.hashCode()); + } + Object[] data = firstInEvent.getData().clone(); + data[subjectedAttrIndex] = gradient; + InEvent gradientEvent = + new InEvent(firstInEvent.getStreamId(), (t1+t2)/2, + data); + InEvent[] output = new InEvent[1]; + output[0] = gradientEvent; + return output; + } + + @Override + protected Object[] currentState() { + return new Object[]{window.currentState(), oldEventList, newEventList}; + } + + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + oldEventList = ((ArrayList<RemoveEvent>) data[1]); + newEventList = ((ArrayList<InEvent>) data[2]); + window.reSchedule(); + } + + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } + + String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr); + + oldEventList = new ArrayList<RemoveEvent>(); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList"); + } else { + newEventList = new ArrayList<InEvent>(); + } + + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = new 
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + //Ordinary scheduling + window.schedule(); + + } + + @Override + public void schedule() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } + + public void scheduleNow() { + if (lastSchedule != null) { + lastSchedule.cancel(false); + } + lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } + + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } + + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } + + @Override + public void destroy(){ + oldEventList = null; + newEventList = null; + window = null; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java new file mode 100755 index 0000000..0dc24bd --- /dev/null +++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cep.extension; + +/** + * Member Request Handling Capability Window Processor + */ + +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "divider") +public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.DOUBLE; + + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + + double[] value = new double[2]; + if (obj instanceof Object[]) { + int i=0; + for (Object aObj : (Object[]) obj) { + value[i]= Double.parseDouble(String.valueOf(aObj)); + i++; + } + }//to do avoid deviding zero number of active instances won't be zero cz there is min + Double unit = (value[0] / value[1]); + if(!unit.isNaN() && !unit.isInfinite()) + return unit; + else + return 0.0; + + } + + @Override + public void destroy() { + + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +}
