Repository: eagle Updated Branches: refs/heads/master ab5bf1076 -> 0071c79ab
[EAGLE-940] HDFS traffic monitor based on namenode JMX data

https://issues.apache.org/jira/browse/EAGLE-940

Author: Zhao, Qingwen <[email protected]>

Closes #862 from qingwen220/EAGLE-940.

Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/0071c79a
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/0071c79a
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/0071c79a

Branch: refs/heads/master
Commit: 0071c79abfd9d6535fe726721c0a17286b29af43
Parents: ab5bf10
Author: Zhao, Qingwen <[email protected]>
Authored: Wed Mar 15 18:27:17 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Wed Mar 15 18:27:17 2017 +0800

----------------------------------------------------------------------
 .../environment/builder/MetricDescriptor.java   |  30 ++--
 .../app/messaging/KafkaStreamProvider.java      |  21 ++-
 .../hadoop_jmx_collector/hadoop_jmx_config.json |  62 ++++++++
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  15 +-
 .../hadoop_jmx_collector/metric_collector.py    | 101 ++++++------
 .../util/resourcefetch/RMResourceFetcher.java   |   4 +-
 .../auditlog/HdfsAuditLogApplication.java       |  62 ++++++++
 .../security/auditlog/TopWindowResult.java      | 126 +++++++++++++++
 .../security/auditlog/TrafficParserBolt.java    | 157 +++++++++++++++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |  97 +++++++++---
 .../security/traffic/TopWindowResultTest.java   |  61 +++++++
 11 files changed, 646 insertions(+), 90 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
index e79e4d7..c33a92d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
@@ -29,6 +29,18 @@ public class MetricDescriptor implements Serializable {
      * Support simple and complex name format, by default using "metric" field.
      */
     private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
+    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
+    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
+
+    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
+
+    public MetricNameSelector getMetricNameSelector() {
+        return metricNameSelector;
+    }
+
+    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
+        this.metricNameSelector = metricNameSelector;
+    }
 
     public MetricGroupSelector getMetricGroupSelector() {
         return metricGroupSelector;
@@ -38,12 +50,6 @@ public class MetricDescriptor implements Serializable {
         this.metricGroupSelector = metricGroupSelector;
     }
 
-
-    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
-
-    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
-    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
-
     /**
      * Support event/system time, by default using system time.
      */
@@ -59,17 +65,15 @@ public class MetricDescriptor implements Serializable {
      */
     private int granularity = Calendar.MINUTE;
 
-    /**
-     * Metric value field name.
-     */
     private String valueField = "value";
+    private String resourceField = "resource";
 
-    public MetricNameSelector getMetricNameSelector() {
-        return metricNameSelector;
+    public String getResourceField() {
+        return resourceField;
     }
 
-    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
-        this.metricNameSelector = metricNameSelector;
+    public void setResourceField(String resourceField) {
+        this.resourceField = resourceField;
     }
 
     public String getValueField() {
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
index eaa9ea0..8038d42 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
@@ -27,6 +27,7 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
     private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class);
    private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic";
     private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic";
+    private static final String DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY = "dataSourceConfig.schemeCls";
 
     private String getSinkTopicName(String streamId, Config config) {
         String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId);
@@ -43,7 +44,7 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
     }
 
     private String getSourceTopicName(String streamId, Config config) {
-        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);;
+        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);
         if (config.hasPath(streamSpecificTopicConfigKey)) {
             return config.getString(streamSpecificTopicConfigKey);
         } else if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) {
@@ -55,6 +56,17 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         }
     }
 
+    private String getSourceSchemeCls(String streamId, Config config) {
+        String streamSpecificSchemeClsKey = String.format("dataSourceConfig.%s.schemeCls", streamId);
+        if (config.hasPath(streamSpecificSchemeClsKey)) {
+            return config.getString(streamSpecificSchemeClsKey);
+        } else if (config.hasPath(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY)) {
+            LOG.warn("Using default shared source scheme class {}: {}", DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY, config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY));
+            return config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY);
+        }
+        return null;
+    }
+
     @Override
     public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
         KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
@@ -118,11 +130,12 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         if (hasNonBlankConfigPath(config, "dataSourceConfig.forceFromStart")) {
             sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
         }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.schemeCls")) {
+        String schemeCls = getSourceSchemeCls(streamId, config);
+        if (StringUtils.isNotBlank(schemeCls)) {
             try {
-                sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(config.getString("dataSourceConfig.schemeCls")));
+                sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(schemeCls));
             } catch (ClassNotFoundException e) {
-                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}",config.getString("dataSourceConfig.schemeCls"),e);
+                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}", schemeCls, e);
             }
         }
         return sourceConfig;
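
getSourceSchemeCls() resolves the spout scheme per stream: the stream-specific key dataSourceConfig.<streamId>.schemeCls wins over the shared dataSourceConfig.schemeCls, and null means "no scheme configured". A minimal Python sketch of the same precedence rule (hypothetical helper, for illustration only):

    def get_source_scheme_cls(stream_id, config):
        # stream-specific key first, then the shared key, else None
        for key in ("dataSourceConfig.%s.schemeCls" % stream_id,
                    "dataSourceConfig.schemeCls"):
            if key in config:
                return config[key]
        return None

    conf = {"dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.schemeCls":
                "org.apache.eagle.app.messaging.JsonSchema",
            "dataSourceConfig.schemeCls": "storm.kafka.StringScheme"}
    assert get_source_scheme_cls("HADOOP_JMX_RESOURCE_STREAM", conf) \
        == "org.apache.eagle.app.messaging.JsonSchema"
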
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
new file mode 100755
index 0000000..23c89b3
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
@@ -0,0 +1,62 @@
+{
+  "env": {
+    "site": "sandbox",
+    "metric_prefix": "hadoop.",
+    "log_file": "/tmp/hadoop-jmx-collector.log"
+  },
+  "input": [
+    {
+      "component": "namenode",
+      "host": "sandbox.hortonworks.com",
+      "port": "50070",
+      "https": true
+    }
+  ],
+  "filter": {
+    "bean_group_filter": ["hadoop", "java.lang"],
+    "metric_name_filter": [
+      "hadoop.memory.heapmemoryusage.used",
+      "hadoop.memory.nonheapmemoryusage.used",
+      "hadoop.namenode.fsnamesystemstate.capacitytotal",
+      "hadoop.namenode.fsnamesystemstate.topuseropcounts",
+      "hadoop.namenode.namenodeinfo.corruptfiles",
+      "hadoop.namenode.dfs.capacityused",
+      "hadoop.namenode.dfs.capacityremaining",
+      "hadoop.namenode.dfs.blockstotal",
+      "hadoop.namenode.dfs.filestotal",
+      "hadoop.namenode.dfs.underreplicatedblocks",
+      "hadoop.namenode.dfs.missingblocks",
+      "hadoop.namenode.dfs.corruptblocks",
+      "hadoop.namenode.dfs.lastcheckpointtime",
+      "hadoop.namenode.dfs.transactionssincelastcheckpoint",
+      "hadoop.namenode.dfs.lastwrittentransactionid",
+      "hadoop.namenode.dfs.snapshottabledirectories",
+      "hadoop.namenode.dfs.snapshots",
+      "hadoop.namenode.rpc.rpcqueuetimeavgtime",
+      "hadoop.namenode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.namenode.rpc.numopenconnections",
+      "hadoop.namenode.rpc.callqueuelength",
+
+      "hadoop.datanode.fsdatasetstate.capacity",
+      "hadoop.datanode.fsdatasetstate.dfsused",
+      "hadoop.datanode.datanodeinfo.xceivercount",
+      "hadoop.datanode.rpc.rpcqueuetimeavgtime",
+      "hadoop.datanode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.datanode.rpc.numopenconnections",
+      "hadoop.datanode.rpc.callqueuelength"
+    ]
+  },
+  "output": {
+    "kafka": {
+      "debug": false,
+      "default_topic": "hadoop_jmx_metric_sandbox",
+      "metric_topic_mapping": {
+        "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox",
+        "hadoop.namenode.fsnamesystemstate.topuseropcounts": "hadoop_jmx_resource_sandbox"
+      },
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
+      ]
+    }
+  }
+}
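
With this configuration the two string-valued namenode beans (corruptfiles and topuseropcounts) are routed to hadoop_jmx_resource_sandbox, while every other metric falls through to default_topic. A sketch of the routing rule, mirroring get_topic_id() in metric_collector.py below:

    def get_topic_id(msg, metric_topic_mapping, default_topic):
        # a metric-level mapping wins over the default topic
        metric = msg.get("metric")
        if metric in metric_topic_mapping:
            return metric_topic_mapping[metric]
        return default_topic

    mapping = {"hadoop.namenode.namenodeinfo.corruptfiles":
                   "hadoop_jmx_resource_sandbox"}
    msg = {"metric": "hadoop.namenode.namenodeinfo.corruptfiles", "value": "[]"}
    assert get_topic_id(msg, mapping, "hadoop_jmx_metric_sandbox") \
        == "hadoop_jmx_resource_sandbox"
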
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 1b036cd..60c6367 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
+from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricNameConverter
 import json, logging, fnmatch, sys
 
 class NNSafeModeMetric(JmxMetricListener):
@@ -38,10 +38,16 @@ class NNHAMetric(JmxMetricListener):
         else:
             self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1)
 
-class NameNodeInfo(JmxMetricListener):
+class CorruptFilesMetric(JmxMetricListener):
     def on_metric(self, metric):
         if metric["metric"] == "hadoop.namenode.namenodeinfo.corruptfiles":
-            self.collector.collect(metric, "string")
+            self.collector.collect(metric, "string", MetricNameConverter())
+
+class TopUserOpCountsMetric(JmxMetricListener):
+    def on_metric(self, metric):
+        if metric["metric"] == "hadoop.namenode.fsnamesystemstate.topuseropcounts":
+            self.collector.collect(metric, "string", MetricNameConverter())
+
 
 class MemoryUsageMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.jvm"
@@ -107,6 +113,7 @@ if __name__ == '__main__':
         JournalTransactionInfoMetric(),
         DatanodeFSDatasetState(),
         HBaseRegionServerMetric(),
-        NameNodeInfo()
+        CorruptFilesMetric(),
+        TopUserOpCountsMetric()
     )
     Runner.run(collector)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c3fdb43..c83fe6b 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -66,8 +66,8 @@ class Helper:
                                 datefmt='%m-%d %H:%M')
         else:
             logging.basicConfig(level=logging.INFO,
-                            format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
-                            datefmt='%m-%d %H:%M')
+                                format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+                                datefmt='%m-%d %H:%M')
         logging.info("Loaded config from %s", abs_file_path)
         return config
 
@@ -192,7 +192,7 @@ class MetricSender(object):
     def open(self):
         pass
 
-    def send(self, msg):
+    def send(self, msg, converter=None):
         raise Exception("should be overridden")
 
     def close(self):
@@ -210,11 +210,14 @@ class KafkaMetricSender(MetricSender):
         self.default_topic = None
         if kafka_config.has_key("default_topic"):
             self.default_topic = kafka_config["default_topic"].encode('utf-8')
-            logging.info("Using default topic: %s" % self.default_topic)
 
         self.component_topic_mapping = {}
         if kafka_config.has_key("component_topic_mapping"):
             self.component_topic_mapping = kafka_config["component_topic_mapping"]
 
+        self.metric_topic_mapping = {}
+        if kafka_config.has_key("metric_topic_mapping"):
+            self.metric_topic_mapping = kafka_config["metric_topic_mapping"]
+
         if not self.default_topic and not bool(self.component_topic_mapping):
             raise Exception("both kafka config 'topic' and 'component_topic_mapping' are empty")
 
@@ -229,6 +232,11 @@ class KafkaMetricSender(MetricSender):
             logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled))
 
     def get_topic_id(self, msg):
+        if msg.has_key("metric"):
+            metric = msg["metric"]
+            if self.metric_topic_mapping.has_key(metric):
+                return self.metric_topic_mapping[metric]
+
         if msg.has_key("component"):
             component = msg["component"]
             if self.component_topic_mapping.has_key(component):
@@ -247,11 +255,14 @@ class KafkaMetricSender(MetricSender):
                                             batch_send_every_t=30)
         self.start_time = time.time()
 
-    def send(self, msg):
+    def send(self, msg, converter=None):
         if self.debug_enabled:
             logging.info("Send message: " + str(msg))
         self.sent_count += 1
-        self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
+        topic = self.get_topic_id(msg)
+        if converter is not None:
+            converter.convert_metric(msg)
+        self.kafka_producer.send_messages(topic, json.dumps(msg))
 
     def close(self):
         logging.info("Closing kafka connection and producer")
@@ -267,18 +278,12 @@ class MetricCollector(threading.Thread):
     filters = []
     config = None
     closed = False
-    collected_event_count = 0
-    ignored_event_count = 0
-    emit_event_count = 0
 
     def __init__(self, config=None):
         threading.Thread.__init__(self)
         self.config = None
         self.sender = None
         self.fqdn = socket.getfqdn()
-        self.ignored_event_count = 0
-        self.collected_event_count = 0
-        self.emit_event_count = 0
 
     def init(self, config):
         self.config = config
@@ -301,39 +306,29 @@ class MetricCollector(threading.Thread):
     def start(self):
         super(MetricCollector, self).start()
 
-    def collect(self, msg, type='float'):
-        try:
-            self.collected_event_count = self.collected_event_count + 1
-            if not msg.has_key("timestamp"):
-                msg["timestamp"] = int(round(time.time() * 1000))
-            if msg.has_key("value") and type == 'float':
-                msg["value"] = float(str(msg["value"]))
-            elif msg.has_key("value") and type == 'string':
-                msg["value"] = str(msg["value"])
-            if not msg.has_key("host") or len(msg["host"]) == 0:
-                raise Exception("host is null: " + str(msg))
-
-            if not msg.has_key("site"):
-                msg["site"] = self.config["env"]["site"]
-
-            if len(self.filters) == 0:
-                self.emit_event_count = self.emit_event_count + 1
-                self.sender.send(msg)
-                return
-            else:
-                for filter in self.filters:
-                    if filter.filter_metric(msg):
-                        self.emit_event_count = self.emit_event_count + 1
-                        self.sender.send(msg)
-                        return
-            self.ignored_event_count = self.ignored_event_count + 1
-        except Exception as e:
-            logging.error("Failed to emit metric: %s" % msg)
-            logging.exception(e)
+    def collect(self, msg, type='float', converter=None):
+        if not msg.has_key("timestamp"):
+            msg["timestamp"] = int(round(time.time() * 1000))
+        if msg.has_key("value") and type == 'float':
+            msg["value"] = float(str(msg["value"]))
+        elif msg.has_key("value") and type == 'string':
+            msg["value"] = str(msg["value"])
+        if not msg.has_key("host") or len(msg["host"]) == 0:
+            raise Exception("host is null: " + str(msg))
+        if not msg.has_key("site"):
+            msg["site"] = self.config["env"]["site"]
+        if len(self.filters) == 0:
+            self.sender.send(msg, converter)
+            return
+        else:
+            for filter in self.filters:
+                if filter.filter_metric(msg):
+                    self.sender.send(msg, converter)
+                    return
+        if self.sender.debug_enabled:
+            logging.info("Drop metric: " + str(msg))
 
     def close(self):
-        logging.info("Collected %s events (emitted: %s, ignored: %s)"
-                     % (self.collected_event_count, self.emit_event_count, self.ignored_event_count))
         self.sender.close()
         self.closed = True
 
@@ -451,7 +446,7 @@ class JmxMetricCollector(MetricCollector):
     def jmx_reader(self, source):
         host = source["host"]
         if source.has_key("source_host"):
-           host=source["source_host"]
+            host = source["source_host"]
         port = source["port"]
         https = source["https"]
         protocol = "https" if https else "http"
@@ -582,3 +577,21 @@ class MetricNameFilter(MetricFilter):
                 if fnmatch.fnmatch(metric["metric"], name_filter):
                     return True
         return False
+
+
+# ========================
+# Metric Converter
+# ========================
+
+class MetricConverter:
+    def convert_metric(self, metric):
+        """
+        Convert metric
+        """
+        return True
+
+class MetricNameConverter(MetricConverter):
+    def convert_metric(self, metric):
+        metric["resource"] = metric["metric"]
+        del metric["metric"]
+        return True
\ No newline at end of file
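
Applied right before serialization in KafkaMetricSender.send(), the converter rewrites the message key so string-valued beans are published under "resource" rather than "metric". A quick sketch using the classes above:

    converter = MetricNameConverter()
    msg = {"metric": "hadoop.namenode.namenodeinfo.corruptfiles",
           "value": "[]", "host": "sandbox.hortonworks.com"}
    converter.convert_metric(msg)
    # msg is now {"resource": "hadoop.namenode.namenodeinfo.corruptfiles",
    #             "value": "[]", "host": "sandbox.hortonworks.com"}
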
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index e1991b0..2e967bc 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -129,8 +129,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
         Map<String, AppInfo> result = new HashMap();
         List<AppInfo> apps = new ArrayList<>();
         try {
-            selector.checkUrl();
-
             String limit = "";
             int requests = 1;
             int timeRangePerRequestInMin = 60;
@@ -191,7 +189,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
                                                  Object... parameter) throws Exception {
         List<AppInfo> apps = new ArrayList<>();
         try {
-            selector.checkUrl();
             String url = getAcceptedAppURL();
             return doFetchApplicationsList(url, compressionType);
         } catch (Exception e) {
@@ -201,6 +198,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 
     private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
+        selector.checkUrl();
         switch (resourceType) {
             case COMPLETE_SPARK_JOB:
                 final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) parameter[0]);
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
index 5f300f3..6f33517 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -21,9 +21,17 @@
 
 package org.apache.eagle.security.auditlog;
 
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
+import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;
 
 /**
  * Since 8/11/16.
@@ -35,6 +43,60 @@ public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
     }
 
     @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        KafkaSpoutProvider provider = new KafkaSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
+        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+
+        BaseRichBolt parserBolt = getParserBolt(config);
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks);
+        boltDeclarer.shuffleGrouping("ingest");
+
+        HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
+        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
+        // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+        sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");
+
+        // ------------------------------
+        // sensitivityJoin -> ipZoneJoin
+        // ------------------------------
+        IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
+        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
+        // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+        ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");
+
+        // ------------------------
+        // ipZoneJoin -> kafkaSink
+        // ------------------------
+        StormStreamSink sinkBolt = environment.getStreamSink("HDFS_AUDIT_LOG_ENRICHED_STREAM", config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
+        kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");
+
+        if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
+            builder.setSpout("trafficSpout", environment.getStreamSource("HADOOP_JMX_RESOURCE_STREAM", config), 1)
+                    .setNumTasks(1);
+
+            builder.setBolt("trafficParserBolt", new TrafficParserBolt(config), 1)
+                    .setNumTasks(1)
+                    .shuffleGrouping("trafficSpout");
+            builder.setBolt("trafficSinkBolt", environment.getStreamSink("HDFS_AUDIT_LOG_TRAFFIC_STREAM", config), 1)
+                    .setNumTasks(1)
+                    .shuffleGrouping("trafficParserBolt");
+        }
+
+        return builder.createTopology();
+    }
+
+    @Override
     public String getSinkStreamName() {
         return "hdfs_audit_log_stream";
     }
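
The traffic branch is wired in only when the new flag is set. Assuming TRAFFIC_MONITOR_ENABLED resolves to the dataSinkConfig.trafficMonitorEnabled property declared in the provider XML below, a sketch of the application config fragment that activates it (topic names are illustrative):

    dataSinkConfig.trafficMonitorEnabled = true
    dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.topic = "hadoop_jmx_resource_sandbox"
    dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.schemeCls = "org.apache.eagle.app.messaging.JsonSchema"
    dataSinkConfig.HDFS_AUDIT_LOG_TRAFFIC_STREAM.topic = "hdfs_traffic_event_sandbox"
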
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
new file mode 100644
index 0000000..a8bfa4b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TopWindowResult implements Serializable {
+    private String timestamp;
+    private List<TopWindow> windows;
+
+    public List<TopWindow> getWindows() {
+        return windows;
+    }
+
+    public void setWindows(List<TopWindow> windows) {
+        this.windows = windows;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public static class TopWindow implements Serializable {
+        private int windowLenMs;
+        private List<Op> ops;
+
+        public int getWindowLenMs() {
+            return windowLenMs;
+        }
+
+        public void setWindowLenMs(int windowLenMs) {
+            this.windowLenMs = windowLenMs;
+        }
+
+        public List<Op> getOps() {
+            return ops;
+        }
+
+        public void setOps(List<Op> ops) {
+            this.ops = ops;
+        }
+    }
+
+    /**
+     * Represents an operation within a TopWindow. It contains a ranked
+     * set of the top users for the operation.
+     */
+    public static class Op implements Serializable {
+        private String opType;
+        private List<User> topUsers;
+        private long totalCount;
+
+        public String getOpType() {
+            return opType;
+        }
+
+        public void setOpType(String opType) {
+            this.opType = opType;
+        }
+
+        public List<User> getTopUsers() {
+            return topUsers;
+        }
+
+        public void setTopUsers(List<User> topUsers) {
+            this.topUsers = topUsers;
+        }
+
+        public long getTotalCount() {
+            return totalCount;
+        }
+
+        public void setTotalCount(long totalCount) {
+            this.totalCount = totalCount;
+        }
+    }
+
+    /**
+     * Represents a user who called an Op within a TopWindow. Specifies the
+     * user and the number of times the user called the operation.
+     */
+    public static class User implements Serializable {
+        private String user;
+        private long count;
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public long getCount() {
+            return count;
+        }
+
+        public void setCount(long count) {
+            this.count = count;
+        }
+    }
+}
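
TopWindowResult deserializes the JSON reported by the NameNode's top-user operation-count bean. The payload shape, taken from the sample used in TopWindowResultTest below, parses like this:

    import json

    data = ('{"timestamp":"2017-03-08T00:29:33-0700",'
            '"windows":[{"windowLenMs":60000,"ops":[]},'
            '{"windowLenMs":300000,"ops":[]},'
            '{"windowLenMs":1500000,"ops":[]}]}')
    rs = json.loads(data)
    assert len(rs["windows"]) == 3   # the 1m, 5m and 25m windows
    assert rs["windows"][0]["windowLenMs"] == 60000
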
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
new file mode 100644
index 0000000..6388a8f
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.security.util.LogParseUtil;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TrafficParserBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TrafficParserBolt.class);
+
+    private static final String TARGET_METRIC_NAME = "hadoop.namenode.fsnamesystemstate.topuseropcounts";
+    private static final String USER_METRIC_FORMAT = "hadoop.hdfs.auditlog.user.%sm.count";
+    private static final String CLUSTER_METRIC_FORMAT = "hadoop.hdfs.auditlog.cluster.%sm.count";
+
+    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+
+    private OutputCollector collector;
+    private Config config;
+    private MetricDescriptor metricDescriptor;
+    private ObjectMapper objectMapper;
+    private IEagleServiceClient client;
+
+    public TrafficParserBolt(Config config) {
+        this.config = config;
+        this.metricDescriptor = new MetricDescriptor();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.client = new EagleServiceClientImpl(config);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        Map event = null;
+        try {
+            event = StreamConvertHelper.tupleToEvent(input).f1();
+            String resource = (String) event.get(metricDescriptor.getResourceField());
+            if (resource.equalsIgnoreCase(TARGET_METRIC_NAME)) {
+                String value = (String) event.get(metricDescriptor.getValueField());
+                TopWindowResult rs = objectMapper.readValue(value, TopWindowResult.class);
+                long tm = df.parse(rs.getTimestamp()).getTime() / DateTimeUtil.ONEMINUTE * DateTimeUtil.ONEMINUTE;
+
+                for (TopWindowResult.TopWindow topWindow : rs.getWindows()) {
+                    for (TopWindowResult.Op op : topWindow.getOps()) {
+                        if (op.getOpType().equalsIgnoreCase("*")) {
+                            generateMetric(op, topWindow.getWindowLenMs(), tm);
+                        }
+                    }
+                }
+            }
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            collector.reportError(ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    private void generateMetric(TopWindowResult.Op op, int windowLen, long timestamp) {
+        List<GenericMetricEntity> metrics = new ArrayList<>();
+        GenericMetricEntity clusterMetric = buildMetricEntity(timestamp, CLUSTER_METRIC_FORMAT, null, op.getTotalCount(), windowLen);
+        metrics.add(clusterMetric);
+        collector.emit(new Values("", buildStreamEvent(clusterMetric)));
+        for (TopWindowResult.User user : op.getTopUsers()) {
+            GenericMetricEntity metric = buildMetricEntity(timestamp, USER_METRIC_FORMAT, user.getUser(), user.getCount(), windowLen);
+            metrics.add(metric);
+            collector.emit(new Values("", buildStreamEvent(metric)));
+        }
+        try {
+            client.create(metrics);
+            LOG.info("successfully created {} metrics", metrics.size());
+        } catch (Exception e) {
+            LOG.error("create {} metrics failed due to {}", metrics.size(), e.getMessage(), e);
+        }
+    }
+
+    private GenericMetricEntity buildMetricEntity(long timestamp, String metricFormat, String user, long count, int windowLen) {
+        GenericMetricEntity entity = new GenericMetricEntity();
+        entity.setTimestamp(timestamp);
+        entity.setValue(new double[]{Double.valueOf(count)});
+        entity.setPrefix(String.format(metricFormat, windowLen / 60000));
+        Map<String, String> tags = new HashMap<>();
+        tags.put("site", config.getString("siteId"));
+        tags.put("user", LogParseUtil.parseUserFromUGI(user));
+        entity.setTags(tags);
+        return entity;
+    }
+
+    private Map<String, Object> buildStreamEvent(GenericMetricEntity entity) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("site", entity.getTags().get("site"));
+        map.put("user", entity.getTags().get("user"));
+        map.put("timestamp", entity.getTimestamp());
+        map.put("metric", entity.getPrefix());
+        map.put("value", entity.getValue()[0]);
+        return map;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1", "f2"));
+    }
+
+    @Override
+    public void cleanup() {
+        if (client != null) {
+            LOG.info("closing service client...");
+            try {
+                client.close();
+            } catch (IOException e) {
+                LOG.error("close service client failed due to {}", e.getMessage(), e);
+            }
+        }
+    }
+}
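
Two small calculations drive the metric names above: the window length in milliseconds is folded into the name, and the source timestamp is truncated to the minute. A sketch of both (60000 / 60000 = 1, hence "...1m.count"):

    ONE_MINUTE_MS = 60 * 1000

    def metric_name(fmt, window_len_ms):
        # "hadoop.hdfs.auditlog.cluster.%sm.count" % 5 -> "...cluster.5m.count"
        return fmt % (window_len_ms // ONE_MINUTE_MS)

    def truncate_to_minute(ts_ms):
        return ts_ms // ONE_MINUTE_MS * ONE_MINUTE_MS

    assert metric_name("hadoop.hdfs.auditlog.cluster.%sm.count", 300000) \
        == "hadoop.hdfs.auditlog.cluster.5m.count"
    assert truncate_to_minute(1488958215123) == 1488958200000
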
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 426a78f..d48340c 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -61,12 +61,14 @@
         <value>2</value>
         <description>number of sink tasks</description>
     </property>
+    <!--
     <property>
         <name>topology.numOfTrafficMonitorTasks</name>
         <displayName>Topology Traffic Monitor Tasks</displayName>
         <value>2</value>
         <description>number of traffic monitor tasks</description>
     </property>
+    -->
     <property>
         <name>topology.message.timeout.secs</name>
         <displayName>topology message timeout (secs)</displayName>
@@ -76,10 +78,17 @@
 
     <!-- data source configurations -->
     <property>
+        <name>dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name>
+        <displayName>Kafka Consumer Topic for HDFS Traffic Data</displayName>
+        <value>hadoop_jmx_resource_${siteId}</value>
+        <description>kafka topic for data consumption</description>
+    </property>
+    <property>
         <name>dataSourceConfig.topic</name>
-        <displayName>Kafka Topic for Data Consumption</displayName>
-        <value>hdfs_audit_log</value>
+        <displayName>Kafka Consumer Topic for HDFS Auditlog</displayName>
+        <value>hdfs_audit_log_${siteId}</value>
         <description>kafka topic for data consumption</description>
+        <required>true</required>
     </property>
     <property>
         <name>dataSourceConfig.zkConnection</name>
@@ -90,12 +99,18 @@
     </property>
     <property>
         <name>dataSourceConfig.schemeCls</name>
-        <displayName>Kafka Consumer SchemeCls</displayName>
+        <displayName>Kafka Consumer SchemeCls for Auditlog</displayName>
         <value>storm.kafka.StringScheme</value>
-        <description>scheme class</description>
+        <description>Kafka spout scheme class</description>
         <required>true</required>
     </property>
     <property>
+        <name>dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.schemeCls</name>
+        <displayName>Kafka Consumer SchemeCls for Traffic Data</displayName>
+        <value>org.apache.eagle.app.messaging.JsonSchema</value>
+        <description>Kafka spout scheme class</description>
+    </property>
+    <property>
         <name>dataSourceConfig.timeZone</name>
         <displayName>Log Time Zone</displayName>
         <description>time zone of hdfs audit log</description>
@@ -113,9 +128,16 @@
 
     <!-- data sink configurations -->
     <property>
-        <name>dataSinkConfig.topic</name>
-        <displayName>Kafka Topic for Parsed Data Sink</displayName>
-        <value>hdfs_audit_event</value>
+        <name>dataSinkConfig.HDFS_AUDIT_LOG_ENRICHED_STREAM.topic</name>
+        <displayName>Kafka Topic for Auditlog Event Sink</displayName>
+        <value>hdfs_audit_event_${siteId}</value>
+        <description>topic for kafka data sink</description>
+        <required>true</required>
+    </property>
+    <property>
+        <name>dataSinkConfig.HDFS_AUDIT_LOG_TRAFFIC_STREAM.topic</name>
+        <displayName>Kafka Topic for Traffic Data Sink</displayName>
+        <value>hdfs_traffic_event_${siteId}</value>
         <description>topic for kafka data sink</description>
     </property>
     <property>
@@ -162,25 +184,28 @@
         <value>0</value>
         <description>value controls when a produce request is considered completed</description>
     </property>
+
     <property>
         <name>dataSinkConfig.trafficMonitorEnabled</name>
-        <displayName>Log Traffic Monitor Enabled</displayName>
+        <displayName>Auditlog Traffic Monitor Enabled</displayName>
         <value>false</value>
-        <description>enable the log throughput calculation</description>
+        <description>enable the log throughput calculation with the source data generated by Eagle metric collector scripts</description>
         <required>true</required>
     </property>
-    <property>
-        <name>dataSinkConfig.metricWindowSize</name>
-        <displayName>Window Size for Traffic Counting</displayName>
-        <value>10</value>
-        <description>window size to calculate the throughput</description>
-    </property>
-    <property>
-        <name>dataSinkConfig.metricSinkBatchSize</name>
-        <displayName>Batch Size for Flushing Traffic Metrics</displayName>
-        <value>10</value>
-        <description>batch size of flushing metrics</description>
-    </property>
+    <!--
+    <property>
+        <name>dataSinkConfig.metricWindowSize</name>
+        <displayName>Window Size for Traffic Counting</displayName>
+        <value>10</value>
+        <description>window size to calculate the throughput</description>
+    </property>
+    <property>
+        <name>dataSinkConfig.metricSinkBatchSize</name>
+        <displayName>Batch Size for Flushing Traffic Metrics</displayName>
+        <value>10</value>
+        <description>batch size of flushing metrics</description>
+    </property>
+    -->
 
     <!-- web app related configurations -->
     <property>
@@ -192,7 +217,7 @@
 </configuration>
 <streams>
     <stream>
-        <streamId>hdfs_audit_log_enriched_stream</streamId>
+        <streamId>HDFS_AUDIT_LOG_ENRICHED_STREAM</streamId>
         <group>Hadoop Log</group>
         <description>Hdfs Audit Log Enriched Stream</description>
         <validate>true</validate>
@@ -236,6 +261,34 @@
             </column>
         </columns>
     </stream>
+    <stream>
+        <streamId>HDFS_AUDIT_LOG_TRAFFIC_STREAM</streamId>
+        <group>Hadoop Log</group>
+        <description>HDFS Audit Log Traffic Stream</description>
+        <columns>
+            <column>
+                <name>timestamp</name>
+                <type>long</type>
+            </column>
+            <column>
+                <name>metric</name>
+                <type>string</type>
+            </column>
+            <column>
+                <name>site</name>
+                <type>string</type>
+            </column>
+            <column>
+                <name>user</name>
+                <type>string</type>
+            </column>
+            <column>
+                <name>value</name>
+                <type>double</type>
+                <defaultValue>0.0</defaultValue>
+            </column>
+        </columns>
+    </stream>
 </streams>
 <docs>
     <install>
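
Events on the new HDFS_AUDIT_LOG_TRAFFIC_STREAM carry exactly the five declared columns; buildStreamEvent() in TrafficParserBolt produces maps of this shape (values are illustrative):

    event = {
        "timestamp": 1488958200000,  # already truncated to the minute
        "metric": "hadoop.hdfs.auditlog.user.1m.count",
        "site": "sandbox",
        "user": "hdfs",
        "value": 42.0,
    }
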
http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
new file mode 100644
index 0000000..f04dfcc
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.security.auditlog.TopWindowResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+public class TopWindowResultTest {
+
+    @Test
+    public void test() {
+        ObjectMapper objectMapper = new ObjectMapper();
+        TopWindowResult result = null;
+        String data2 = "{\"timestamp\":\"2017-03-08T00:29:33-0700\",\"windows\":[{\"windowLenMs\":60000,\"ops\":[]},{\"windowLenMs\":300000,\"ops\":[]},{\"windowLenMs\":1500000,\"ops\":[]}]}";
+        try {
+            result = objectMapper.readValue(data2, TopWindowResult.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Assert.assertTrue(result != null);
+        Assert.assertTrue(result.getWindows().size() == 3);
+    }
+
+    @Test
+    public void testTime() {
+        String time = "2017-03-07T21:36:51-0700";
+        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+
+        try {
+            long t1 = df.parse(time).getTime();
+            String time2 = "2017-03-07 21:36:51";
+            long t2 = DateTimeUtil.humanDateToSeconds(time2, TimeZone.getTimeZone("GMT-7")) * 1000;
+            Assert.assertTrue(t1 == t2);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+    }
+}
