EAGLE-444 convert eagle-gc app to use new app framework
Author: @yonzhang2012 <[email protected]> Closes: #339 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/98458658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/98458658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/98458658 Branch: refs/heads/develop Commit: 984586580b3840f254f36e2f24a786c1eca98370 Parents: 15e1c83 Author: yonzhang <[email protected]> Authored: Fri Aug 12 23:50:10 2016 -0700 Committer: yonzhang <[email protected]> Committed: Fri Aug 12 23:50:10 2016 -0700 ---------------------------------------------------------------------- .../kafka/NewKafkaSourcedSpoutProvider.java | 118 ++++++++++ eagle-gc/pom.xml | 5 + .../org/apache/eagle/gc/GCLogApplication.java | 79 +++++++ .../eagle/gc/GCLogApplicationProvider.java | 34 +++ .../org/apache/eagle/gc/GCLogProcessorMain.java | 42 ---- .../gc/executor/GCLogAnalysorExecutor.java | 77 ------- .../eagle/gc/executor/GCLogAnalyzerBolt.java | 85 +++++++ .../gc/executor/GCMetricGeneratorBolt.java | 125 +++++++++++ .../gc/executor/GCMetricGeneratorExecutor.java | 108 --------- .../eagle/gc/spout/GCLogDeserializer.java | 39 ---- ....security.hbase.GCLogApplicationProvider.xml | 221 +++++++++++++++++++ ...org.apache.eagle.app.spi.ApplicationProvider | 37 ++++ eagle-gc/src/main/resources/application.conf | 48 ++-- eagle-gc/src/main/resources/gc-storm.yaml | 18 -- eagle-gc/src/main/resources/log4j.properties | 3 - eagle-hadoop-metric/pom.xml | 5 + .../hadoop/metric/HadoopJmxApplication.java | 42 ++++ .../hadoop/metric/HadoopJmxMetricMonitor.java | 11 +- .../HadoopJmxMetricMonitoringTopology.java | 37 ---- .../kafka/EagleMetricCollectorApplication.java | 148 +++++++++++++ .../metric/kafka/EagleMetricCollectorMain.java | 122 ---------- .../kafka/KafkaMessageDistributionBolt.java | 114 ++++++++++ .../kafka/KafkaMessageDistributionExecutor.java | 103 --------- .../topo/NewKafkaSourcedSpoutProvider.java | 115 ---------- .../hbase/HBaseAuditLogAppProvider.java | 7 - .../hbase/HBaseAuditLogApplication.java | 3 +- .../AbstractHdfsAuditLogApplication.java | 3 +- .../securitylog/HdfsAuthLogMonitoringMain.java | 3 +- .../eagle-security-oozie-auditlog/pom.xml | 5 + .../oozie/parse/OozieAuditLogApplication.java | 75 +++++++ .../oozie/parse/OozieAuditLogParserBolt.java | 86 ++++++++ .../oozie/parse/OozieAuditLogProcessorMain.java | 33 --- .../OozieResourceSensitivityDataJoinBolt.java | 108 +++++++++ ...ozieResourceSensitivityDataJoinExecutor.java | 89 -------- 34 files changed, 1315 insertions(+), 833 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java new file mode 100644 index 0000000..d764ac1 --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java @@ -0,0 +1,118 @@ +/* + * + * * + * * Licensed to the Apache 
Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * + */ + +package org.apache.eagle.dataproc.impl.storm.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.Arrays; + +/** + * Since 6/8/16. + */ +public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider { + private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class); + + private String configPrefix = "dataSourceConfig"; + + public NewKafkaSourcedSpoutProvider(){} + + public NewKafkaSourcedSpoutProvider(String prefix){ + this.configPrefix = prefix; + } + + @Override + public BaseRichSpout getSpout(Config config){ + Config context = config; + if(this.configPrefix!=null) context = config.getConfig(configPrefix); + // Kafka topic + String topic = context.getString("topic"); + // Kafka consumer group id + String groupId = context.getString("consumerGroupId"); + // Kafka fetch size + int fetchSize = context.getInt("fetchSize"); + // Kafka broker zk connection + String zkConnString = context.getString("zkConnection"); + // transaction zkRoot + String zkRoot = context.getString("transactionZKRoot"); + + LOG.info(String.format("Use topic id: %s",topic)); + + String brokerZkPath = null; + if(context.hasPath("brokerZkPath")) { + brokerZkPath = context.getString("brokerZkPath"); + } + + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(",")); + // transaction zkPort + spoutConfig.zkPort = context.getInt("transactionZKPort"); + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS"); + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + // "startOffsetTime" is for test usage, prod should not use this + if (context.hasPath("startOffsetTime")) { + spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); + } + // "forceFromStart" is for test usage, prod should not use this + if (context.hasPath("forceFromStart")) { + spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); + } + + if (context.hasPath("schemeCls")) { + try { + Scheme s = 
(Scheme)Class.forName(context.getString("schemeCls")).newInstance(); + spoutConfig.scheme = new SchemeAsMultiScheme(s); + }catch(Exception ex){ + LOG.error("error instantiating scheme object"); + throw new IllegalStateException(ex); + } + }else{ + String err = "schemeCls must be present"; + LOG.error(err); + throw new IllegalStateException(err); + } + return new KafkaSpout(spoutConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-gc/pom.xml b/eagle-gc/pom.xml index 66400cb..cb31156 100644 --- a/eagle-gc/pom.xml +++ b/eagle-gc/pom.xml @@ -46,5 +46,10 @@ <artifactId>eagle-alert-process</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java new file mode 100644 index 0000000..86d8bc4 --- /dev/null +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.eagle.gc; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; +import org.apache.eagle.gc.executor.GCLogAnalyzerBolt; +import org.apache.eagle.gc.executor.GCMetricGeneratorBolt; +import storm.kafka.StringScheme; + +/** + * Since 8/12/16. 
+ */ +public class GCLogApplication extends StormApplication{ + public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + public final static String ANALYZER_TASK_NUM = "topology.numOfAnalyzerTasks"; + public final static String GENERATOR_TASK_NUM = "topology.numOfGeneratorTasks"; + public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; + + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + TopologyBuilder builder = new TopologyBuilder(); + NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider(); + IRichSpout spout = provider.getSpout(config); + + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfAnalyzerTasks = config.getInt(ANALYZER_TASK_NUM); + int numOfGeneratorTasks = config.getInt(GENERATOR_TASK_NUM); + int numOfSinkTasks = config.getInt(SINK_TASK_NUM); + + builder.setSpout("ingest", spout, numOfSpoutTasks); + + GCLogAnalyzerBolt bolt = new GCLogAnalyzerBolt(); + BoltDeclarer boltDeclarer = builder.setBolt("analyzerBolt", bolt, numOfAnalyzerTasks); + boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY)); + + GCMetricGeneratorBolt generatorBolt = new GCMetricGeneratorBolt(config); + BoltDeclarer joinBoltDeclarer = builder.setBolt("generatorBolt", generatorBolt, numOfGeneratorTasks); + joinBoltDeclarer.fieldsGrouping("analyzerBolt", new Fields("f1")); + + StormStreamSink sinkBolt = environment.getStreamSink("gc_log_stream",config); + BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks); + kafkaBoltDeclarer.fieldsGrouping("generatorBolt", new Fields("f1")); + return builder.createTopology(); + } + + public static void main(String[] args){ + Config config = ConfigFactory.load(); + GCLogApplication app = new GCLogApplication(); + app.run(config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java new file mode 100644 index 0000000..eae170a --- /dev/null +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java @@ -0,0 +1,34 @@ +/* + * + * * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * + */ + +package org.apache.eagle.gc; + +import org.apache.eagle.app.spi.AbstractApplicationProvider; + +/** + * Since 8/12/16. 
+ */ +public class GCLogApplicationProvider extends AbstractApplicationProvider<GCLogApplication> { + @Override + public GCLogApplication getApplication() { + return new GCLogApplication(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java deleted file mode 100644 index 278a5bc..0000000 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.gc; - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.gc.executor.GCLogAnalysorExecutor; -import org.apache.eagle.gc.executor.GCMetricGeneratorExecutor; - -public class GCLogProcessorMain { - - public static void main(String[] args) throws Exception{ - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); - Config config = env.getConfig(); - KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider(); - GCLogAnalysorExecutor logAnalysor = new GCLogAnalysorExecutor(); - env.fromSpout(provider.getSpout(config)).withOutputFields(1).nameAs("kafkaMsgConsumer") - .flatMap(logAnalysor) - .flatMap(new GCMetricGeneratorExecutor()) - .alertWithConsumer("NNGCLogStream", "NNGCAlert"); - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java deleted file mode 100644 index e74bc44..0000000 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.gc.executor; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.gc.model.GCPausedEvent; -import org.apache.eagle.gc.stream.GCStreamBuilder; -import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException; -import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.List; -import java.util.Map; - -public class GCLogAnalysorExecutor extends JavaStormStreamExecutor2<String, Map> { - - public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalysorExecutor.class); - - private Config config; - - private long previousLogTime; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - - } - - @Override - public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) { - String log = (String)input.get(0); - GCStreamBuilder builder = new GCStreamBuilder(); - try { - GCPausedEvent pauseEvent = builder.build(log); - // Because some gc log like concurrent mode failure may miss timestamp info, so we set the previous log's timestamp for it - if (pauseEvent.getTimestamp() == 0) { - pauseEvent.setTimestamp(previousLogTime); - } - previousLogTime = pauseEvent.getTimestamp(); - collector.collect(new Tuple2("GCLog", pauseEvent.toMap())); - } - catch (IgnoredLogFormatException ex1) { - //DO nothing - } - catch (UnrecognizedLogFormatException ex2) { - LOG.warn(ex2.getMessage()); - } - catch (Exception ex3) { - LOG.error("Got an exception when parsing log: ", ex3); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java new file mode 100644 index 0000000..59720a3 --- /dev/null +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.eagle.gc.executor; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor2; +import org.apache.eagle.gc.model.GCPausedEvent; +import org.apache.eagle.gc.stream.GCStreamBuilder; +import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException; +import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class GCLogAnalyzerBolt extends BaseRichBolt { + public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalyzerBolt.class); + private OutputCollector collector; + private long previousLogTime; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1", "f2")); + } + + + + @Override + public void execute(Tuple input) { + String log = input.getString(0); + GCStreamBuilder builder = new GCStreamBuilder(); + try { + GCPausedEvent pauseEvent = builder.build(log); + // Because some gc log like concurrent mode failure may miss timestamp info, so we set the previous log's timestamp for it + if (pauseEvent.getTimestamp() == 0) { + pauseEvent.setTimestamp(previousLogTime); + } + previousLogTime = pauseEvent.getTimestamp(); + collector.emit(Arrays.asList("GCLog", pauseEvent.toMap())); + } + catch (IgnoredLogFormatException ex1) { + //DO nothing + } + catch (UnrecognizedLogFormatException ex2) { + LOG.warn(ex2.getMessage()); + } + catch (Exception ex3) { + LOG.error("Got an exception when parsing log: ", ex3); + }finally { + collector.ack(input); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java new file mode 100644 index 0000000..2d1023b --- /dev/null +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.eagle.gc.executor; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.codahale.metrics.MetricRegistry; +import com.typesafe.config.Config; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.common.config.EagleConfigHelper; +import org.apache.eagle.datastream.*; +import org.apache.eagle.gc.common.GCConstants; +import org.apache.eagle.gc.model.GCPausedEvent; +import org.apache.eagle.metric.reportor.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.*; + +public class GCMetricGeneratorBolt extends BaseRichBolt { + + public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorBolt.class); + private Config config; + private MetricRegistry registry; + private String gcPausedTimeMetricName; + private String youngHeapUsageMetricName; + private String tenuredHeapUsageMetricName; + private String totalHeapUsageMetricName; + private Map<String, String> dimensions; + private List<EagleMetric> metrics = new ArrayList<>(); + + private EagleServiceReporterMetricListener listener; + private OutputCollector collector; + + public GCMetricGeneratorBolt(Config config){ + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + registry = new MetricRegistry(); + String host = EagleConfigHelper.getServiceHost(config); + int port = EagleConfigHelper.getServicePort(config); + String username = EagleConfigHelper.getServiceUser(config); + String password = EagleConfigHelper.getServicePassword(config); + listener = new EagleServiceReporterMetricListener(host, port, username, password); + dimensions = new HashMap<>(); + dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config)); + dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config)); + gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions); + + this.collector = collector; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1", "f2")); + } + public void registerMetricIfMissing(String metricName, EagleMetric metric) { + if (registry.getMetrics().get(metricName) == null) { + metric.registerListener(listener); + registry.register(metricName, metric); + } + } + @Override + public void execute(Tuple input) { + try { + Map<String, Object> map = (Map<String, Object>) input.getValue(1); + GCPausedEvent event = new GCPausedEvent(map); + // Generate gc paused time metric + EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY); + registerMetricIfMissing(gcPausedTimeMetricName, metric); + + // Generate young heap paused time metric + if (event.isYoungAreaGCed()) { + youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions); + EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK()); + metrics.add(metric2); + } + + // Generate tenured heap paused time metric + if (event.isTenuredAreaGCed()) { + 
tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions); + EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK()); + metrics.add(metric3); + } + + // Generate total heap paused time metric + if (event.isTotalHeapUsageAvailable()) { + totalHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions); + EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK()); + metrics.add(metric4); + } + listener.onMetricFlushed(metrics); + metrics.clear(); + collector.emit(Arrays.asList(input.getValue(0), input.getValue(1))); + }catch(Exception ex){ + LOG.error("error in gc metric generating", ex); + }finally { + collector.ack(input); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java deleted file mode 100644 index ebc5a6c..0000000 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.eagle.gc.executor; - -import com.codahale.metrics.MetricRegistry; -import com.typesafe.config.Config; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.common.config.EagleConfigHelper; -import org.apache.eagle.datastream.*; -import org.apache.eagle.gc.common.GCConstants; -import org.apache.eagle.gc.model.GCPausedEvent; -import org.apache.eagle.metric.reportor.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.*; - -public class GCMetricGeneratorExecutor extends JavaStormStreamExecutor2<String, Map> { - - public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorExecutor.class); - private Config config; - private MetricRegistry registry; - private String gcPausedTimeMetricName; - private String youngHeapUsageMetricName; - private String tenuredHeapUsageMetricName; - private String totalHeapUsageMetricName; - private Map<String, String> dimensions; - private List<EagleMetric> metrics = new ArrayList<>(); - - private EagleServiceReporterMetricListener listener; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - registry = new MetricRegistry(); - String host = EagleConfigHelper.getServiceHost(config); - int port = EagleConfigHelper.getServicePort(config); - String username = EagleConfigHelper.getServiceUser(config); - String password = EagleConfigHelper.getServicePassword(config); - listener = new EagleServiceReporterMetricListener(host, port, username, password); - dimensions = new HashMap<>(); - dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config)); - dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config)); - gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions); - } - - public void registerMetricIfMissing(String metricName, EagleMetric metric) { - if (registry.getMetrics().get(metricName) == null) { - metric.registerListener(listener); - registry.register(metricName, metric); - } - } - - @Override - public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) { - Map<String, Object> map = (Map<String, Object>) input.get(1); - GCPausedEvent event = new GCPausedEvent(map); - // Generate gc paused time metric - EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY); - registerMetricIfMissing(gcPausedTimeMetricName, metric); - - // Generate young heap paused time metric - if (event.isYoungAreaGCed()) { - youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions); - EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK()); - metrics.add(metric2); - } - - // Generate tenured heap paused time metric - if (event.isTenuredAreaGCed()) { - tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions); - EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK()); - metrics.add(metric3); - } - - // Generate total heap paused time metric - if (event.isTotalHeapUsageAvailable()) { - totalHeapUsageMetricName = 
MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions); - EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK()); - metrics.add(metric4); - } - listener.onMetricFlushed(metrics); - metrics.clear(); - collector.collect(new Tuple2(input.get(0), input.get(1))); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java b/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java deleted file mode 100644 index f1b3105..0000000 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.gc.spout; - -import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; -import org.apache.hadoop.hbase.util.Bytes; - -import java.util.Properties; - -public class GCLogDeserializer implements SpoutKafkaMessageDeserializer { - - private Properties props; - - public GCLogDeserializer(Properties props){ - this.props = props; - } - - @Override - public Object deserialize(byte[] arg0) { - return Bytes.toString(arg0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml new file mode 100644 index 0000000..213132d --- /dev/null +++ b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml @@ -0,0 +1,221 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ /* + ~ * + ~ * Licensed to the Apache Software Foundation (ASF) under one or more + ~ * contributor license agreements. See the NOTICE file distributed with + ~ * this work for additional information regarding copyright ownership. + ~ * The ASF licenses this file to You under the Apache License, Version 2.0 + ~ * (the "License"); you may not use this file except in compliance with + ~ * the License. 
You may obtain a copy of the License at + ~ * + ~ * http://www.apache.org/licenses/LICENSE-2.0 + ~ * + ~ * Unless required by applicable law or agreed to in writing, software + ~ * distributed under the License is distributed on an "AS IS" BASIS, + ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ * See the License for the specific language governing permissions and + ~ * limitations under the License. + ~ * + ~ */ + --> + +<application> + <type>GCLogApplication</type> + <name>GC Log Monitoring Application</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.gc.GCLogApplication</appClass> + <viewPath>/apps/example</viewPath> + <configuration> + <property> + <name>dataSourceConfig.topic</name> + <displayName>dataSourceConfig.topic</displayName> + <value>gc_log</value> + <description>data source topic</description> + </property> + <property> + <name>dataSourceConfig.zkConnection</name> + <displayName>dataSourceConfig.zkConnection</displayName> + <value>server.eagle.apache.org:2181</value> + <description>zk connection</description> + </property> + <property> + <name>dataSourceConfig.zkConnectionTimeoutMS</name> + <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName> + <value>15000</value> + <description>zk connection timeout in milliseconds</description> + </property> + <property> + <name>dataSourceConfig.fetchSize</name> + <displayName>dataSourceConfig.fetchSize</displayName> + <value>1048586</value> + <description>kafka fetch size</description> + </property> + <property> + <name>dataSourceConfig.transactionZKServers</name> + <displayName>dataSourceConfig.transactionZKServers</displayName> + <value>server.eagle.apache.org</value> + <description>zookeeper server for offset transaction</description> + </property> + <property> + <name>dataSourceConfig.transactionZKPort</name> + <displayName>dataSourceConfig.transactionZKPort</displayName> + <value>2181</value> + <description>zookeeper server port for offset transaction</description> + </property> + <property> + <name>dataSourceConfig.transactionZKRoot</name> + <displayName>dataSourceConfig.transactionZKRoot</displayName> + <value>/consumers</value> + <description>offset transaction root</description> + </property> + <property> + <name>dataSourceConfig.consumerGroupId</name> + <displayName>dataSourceConfig.consumerGroupId</displayName> + <value>eagle.hbaseaudit.consumer</value> + <description>kafka consumer group Id</description> + </property> + <property> + <name>dataSourceConfig.transactionStateUpdateMS</name> + <displayName>dataSourceConfig.transactionStateUpdateMS</displayName> + <value>2000</value> + <description>zk upate</description> + </property> + <property> + <name>dataSourceConfig.schemeCls</name> + <displayName>dataSourceConfig.schemeCls</displayName> + <value>storm.kafka.StringScheme</value> + <description>scheme class</description> + </property> + <property> + <name>topology.numOfSpoutTasks</name> + <displayName>topology.numOfSpoutTasks</displayName> + <value>2</value> + <description>number of spout tasks</description> + </property> + <property> + <name>topology.numOfAnalyzerTasks</name> + <displayName>topology.numOfAnalyzerTasks</displayName> + <value>2</value> + <description>number of analyzer tasks</description> + </property> + <property> + <name>topology.numOfGeneratorTasks</name> + <displayName>topology.numOfGeneratorTasks</displayName> + <value>2</value> + <description>number of generator tasks</description> + </property> + <property> + 
<name>topology.numOfSinkTasks</name> + <displayName>topology.numOfSinkTasks</displayName> + <value>2</value> + <description>number of sink tasks</description> + </property> + <property> + <name>eagleProps.eagleService.host</name> + <displayName>eagleProps.eagleService.host</displayName> + <value>localhost</value> + <description>eagle service host</description> + </property> + <property> + <name>eagleProps.eagleService.port</name> + <displayName>eagleProps.eagleService.port</displayName> + <value>8080</value> + <description>eagle service port</description> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <displayName>eagleProps.eagleService.username</displayName> + <value>admin</value> + <description>eagle service username</description> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <displayName>eagleProps.eagleService.password</displayName> + <value>secret</value> + <description>eagle service password</description> + </property> + <property> + <name>dataSinkConfig.topic</name> + <displayName>dataSinkConfig.topic</displayName> + <value>hbase_audit_log_parsed</value> + <description>topic for kafka data sink</description> + </property> + <property> + <name>dataSinkConfig.brokerList</name> + <displayName>dataSinkConfig.brokerList</displayName> + <value>sandbox.hortonworks.com:6667</value> + <description>kafka broker list</description> + </property> + <property> + <name>dataSinkConfig.serializerClass</name> + <displayName>dataSinkConfig.serializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message value</description> + </property> + <property> + <name>dataSinkConfig.keySerializerClass</name> + <displayName>dataSinkConfig.keySerializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message key</description> + </property> + </configuration> + <streams> + <stream> + <streamId>gc_log_stream</streamId> + <description>GC Log Stream</description> + <validate>true</validate> + <timeseries>true</timeseries> + <columns> + <column> + <name>action</name> + <type>string</type> + </column> + <column> + <name>host</name> + <type>string</type> + </column> + <column> + <name>status</name> + <type>string</type> + </column> + <column> + <name>timestamp</name> + <type>long</type> + </column> + </columns> + </stream> + </streams> + <docs> + <install> + # Step 1: Create source kafka topic named "${site}_example_source_topic" + + ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1 + + # Step 2: Set up data collector to flow data into kafka topic in + + ./bin/logstash -f log_collector.conf + + ## `log_collector.conf` sample as following: + + input { + + } + filter { + + } + output{ + + } + + # Step 3: start application + + # Step 4: monitor with featured portal or alert with policies + </install> + <uninstall> + # Step 1: stop and uninstall application + # Step 2: delete kafka topic named "${site}_example_source_topic" + # Step 3: stop logstash + </uninstall> + </docs> +</application> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider 
b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..fc9406f --- /dev/null +++ b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,37 @@ +# +# /* +# * +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +# */ +# + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.gc.GCLogApplicationProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/application.conf b/eagle-gc/src/main/resources/application.conf index ed14063..33fa733 100644 --- a/eagle-gc/src/main/resources/application.conf +++ b/eagle-gc/src/main/resources/application.conf @@ -13,48 +13,40 @@ # See the License for the specific language governing permissions and # limitations under the License. 
{ - "envContextConfig" : { - "env" : "storm", - "mode" : "local", - "topologyName" : "GCAnalysorTopology", - "stormConfigFile" : "gc-storm.yaml", - "parallelismConfig" : { - "kafkaMsgConsumer" : 1 - } + "appId" : "GCLogApp", + "mode" : "LOCAL", + "siteId" : "testsite", + "topology" : { + "numOfTotalWorkers" : 2, + "numOfSpoutTasks" : 2, + "numOfAnalyzerTasks" : 2, + "numOfGeneratorTasks" : 2, + "numOfSinkTasks" : 2 }, "dataSourceConfig": { - "site" : "sandbox", - "topic" : "sandbox-namenode-gc_log", - "consumerGroupId" : "gc.log.eagle.consumer", - "zkSessionTimeoutMs" : 15000, - "zkRetryTimes" : 3, - "zkRetryInterval" : 2000, + "topic" : "gc_log", + "zkConnection" : "server.eagle.apache.org:2181", "zkConnectionTimeoutMS" : 15000, "fetchSize" : 1048586, - "deserializerClass" : "org.apache.eagle.gc.spout.GCLogDeserializer", - "zkConnection" : "localhost:2181", - "transactionZKServers" : "localhost", + "transactionZKServers" : "server.eagle.apache.org", "transactionZKPort" : "2181", "transactionZKRoot" : "/consumers", + "consumerGroupId" : "gc.log.eagle.consumer", "transactionStateUpdateMS" : 2000, - "kafkaEndPoints" : "localhost:6667" + "schemeCls" : "storm.kafka.StringScheme" }, "eagleProps" : { - "site" : "sandbox", - "application": "NNGCLog", - "mailHost" : "www.xyz.com", - "mailSmtpPort":"25", - "mailDebug" : "true", "eagleService": { "host": "localhost", - "port": 38080, + "port": 9090, "username": "admin", "password": "secret" } }, - "dynamicConfigSource" : { - "enabled" : true, - "initDelayMillis" : 0, - "delayMillis" : 30000 + "dataSinkConfig": { + "topic" : "gc_log_parsed", + "brokerList" : "server.eagle.apache.org:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/gc-storm.yaml ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/gc-storm.yaml b/eagle-gc/src/main/resources/gc-storm.yaml deleted file mode 100644 index a68a323..0000000 --- a/eagle-gc/src/main/resources/gc-storm.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -topology.workers: 1 -topology.acker.executors: 1 -topology.tasks: 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/resources/log4j.properties b/eagle-gc/src/main/resources/log4j.properties index 8a0919a..f5fb8a8 100644 --- a/eagle-gc/src/main/resources/log4j.properties +++ b/eagle-gc/src/main/resources/log4j.properties @@ -19,9 +19,6 @@ eagle.log.dir=./logs eagle.log.file=eagle.log -#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG -#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG -#log4j.logger.eagle.executor.AlertExecutor=DEBUG # standard output log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml index ce13ee9..389a481 100644 --- a/eagle-hadoop-metric/pom.xml +++ b/eagle-hadoop-metric/pom.xml @@ -35,6 +35,11 @@ <artifactId>eagle-stream-application-manager</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java new file mode 100644 index 0000000..20ef5d0 --- /dev/null +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.metric; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; + +/** + * Since 8/12/16. 
+ */ +public class HadoopJmxApplication extends StormApplication { + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + TopologyBuilder builder = new TopologyBuilder(); + return builder.createTopology(); + } + + public static void main(String[] args){ + Config config = ConfigFactory.load(); + HadoopJmxApplication app = new HadoopJmxApplication(); + app.run(config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java index 9202da4..c0b183b 100644 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java @@ -17,7 +17,6 @@ package org.apache.eagle.hadoop.metric; import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StreamProducer; import org.apache.eagle.datastream.storm.StormExecutionEnvironment; /** @@ -26,10 +25,10 @@ import org.apache.eagle.datastream.storm.StormExecutionEnvironment; public class HadoopJmxMetricMonitor { public static void main(String[] args) { - StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class); - String streamName = "hadoopJmxMetricEventStream"; - StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName); - sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor"); - env.execute(); +// StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class); +// String streamName = "hadoopJmxMetricEventStream"; +// StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName); +// sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor"); +// env.execute(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java deleted file mode 100644 index 044a48f..0000000 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.hadoop.metric; - - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.stream.application.TopologyExecutable; - -public class HadoopJmxMetricMonitoringTopology implements TopologyExecutable { - @Override - public void submit(String topology, Config config) { - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config); - String streamName = "hadoopJmxMetricEventStream"; - StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName); - sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor"); - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java new file mode 100644 index 0000000..c738b90 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.generated.StormTopology; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Since 8/12/16. + */ +public class EagleMetricCollectorApplication extends StormApplication{ + private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorApplication.class); + + public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + public final static String DISTRIBUTION_TASK_NUM = "topology.numOfDistributionTasks"; + + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + String deserClsName = config.getString("dataSourceConfig.deserializerClass"); + final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { + @Override + public List<Object> deserialize(byte[] ser) { + Object tmp = deserializer.deserialize(ser); + Map<String, Object> map = (Map<String, Object>)tmp; + if(tmp == null) return null; + return Arrays.asList(map.get("user"), map.get("timestamp")); + } + }; + + // TODO: Refactor the anonymous class into an independent class file, avoiding overly complex logic in the execute method + KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() { + @Override + public BaseRichSpout getSpout(Config context) { + // Kafka topic + String topic = context.getString("dataSourceConfig.topic"); + // Kafka consumer group id + String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId"); + // Kafka fetch size + int fetchSize = context.getInt("dataSourceConfig.fetchSize"); + // Kafka deserializer class + String deserClsName = context.getString("dataSourceConfig.deserializerClass"); + + // Kafka broker zk connection + String zkConnString = context.getString("dataSourceConfig.zkQuorum"); + + // transaction zkRoot + String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); + + LOG.info(String.format("Use topic id: %s",topic)); + + String brokerZkPath = null; + if(context.hasPath("dataSourceConfig.brokerZkPath")) { + brokerZkPath = context.getString("dataSourceConfig.brokerZkPath"); + } + + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + String[] zkConnections = zkConnString.split(","); + List<String> zkHosts = new ArrayList<>(); + for (String zkConnection : zkConnections) { + zkHosts.add(zkConnection.split(":")[0]); + } + Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); + + spoutConfig.zkServers = zkHosts; + // transaction zkPort + spoutConfig.zkPort 
= zkPort; + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS"); + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + + spoutConfig.scheme = new SchemeAsMultiScheme(scheme); + KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); + return kafkaSpout; + } + }; + + TopologyBuilder builder = new TopologyBuilder(); + BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config); + BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config); + + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM); + + builder.setSpout("kafkaLogLagChecker", spout1, numOfSpoutTasks); + builder.setSpout("kafkaMessageFetcher", spout2, numOfSpoutTasks); + + KafkaMessageDistributionBolt bolt = new KafkaMessageDistributionBolt(config); + BoltDeclarer boltDeclarer = builder.setBolt("distributionBolt", bolt, numOfDistributionTasks); + boltDeclarer.fieldsGrouping("kafkaLogLagChecker", new Fields("f1")); + boltDeclarer.fieldsGrouping("kafkaMessageFetcher", new Fields("f1")); + return builder.createTopology(); + } + + + public static void main(String[] args){ + Config config = ConfigFactory.load(); + EagleMetricCollectorApplication app = new EagleMetricCollectorApplication(); + app.run(config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java deleted file mode 100644 index 4eeda05..0000000 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
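Note on the TODO inside EagleMetricCollectorApplication.execute(): the ZooKeeper connection-string handling is the piece of the anonymous KafkaSourcedSpoutProvider that is easiest to lift out and unit test. A small sketch of such a helper, assuming dataSourceConfig.zkQuorum looks like "host1:2181,host2:2181" and taking the port from the first entry, exactly as the inline code does; the class and method names are illustrative, not part of this patch.

import java.util.ArrayList;
import java.util.List;

final class ZkConnectionParser {
    private ZkConnectionParser() {}

    // "host1:2181,host2:2181" -> [host1, host2]
    static List<String> hosts(String zkConnString) {
        List<String> hosts = new ArrayList<>();
        for (String connection : zkConnString.split(",")) {
            hosts.add(connection.split(":")[0]);
        }
        return hosts;
    }

    // Port of the first entry, mirroring the inline parsing above.
    static int port(String zkConnString) {
        return Integer.parseInt(zkConnString.split(",")[0].split(":")[1]);
    }
}

With this in place, the provider body would reduce to spoutConfig.zkServers = ZkConnectionParser.hosts(zkConnString); and spoutConfig.zkPort = ZkConnectionParser.port(zkConnString);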
- */ -package org.apache.eagle.metric.kafka; - -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.topology.base.BaseRichSpout; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpout; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class EagleMetricCollectorMain { - - private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class); - - public static void main(String[] args) throws Exception { - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); - Config config = env.getConfig(); - String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>)tmp; - if(tmp == null) return null; - return Arrays.asList(map.get("user"), map.get("timestamp")); - } - }; - - - // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method - KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() { - @Override - public BaseRichSpout getSpout(Config context) { - // Kafka topic - String topic = context.getString("dataSourceConfig.topic"); - // Kafka consumer group id - String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId"); - // Kafka fetch size - int fetchSize = context.getInt("dataSourceConfig.fetchSize"); - // Kafka deserializer class - String deserClsName = context.getString("dataSourceConfig.deserializerClass"); - - // Kafka broker zk connection - String zkConnString = context.getString("dataSourceConfig.zkQuorum"); - - // transaction zkRoot - String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); - - LOG.info(String.format("Use topic id: %s",topic)); - - String brokerZkPath = null; - if(context.hasPath("dataSourceConfig.brokerZkPath")) { - brokerZkPath = context.getString("dataSourceConfig.brokerZkPath"); - } - - BrokerHosts hosts; - if(brokerZkPath == null) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers - String[] zkConnections = zkConnString.split(","); - List<String> zkHosts = new ArrayList<>(); - for (String zkConnection : zkConnections) { - zkHosts.add(zkConnection.split(":")[0]); - } - Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); - - spoutConfig.zkServers = zkHosts; - // transaction zkPort - spoutConfig.zkPort = zkPort; - // transaction update interval - spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - - spoutConfig.scheme = new SchemeAsMultiScheme(scheme); - KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); - return kafkaSpout; - } - }; - - 
env.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker"); - env.fromSpout(kafkaMessageSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0)) - .flatMap(new KafkaMessageDistributionExecutor()); - env.execute(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java new file mode 100644 index 0000000..0caa64c --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
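Note on the deserializer: both EagleMetricCollectorApplication and the deleted EagleMetricCollectorMain install the same anonymous KafkaSourcedSpoutScheme, and in both the null check sits after the cast. That is legal in Java (casting null does not throw), but it reads oddly. A sketch of the same override with the check moved first, behavior unchanged; this is an excerpt of the anonymous class, so deserializer refers to the scheme's own configured deserializer field.

    @Override
    public List<Object> deserialize(byte[] ser) {
        Object tmp = deserializer.deserialize(ser);
        if (tmp == null) {
            return null;
        }
        Map<String, Object> map = (Map<String, Object>) tmp;
        return Arrays.asList(map.get("user"), map.get("timestamp"));
    }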
+ * + */ + +package org.apache.eagle.metric.kafka; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import com.codahale.metrics.MetricRegistry; +import com.typesafe.config.Config; +import org.apache.commons.lang.time.DateUtils; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor1; +import org.apache.eagle.metric.reportor.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple1; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class KafkaMessageDistributionBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionBolt.class); + private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000; + private Config config; + private Map<String, String> baseMetricDimension; + private MetricRegistry registry; + private EagleMetricListener listener; + private long granularity; + private OutputCollector collector; + + public KafkaMessageDistributionBolt(Config config){ + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + String site = config.getString("dataSourceConfig.site"); + String topic = config.getString("dataSourceConfig.topic"); + this.baseMetricDimension = new HashMap<>(); + this.baseMetricDimension.put("site", site); + this.baseMetricDimension.put("topic", topic); + registry = new MetricRegistry(); + + this.granularity = DEFAULT_METRIC_GRANULARITY; + if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) { + this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE; + } + + String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); + int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); + String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); + String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." 
+ EagleConfigConstants.PASSWORD); + listener = new EagleServiceReporterMetricListener(host, port, username, password); + } + + public String generateMetricKey(String user) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.putAll(baseMetricDimension); + dimensions.put("user", user); + String metricName = "eagle.kafka.message.count"; + String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions); + return encodedMetricName; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void execute(Tuple input) { + try { + String user = input.getString(0); + Long timestamp = input.getLong(1); + String metricKey = generateMetricKey(user); + if (registry.getMetrics().get(metricKey) == null) { + EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity); + metric.registerListener(listener); + registry.register(metricKey, metric); + } + else { + EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey); + metric.update(1, timestamp); + //TODO: if we need to remove metric from registry + } + } + catch (Exception ex) { + LOG.error("Got an exception, ex: ", ex); + }finally { + collector.ack(input); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java deleted file mode 100644 index dbb6e9a..0000000 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
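Note on KafkaMessageDistributionBolt.execute(): it follows a register-on-first-sight pattern, where the first tuple for a metric key registers an EagleCounterMetric and later tuples update it. The same pattern is shown below with a plain Dropwizard Counter standing in for the Eagle metric and listener types, purely for illustration; MetricRegistry is the same com.codahale.metrics class the bolt already imports.

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public class PerKeyCounterSketch {
    private final MetricRegistry registry = new MetricRegistry();

    // registry.counter(name) creates the counter on first access and returns
    // the existing instance afterwards, which is the behavior the bolt
    // reproduces manually with its "if absent then register, else update" branch.
    void record(String metricKey) {
        Counter counter = registry.counter(metricKey);
        counter.inc();
    }
}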
- * - */ - -package org.apache.eagle.metric.kafka; - -import com.codahale.metrics.MetricRegistry; -import com.typesafe.config.Config; -import org.apache.commons.lang.time.DateUtils; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor1; -import org.apache.eagle.metric.reportor.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple1; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> { - - private Config config; - private Map<String, String> baseMetricDimension; - private MetricRegistry registry; - private EagleMetricListener listener; - private long granularity; - private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000; - private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class); - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - String site = config.getString("dataSourceConfig.site"); - String topic = config.getString("dataSourceConfig.topic"); - this.baseMetricDimension = new HashMap<>(); - this.baseMetricDimension.put("site", site); - this.baseMetricDimension.put("topic", topic); - registry = new MetricRegistry(); - - this.granularity = DEFAULT_METRIC_GRANULARITY; - if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) { - this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE; - } - - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); - String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD); - listener = new EagleServiceReporterMetricListener(host, port, username, password); - } - - public String generateMetricKey(String user) { - Map<String, String> dimensions = new HashMap<>(); - dimensions.putAll(baseMetricDimension); - dimensions.put("user", user); - String metricName = "eagle.kafka.message.count"; - String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions); - return encodedMetricName; - } - - @Override - public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) { - try { - String user = (String) input.get(0); - Long timestamp = (Long) input.get(1); - String metricKey = generateMetricKey(user); - if (registry.getMetrics().get(metricKey) == null) { - EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity); - metric.registerListener(listener); - registry.register(metricKey, metric); - } - else { - EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey); - metric.update(1, timestamp); - //TODO: if we need to remove metric from registry - } - } - catch (Exception ex) { - LOG.error("Got an exception, ex: ", ex); - } - } -}
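Taken together, KafkaMessageDistributionExecutor (deleted above) and KafkaMessageDistributionBolt (added earlier) show the conversion recipe this patch applies throughout: prepareConfig()/init() collapse into a constructor plus prepare(), flatMap(List, Collector) becomes execute(Tuple) with explicit acking, and terminal components declare no output fields. A generic skeleton of that conversion follows; the class and method names are placeholders rather than anything from the patch.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.typesafe.config.Config;

import java.util.Map;

public abstract class ConvertedExecutorBolt extends BaseRichBolt {
    protected final Config config;          // was set via prepareConfig(Config)
    protected OutputCollector collector;

    protected ConvertedExecutorBolt(Config config) {
        this.config = config;               // config now arrives through the constructor
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;         // replaces the old init() hook
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolts in this patch emit nothing downstream
    }

    @Override
    public void execute(Tuple input) {      // replaces flatMap(List<Object>, Collector)
        try {
            process(input);
        } finally {
            collector.ack(input);           // acking is now the bolt's responsibility
        }
    }

    protected abstract void process(Tuple input);
}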
