EAGLE-149 Enable hadoop jmx metric cases https://issues.apache.org/jira/browse/EAGLE-149
Closes #81 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f213bab0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f213bab0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f213bab0 Branch: refs/heads/master Commit: f213bab049c9f19e7a1e8ecee25c7cbd76a3c318 Parents: bd3b555 Author: Hao Chen <h...@apache.org> Authored: Wed Feb 3 22:25:35 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Wed Feb 3 22:25:35 2016 +0800 ---------------------------------------------------------------------- .gitignore | 1 + .../src/main/bin/eagle-topology-init.sh | 56 +++--- .../src/main/bin/hadoop-metric-monitor.sh | 96 ++++----- .../eagle/alert/cep/TestSiddhiEvaluator.java | 9 +- .../alert/dao/TestAlertDefinitionDAOImpl.java | 6 +- .../policy/TestPolicyDistributionUpdater.java | 5 +- .../datastream/core/StreamAlertExpansion.scala | 8 +- .../policy/dao/AlertDefinitionDAOImpl.java | 86 -------- eagle-external/hadoop_jmx_collector/.gitignore | 2 + .../hadoop_jmx_collector/config-sample.json | 19 ++ eagle-external/hadoop_jmx_collector/config.json | 19 -- .../hadoop_jmx_collector/hadoop_jmx_kafka.py | 30 +-- .../hadoop_jmx_collector/metric_extensions.py | 57 +++++- .../hadoop_jmx_collector/util_func.py | 19 +- eagle-gc/src/main/resources/alert-gc-policy.sh | 4 +- .../main/resources/alert-metadata-create-gc.sh | 8 +- .../metric/HadoopJmxMetricDeserializer.java | 56 ------ .../hadoop/metric/HadoopJmxMetricMonitor.java | 35 ++++ .../eagle/hadoop/metric/NameNodeLagMonitor.java | 67 ------- .../org/apache/eagle/hadoop/metric/Utils.java | 64 ++++++ .../src/main/resources/application.conf | 27 +-- .../src/main/resources/eagle-env.sh | 44 +++++ .../src/main/resources/hadoop-metric-init.sh | 165 ++++++++++++++++ .../src/main/resources/hadoopjmx.yaml | 18 ++ .../src/main/resources/hastate-policy-import.sh | 51 +++++ .../lastcheckpointtime-policy-import.sh | 51 +++++ 
.../resources/missingblock-policy-import.sh | 51 +++++ .../main/resources/namenodelag-policy-import.sh | 49 +++++ .../src/main/resources/namenodelag.yaml | 18 -- .../src/main/resources/namenodelage-init.sh | 194 ------------------- .../main/resources/nodecount-policy-import.sh | 51 +++++ .../resources/safemodecheck-policy-import.sh | 51 +++++ .../metric/HadoopJmxMetricDeserializerTest.java | 17 +- .../hadoop/metric/TestHadoopMetricSiddhiQL.java | 82 +++++++- eagle-topology-assembly/pom.xml | 5 + pom.xml | 2 + 36 files changed, 923 insertions(+), 600 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 8a40f3a..033e79f 100644 --- a/.gitignore +++ b/.gitignore @@ -73,6 +73,7 @@ logs/ .DS_Store *.cache-tests +application-local.conf *.orig **/*.pyc http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-assembly/src/main/bin/eagle-topology-init.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-topology-init.sh b/eagle-assembly/src/main/bin/eagle-topology-init.sh index f30749f..03b2a9a 100755 --- a/eagle-assembly/src/main/bin/eagle-topology-init.sh +++ b/eagle-assembly/src/main/bin/eagle-topology-init.sh @@ -90,34 +90,34 @@ echo "Importing AlertStreamService for USERPROFILE" curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H "Content-Type: application/json" "http://$EAGLE_SERVICE_HOST:$EAGLE_SERVICE_PORT/eagle-service/rest/entities?serviceName=AlertStreamService" \ -d '[ { "prefix": "alertStream", "tags": { "streamName": "userActivity", "site":"sandbox", "dataSource":"userProfile" }, "alertExecutorIdList": [ "userProfileAnomalyDetectionExecutor" ] } ]' -##################################################################### -# Import stream metadata 
for HADOOP METRIC -##################################################################### - -## AlertDataSource: data sources bound to sites -echo "Importing AlertDataSourceService for HADOOP METRIC ... " - -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' - - -## AlertStreamService: alert streams generated from data source -echo "" -echo "Importing AlertStreamService for HADOOP METRIC ... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ --H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ --d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' - -## AlertExecutorService: what alert streams are consumed by alert executor -echo "" -echo "Importing AlertExecutorService for HADOOP METRIC ... 
" -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' - -## AlertStreamSchemaService: schema for event from alert stream -echo "" -echo "Importing AlertStreamSchemaService for HADOOP METRIC ... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":" service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +###################################################################### +## Import stream metadata for HADOOP METRIC +###################################################################### +# +### AlertDataSource: data sources bound to sites +#echo "Importing AlertDataSourceService for HADOOP METRIC ... 
" +# +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' +# +# +### AlertStreamService: alert streams generated from data source +#echo "" +#echo "Importing AlertStreamService for HADOOP METRIC ... " +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ +#-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ +#-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' +# +### AlertExecutorService: what alert streams are consumed by alert executor +#echo "" +#echo "Importing AlertExecutorService for HADOOP METRIC ... 
" +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' +# +### AlertStreamSchemaService: schema for event from alert stream +#echo "" +#echo "Importing AlertStreamSchemaService for HADOOP METRIC ... " +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription": "service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription": "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' ## Finished echo "" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh index 706154a..d88f9d9 100644 --- a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh +++ b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh @@ -1,48 +1,48 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one 
or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -source $(dirname $0)/eagle-env.sh - -##################################################################### -# Import stream metadata for HADOOP METRIC -##################################################################### -## AlertDataSource: data sources bound to sites -echo "Importing AlertDataSourceService for HADOOP METRIC ... " - -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' - - -## AlertStreamService: alert streams generated from data source -echo "" -echo "Importing AlertStreamService for HADOOP METRIC ... 
" -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ --H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ --d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' - -## AlertExecutorService: what alert streams are consumed by alert executor -echo "" -echo "Importing AlertExecutorService for HADOOP METRIC ... " -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' - -## AlertStreamSchemaService: schema for event from alert stream -echo "" -echo "Importing AlertStreamSchemaService for HADOOP METRIC ... 
" -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' -curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric 
timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' - -$EAGLE_HOME/kafka-stream-monitor.sh hadoopJmxMetric hadoopJmxMetricExecutor $EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf \ No newline at end of file +##!/bin/bash +# +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +# +#source $(dirname $0)/eagle-env.sh +# +###################################################################### +## Import stream metadata for HADOOP METRIC +###################################################################### +### AlertDataSource: data sources bound to sites +#echo "Importing AlertDataSourceService for HADOOP METRIC ... 
" +# +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' +# +# +### AlertStreamService: alert streams generated from data source +#echo "" +#echo "Importing AlertStreamService for HADOOP METRIC ... " +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ +#-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ +#-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' +# +### AlertExecutorService: what alert streams are consumed by alert executor +#echo "" +#echo "Importing AlertExecutorService for HADOOP METRIC ... 
" +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' +# +### AlertStreamSchemaService: schema for event from alert stream +#echo "" +#echo "Importing AlertStreamSchemaService for HADOOP METRIC ... " +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription": "service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription": "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +# +#$EAGLE_HOME/kafka-stream-monitor.sh hadoopJmxMetric hadoopJmxMetricExecutor $EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java index b29e4c1..ab086aa 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java @@ -28,10 +28,8 @@ import org.apache.eagle.dataproc.core.ValuesArray; import org.apache.eagle.datastream.Collector; import org.apache.eagle.datastream.Tuple2; import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; +import org.apache.eagle.policy.common.Constants; +import org.apache.eagle.policy.dao.*; import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition; import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator; import org.apache.eagle.policy.siddhi.StreamMetadataManager; @@ -95,7 +93,8 @@ public class TestSiddhiEvaluator { "insert into outputStream ;"; policyDef.setExpression(expression); - PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) { + PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null), + Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) { @Override public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java index 783abea..79173f9 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java @@ -18,10 +18,11 @@ package org.apache.eagle.alert.dao; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl; import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.policy.common.Constants; import org.apache.eagle.policy.dao.PolicyDefinitionDAO; +import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl; import org.apache.eagle.service.client.EagleServiceConnector; import org.junit.Assert; import org.junit.Test; @@ -54,7 +55,8 @@ public class TestAlertDefinitionDAOImpl { String site = "sandbox"; String dataSource = "UnitTest"; - PolicyDefinitionDAO dao = new AlertDefinitionDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort)) { + PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort), + Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) { @Override public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception { List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java index 7c02db9..7ca0846 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java @@ -27,9 +27,9 @@ import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; import org.apache.eagle.alert.executor.AlertExecutor; import org.apache.eagle.policy.DefaultPolicyPartitioner; import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl; import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; import org.apache.eagle.policy.dao.PolicyDefinitionDAO; +import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl; import org.apache.eagle.service.client.EagleServiceConnector; import org.junit.Test; import org.slf4j.Logger; @@ -45,7 +45,8 @@ public class TestPolicyDistributionUpdater { @Test public void testPolicyDistributionReporter() throws Exception{ - PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, 1)) { + PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, 1), + Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) { @Override public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception { final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala index 97ea669..01d3a70 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala @@ -20,8 +20,10 @@ package org.apache.eagle.datastream.core import java.util +import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity import org.apache.eagle.alert.executor.AlertExecutorCreationUtils -import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl +import org.apache.eagle.policy.common.Constants +import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl import scala.collection.JavaConversions.asScalaSet import scala.collection.mutable.ListBuffer @@ -89,7 +91,9 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi /** * step 2: partition alert executor by policy partitioner class */ - val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId) + val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, + new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME), + upStreamNames, alertExecutorId) var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]] alertExecutors.foreach(exec => { val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false) 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java deleted file mode 100644 index 8694f52..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.policy.dao; - -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.service.client.EagleServiceConnector; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Utility methods to load alert definitions for a program - */ -public class AlertDefinitionDAOImpl implements PolicyDefinitionDAO<AlertDefinitionAPIEntity> { - - private static final long serialVersionUID = 7717408104714443056L; - private static final Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAOImpl.class); - private final EagleServiceConnector connector; - - public AlertDefinitionDAOImpl(EagleServiceConnector connector){ - this.connector = connector; - } - - @Override - public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception { - try { - IEagleServiceClient client = new EagleServiceClientImpl(connector); - String query = Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}"; - GenericServiceAPIResponseEntity<AlertDefinitionAPIEntity> response = client.search() - .pageSize(Integer.MAX_VALUE) - .query(query) - .send(); - client.close(); - if (response.getException() != null) { - throw new Exception("Got an exception when query eagle service: " + response.getException()); - } - List<AlertDefinitionAPIEntity> list = response.getObj(); - List<AlertDefinitionAPIEntity> enabledList = new ArrayList<AlertDefinitionAPIEntity>(); - for (AlertDefinitionAPIEntity entity : list) { - if (entity.isEnabled()) enabledList.add(entity); - } - return enabledList; - } - catch 
(Exception ex) { - LOG.error("Got an exception when query alert Def service", ex); - throw new IllegalStateException(ex); - } - } - - @Override - public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception { - List<AlertDefinitionAPIEntity> list = findActivePolicies(site, dataSource); - Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>(); - for (AlertDefinitionAPIEntity entity : list) { - String executorID = entity.getTags().containsKey(Constants.EXECUTOR_ID) ? entity.getTags().get(Constants.EXECUTOR_ID) - : entity.getTags().get(Constants.ALERT_EXECUTOR_ID); - if (map.get(executorID) == null) { - map.put(executorID, new HashMap<String, AlertDefinitionAPIEntity>()); - } - map.get(executorID).put(entity.getTags().get("policyId"), entity); - } - return map; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/.gitignore b/eagle-external/hadoop_jmx_collector/.gitignore new file mode 100644 index 0000000..adad264 --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/.gitignore @@ -0,0 +1,2 @@ +config.json +*.pyc \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json new file mode 100644 index 0000000..1f5cb7e --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/config-sample.json @@ -0,0 +1,19 @@ +{ + "env": { + "site": "sandbox" + }, + "input": { + "component": "namenode", + "port": "50070", + "https": false + }, + "filter": { + 
"monitoring.group.selected": ["hadoop", "java.lang"] + }, + "output": { + "kafka": { + "topic": "nn_jmx_metric_sandbox", + "brokerList": ["sandbox.hortonworks.com:6667"] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/config.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config.json b/eagle-external/hadoop_jmx_collector/config.json deleted file mode 100644 index 1f5cb7e..0000000 --- a/eagle-external/hadoop_jmx_collector/config.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "env": { - "site": "sandbox" - }, - "input": { - "component": "namenode", - "port": "50070", - "https": false - }, - "filter": { - "monitoring.group.selected": ["hadoop", "java.lang"] - }, - "output": { - "kafka": { - "topic": "nn_jmx_metric_sandbox", - "brokerList": ["sandbox.hortonworks.com:6667"] - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index e0667af..b342a5e 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -70,8 +70,7 @@ def get_metric_prefix_name(mbean_attribute, context): name_index = [i[0] for i in mbean_list].index('name') mbean_list[name_index][1] = context metric_prefix_name = '.'.join([i[1] for i in mbean_list]) - return DATA_TYPE + "." + metric_prefix_name - + return (DATA_TYPE + "." 
+ metric_prefix_name).replace(" ","").lower() def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean): selected_group = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')] @@ -97,7 +96,7 @@ def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean): for key, value in bean.iteritems(): #print key, value key = key.lower() - if not isNumber(value) or re.match(r'tag.*', key): + if re.match(r'tag.*', key): continue if mbean_domain == 'hadoop' and re.match(r'^namespace', key): @@ -111,8 +110,8 @@ def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean): key = items[1] metric = metric_prefix_name + '.' + key - send_output_message(producer, topic, kafka_dict, metric, value) + single_metric_callback(producer, topic, kafka_dict, metric, value) def get_jmx_beans(host, port, https): # port = inputConfig.get('port') @@ -135,12 +134,16 @@ def main(): kafka = None producer = None topic = None + brokerList = None try: #start = time.clock() # read the kafka.ini - config = load_config('config.json') + if (len(sys.argv) > 1): + config = load_config(sys.argv[1]) + else: + config = load_config('config.json') #print config site = config[u'env'].get('site').encode('utf-8') @@ -154,18 +157,21 @@ def main(): port = config[u'input'].get('port') https = config[u'input'].get('https') kafkaConfig = config[u'output'].get(u'kafka') - brokerList = kafkaConfig.get('brokerList') - topic = kafkaConfig.get('topic').encode('utf-8') + if kafkaConfig != None : + brokerList = kafkaConfig.get('brokerList') + topic = kafkaConfig.get('topic').encode('utf-8') beans = get_jmx_beans(host, port, https) #print brokerList - kafka, producer = kafka_connect(brokerList) + if brokerList != None: + kafka, producer = kafka_connect(brokerList) + default_metric = {"site": site, "host": host, "timestamp": '', "component": component, "metric": '', "value": ''} fat_bean = dict() parse_hadoop_jmx(producer, topic, config, beans, default_metric, fat_bean) 
- extend_jmx_metrics(producer, topic, default_metric, fat_bean) - except Exception, e: - print 'main except: ', e + metrics_bean_callback(producer, topic, default_metric, fat_bean) + # except Exception, e: + # print 'main except: ', e finally: if kafka != None and producer != None: kafka_close(kafka, producer) @@ -174,4 +180,4 @@ def main(): #print("Time used:",elapsed) if __name__ == "__main__": - main() \ No newline at end of file + main() http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/metric_extensions.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_extensions.py b/eagle-external/hadoop_jmx_collector/metric_extensions.py index 4ea89c9..0028204 100644 --- a/eagle-external/hadoop_jmx_collector/metric_extensions.py +++ b/eagle-external/hadoop_jmx_collector/metric_extensions.py @@ -14,11 +14,55 @@ # limitations under the License. -#!/usr/bin/python +# !/usr/bin/python from util_func import * import json + +# Metric Parsing Callback Entry +def single_metric_callback(producer, topic, kafka_dict, metric, value): + if isNumber(value): + numeric_metric_callack(producer, topic, kafka_dict, metric, value) + else: + nonnumeric_metric_callack(producer, topic, kafka_dict, metric, value) + + +def metrics_bean_callback(producer, topic, metric, bean): + cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm") + journal_transaction_info(producer, topic, bean, metric, "hadoop.namenode.journaltransaction") + nn_hastate(producer,topic,bean,metric,"hadoop.namenode.fsnamesystem") + +################################################# +# Metric Parsing Extensions +################################################# + +def numeric_metric_callack(producer, topic, kafka_dict, metric, value): + # Send out numeric value directly + send_output_message(producer, topic, kafka_dict, metric, value) + + +def nonnumeric_metric_callack(producer, topic, 
kafka_dict, metric, value): + nn_safe_mode_metric(producer, topic, kafka_dict, metric, value) + +def nn_safe_mode_metric(producer, topic, kafka_dict, metric, value): + if metric == "hadoop.namenode.fsnamesystemstate.fsstate": + if value == "safeMode": + value = 1 + else: + value = 0 + + send_output_message(producer, topic, kafka_dict, metric, value) + +def nn_hastate(producer, topic, bean, metricMap, metric_prefix_name="hadoop.namenode.fsnamesystem"): + kafka_dict = metricMap.copy() + if bean[u'tag.HAState'] == "active": + value = 0 + else: + value = 1 + + send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".hastate", value) + def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name): kafka_dict = metricMap.copy() PercentVal = None @@ -34,6 +78,7 @@ def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name): PercentVal = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2) send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapCommittedUsage", PercentVal) + def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name): new_metric = metric.copy() if bean.has_key("JournalTransactionInfo"): @@ -42,11 +87,9 @@ def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name): LastAppliedOrWrittenTxId = int(JournalTransactionInfo.get("LastAppliedOrWrittenTxId")) MostRecentCheckpointTxId = int(JournalTransactionInfo.get("MostRecentCheckpointTxId")) - send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId) - send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId", MostRecentCheckpointTxId) + send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId", + LastAppliedOrWrittenTxId) + send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId", + 
MostRecentCheckpointTxId) else: raise Exception("JournalTransactionInfo not found") - -def extend_jmx_metrics(producer, topic, metric, bean): - cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm") - journal_transaction_info(producer,topic,bean,metric,"hadoop.namenode.JournalTransaction") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/util_func.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/util_func.py b/eagle-external/hadoop_jmx_collector/util_func.py index c02abbd..a2ca44a 100644 --- a/eagle-external/hadoop_jmx_collector/util_func.py +++ b/eagle-external/hadoop_jmx_collector/util_func.py @@ -52,22 +52,21 @@ def send_output_message(producer, topic, kafka_dict, metric, value): def load_config(filename): # read the self-defined filters - script_dir = os.path.dirname(__file__) - rel_path = "./" + filename - abs_file_path = os.path.join(script_dir, rel_path) - f = open(abs_file_path, 'r') - json_file = f.read() - f.close() - #print json_file - try: + script_dir = os.path.dirname(__file__) + rel_path = "./" + filename + abs_file_path = os.path.join(script_dir, rel_path) + if not os.path.isfile(abs_file_path): + print abs_file_path+" doesn't exist, please rename config-sample.json to config.json" + exit(1) + f = open(abs_file_path, 'r') + json_file = f.read() + f.close() config = json.loads(json_file) - except ValueError: print "configuration file load error" return config - def isNumber(str): try: if str == None or isinstance(str, (bool)): http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-gc/src/main/resources/alert-gc-policy.sh ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/alert-gc-policy.sh b/eagle-gc/src/main/resources/alert-gc-policy.sh index ffa6549..7f32c22 100644 --- a/eagle-gc/src/main/resources/alert-gc-policy.sh +++ 
b/eagle-gc/src/main/resources/alert-gc-policy.sh @@ -15,7 +15,7 @@ #!/bin/sh #### AlertDefinitionService: alert definition for NNGCLog Pause Time Long -curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox"","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode pause time long","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream#window.externalTime(timestamp,10 min) select sum(pausedGCTimeSec) as sumPausedSec having sumPausedSec >= 30 insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]' +curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode pause time long","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream#window.externalTime(timestamp,10 min) select sum(pausedGCTimeSec) as sumPausedSec having sumPausedSec >= 30 insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]' #### AlertDefinitionService: alert definition for NNGCLog Full GC -curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode has full gc","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from 
NNGCLogStream[(permAreaGCed == true)] select * insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]' \ No newline at end of file +curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode has full gc","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream[(permAreaGCed == true)] select * insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-gc/src/main/resources/alert-metadata-create-gc.sh ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/alert-metadata-create-gc.sh b/eagle-gc/src/main/resources/alert-metadata-create-gc.sh index 1e57c28..2e34490 100644 --- a/eagle-gc/src/main/resources/alert-metadata-create-gc.sh +++ b/eagle-gc/src/main/resources/alert-metadata-create-gc.sh @@ -15,13 +15,13 @@ #!/bin/sh #### AlertDataSourceService: alert streams generated from data source -curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix": "alertDataSource", "tags": {"site": "sandbox","dataSource": "NNGCLog"}}]' +curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix": "alertDataSource", "tags": {"site": "sandbox","dataSource": "NNGCLog"}}]' #### AlertStreamService: alert streams generated from data source -curl -X POST -H 
'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream"},"desc":"alert event stream from namenode gc log"}]' +curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream"},"desc":"alert event stream from namenode gc log"}]' #### AlertExecutorService: what alert streams are consumed by alert executor -curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"NNGCLog","alertExecutorId":"NNGCAlert","streamName":"NNGCLogStream"},"desc":"alert executor for namenode gc log"}]' +curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"NNGCLog","alertExecutorId":"NNGCAlert","streamName":"NNGCLogStream"},"desc":"alert executor for namenode gc log"}]' #### AlertStreamSchemaService: schema for event from alert stream -curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d 
'[{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"timestamp"}},{"prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"eventType"}},{"prefix":"alertStreamSchema","category":"","attrType":"double","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"pausedGCTimeSec"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog"," streamName":"NNGCLogStream","attrName":"youngUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long 
","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapUsageAvailable"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"usedTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapK"}},{ "prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"logLine"}}]' \ No newline at end of file +curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d 
'[{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"timestamp"}},{"prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"eventType"}},{"prefix":"alertStreamSchema","category":"","attrType":"double","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"pausedGCTimeSec"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","s treamName":"NNGCLogStream","attrName":"youngUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long" 
,"attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapUsageAvailable"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"usedTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapK"}},{" prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"logLine"}}]' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java deleted file mode 100644 index 27a416d..0000000 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.hadoop.metric; - -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.Charset; -import java.util.Map; -import java.util.Properties; -import java.util.SortedMap; - -/** - * Created on 1/19/16. 
- */ -public class HadoopJmxMetricDeserializer implements SpoutKafkaMessageDeserializer { - - private static final Logger LOG = LoggerFactory.getLogger(HadoopJmxMetricDeserializer.class); - - private Properties props; - - public HadoopJmxMetricDeserializer(Properties props){ - this.props = props; - } - - - // convert to a map of <key, map<>> - @Override - public Object deserialize(byte[] arg0) { - try { - String content = new String(arg0, Charset.defaultCharset().name()); - Map<String, Object> metricItem = JsonSerDeserUtils.deserialize(content, SortedMap.class); - return metricItem; - } catch (Exception e) { - e.printStackTrace(); - LOG.error("unrecognizable content", e); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java new file mode 100644 index 0000000..9202da4 --- /dev/null +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.metric; + +import org.apache.eagle.datastream.ExecutionEnvironments; +import org.apache.eagle.datastream.core.StreamProducer; +import org.apache.eagle.datastream.storm.StormExecutionEnvironment; + +/** + * Created on 1/12/16. + */ +public class HadoopJmxMetricMonitor { + + public static void main(String[] args) { + StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class); + String streamName = "hadoopJmxMetricEventStream"; + StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName); + sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor"); + env.execute(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java deleted file mode 100644 index 7c574f8..0000000 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.hadoop.metric; - -import backtype.storm.spout.SchemeAsMultiScheme; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * Created on 1/12/16. 
- */ -public class NameNodeLagMonitor { - - public static void main(String[] args) { - StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class); - String streamName = "hadoopJmxMetricEventStream"; - StreamProducer sp = env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).parallelism(1).nameAs(streamName); - sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor"); - - env.execute(); - } - - // create a tuple kafka source - private static KafkaSourcedSpoutProvider createProvider(Config config) { - String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>)tmp; - if(tmp == null) return null; - // this is the key to be grouped by - return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp); - } - }; - - KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() { - @Override - public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { - return new SchemeAsMultiScheme(scheme); - } - }; - return provider; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java new file mode 100644 index 0000000..173441c --- /dev/null +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.metric; + +import backtype.storm.spout.SchemeAsMultiScheme; +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Created on 1/25/16. + */ +public class Utils { + + /** + * Creates a spout provider that have host-metric as the first tuple data, so that it's feasible for alert grouping. 
+ * + * @param config + * @return + */ + public static KafkaSourcedSpoutProvider createProvider(Config config) { + String deserClsName = config.getString("dataSourceConfig.deserializerClass"); + final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { + + @Override + public List<Object> deserialize(byte[] ser) { + Object tmp = deserializer.deserialize(ser); + Map<String, Object> map = (Map<String, Object>) tmp; + if (tmp == null) return null; + // this is the key to be grouped by + return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp); + } + + }; + + KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() { + + @Override + public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { + return new SchemeAsMultiScheme(scheme); + } + + }; + return provider; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf index aa7e340..d72cc26 100644 --- a/eagle-hadoop-metric/src/main/resources/application.conf +++ b/eagle-hadoop-metric/src/main/resources/application.conf @@ -17,8 +17,8 @@ "envContextConfig" : { "env" : "storm", "mode" : "local", - "topologyName" : "nameNodeLagTopology", - "stormConfigFile" : "namenodelage.yaml", + "topologyName" : "hadoopJmxMetricTopology", + "stormConfigFile" : "hadoopjmx.yaml", "parallelismConfig" : { "kafkaMsgConsumer" : 1, "hadoopJmxMetricAlertExecutor*" : 1 @@ -26,12 +26,12 @@ }, "dataSourceConfig": { "topic" : "nn_jmx_metric_sandbox", - "zkConnection" : "localhost:2181", + "zkConnection" : "sandbox.hortonworks.com:2181", "zkConnectionTimeoutMS" : 15000, "consumerGroupId" : "EagleConsumer", "fetchSize" : 1048586, - "deserializerClass" : 
"org.apache.eagle.hadoop.metric.HadoopJmxMetricDeserializer", - "transactionZKServers" : "localhost", + "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer", + "transactionZKServers" : "sandbox.hortonworks.com", "transactionZKPort" : 2181, "transactionZKRoot" : "/consumers", "transactionStateUpdateMS" : 2000 @@ -43,23 +43,6 @@ "needValidation" : "true" } }, - "persistExecutorConfigs" { - "persistExecutor1" : { - "kafka": { - "bootstrap_servers" : "localhost", - "topics" : { - "defaultOutput" : "downSampleOutput" - } - } - } - }, - "aggregateExecutorConfigs" : { - "aggregateExecutor1" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - }, "eagleProps" : { "site" : "sandbox", "dataSource": "hadoopJmxMetricDataSource", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/eagle-env.sh ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/eagle-env.sh b/eagle-hadoop-metric/src/main/resources/eagle-env.sh new file mode 100755 index 0000000..79ff5fa --- /dev/null +++ b/eagle-hadoop-metric/src/main/resources/eagle-env.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# set EAGLE_HOME +export EAGLE_HOME=$(dirname $0)/.. + +# The java implementation to use. please use jdk 1.7 or later +# export JAVA_HOME=${JAVA_HOME} +# export JAVA_HOME=/usr/java/jdk1.7.0_80/ + +# nimbus.host, default is localhost +export EAGLE_NIMBUS_HOST=localhost + +# EAGLE_SERVICE_HOST, default is `hostname -f` +export EAGLE_SERVICE_HOST=localhost + +# EAGLE_SERVICE_PORT, default is 9099 +export EAGLE_SERVICE_PORT=9099 + +# EAGLE_SERVICE_USER +export EAGLE_SERVICE_USER=admin + +# EAGLE_SERVICE_PASSWORD +export EAGLE_SERVICE_PASSWD=secret + +export EAGLE_CLASSPATH=$EAGLE_HOME/conf +# Add eagle shared library jars +for file in $EAGLE_HOME/lib/share/*;do + EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$file +done http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh new file mode 100644 index 0000000..8405c19 --- /dev/null +++ b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh @@ -0,0 +1,165 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +source $(dirname $0)/eagle-env.sh + +##################################################################### +# Import stream metadata for HDFS +##################################################################### + +## AlertDataSource: data sources bound to sites +echo "Importing AlertDataSourceService for persist... " + +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ + "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" \ + -d ' + [ + { + "prefix":"alertDataSource", + "tags":{ + "site":"sandbox", + "dataSource":"hadoopJmxMetricDataSource" + }, + "enabled": true, + "config":"", + "desc":"hadoop" + } + ] + ' + +## AlertStreamService: alert streams generated from data source +echo "" +echo "Importing AlertStreamService for HDFS... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ + "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ + -d ' + [ + { + "prefix":"alertStream", + "tags":{ + "dataSource":"hadoopJmxMetricDataSource", + "streamName":"hadoopJmxMetricEventStream" + }, + "desc":"hadoop" + } + ] + ' + +## AlertExecutorService: what alert streams are consumed by alert executor +echo "" +echo "Importing AlertExecutorService for HDFS... 
" +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ + "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \ + -d ' + [ + { + "prefix":"alertExecutor", + "tags":{ + "dataSource":"hadoopJmxMetricDataSource", + "alertExecutorId":"hadoopJmxMetricAlertExecutor", + "streamName":"hadoopJmxMetricEventStream" + }, + "desc":"aggregate executor for hadoop jmx metric event stream" + } + ] + ' + +## AlertStreamSchemaService: schema for event from alert stream +echo "" +echo "Importing AlertStreamSchemaService for HDFS... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ +"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \ + -d ' + [ + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "host" + }, + "attrDescription": "the host that current metric comes from", + "attrType": "string", + "category": "", + "attrValueResolver": "" + }, + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "timestamp" + }, + "attrDescription": "the metric timestamp", + "attrType": "long", + "category": "", + "attrValueResolver": "" + }, + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "metric" + }, + "attrDescription": "the metric name", + "attrType": "string", + "category": "", + "attrValueResolver": "" + }, + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "component" + }, + "attrDescription": "the component that the metric comes from", + "attrType": "string", 
"category": "", + "attrValueResolver": "" + }, + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "site" + }, + "attrDescription": "the site that the metric belongs to", + "attrType": "string", + "category": "", + "attrValueResolver": "" + }, + { + "prefix": "alertStreamSchema", + "tags": { + "dataSource": "hadoopJmxMetricDataSource", + "streamName": "hadoopJmxMetricEventStream", + "attrName": "value" + }, + "attrDescription": "the metric value in string representation", + "attrType": "double", + "category": "", + "attrValueResolver": "" + } + ] + ' + +## Finished +echo "" +echo "Finished initialization for eagle topology" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml b/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml new file mode 100644 index 0000000..a68a323 --- /dev/null +++ b/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +topology.workers: 1 +topology.acker.executors: 1 +topology.tasks: 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh b/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh new file mode 100644 index 0000000..a043cd4 --- /dev/null +++ b/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +source $(dirname $0)/eagle-env.sh +source $(dirname $0)/hadoop-metric-init.sh + + +##### add policies ########## +echo "" +echo "Importing policy: haStatePolicy " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \ + "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \ + -d ' + [ + { + "prefix": "alertdef", + "tags": { + "site": "sandbox", + "dataSource": "hadoopJmxMetricDataSource", + "policyId": "haStatePolicy", + "alertExecutorId": "hadoopJmxMetricAlertExecutor", + "policyType": "siddhiCEPEngine" + }, + "description": "jmx metric ", + "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.fsnamesystem.hastate\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host == a.host and (convert(a.value, \\\"long\\\") != convert(value, \\\"long\\\"))] within 10 min select a.host, a.value as oldHaState, b.value as newHaState, b.timestamp as timestamp, b.metric as metric, b.component as component, b.site as site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}", + "enabled": true, + "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}", + "notificationDef": "[{\"sender\":\"ea...@apache.org\",\"recipients\":\"ea...@apache.org\",\"subject\":\"namenode ha state changed.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]" + } + ] + ' + + ## Finished +echo "" +echo "Finished initialization for eagle topology" + +exit 0