http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
new file mode 100644
index 0000000..ccbce98
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import storm.kafka.StringScheme;
+
+/**
+ * Base Storm application that wires the HDFS audit log topology: Kafka spout,
+ * parser bolt, file-sensitivity join, IP-zone join and stream sink, in that order.
+ *
+ * <p>The parser bolt and the sink stream name come from {@link #getParserBolt()} and
+ * {@link #getSinkStreamName()}, so subclasses can supply their own.
+ *
+ * <p>Since 8/10/16.
+ */
+public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
+    public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public static final String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public static final String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
+    public static final String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
+    public static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
+    /**
+     * Assembles the audit log topology.
+     *
+     * @param config application configuration; the task counts are read with {@code getInt} from
+     *     the {@code topology.numOf*Tasks} keys declared above
+     * @param environment Storm environment that supplies the stream sink
+     * @return the topology produced by the builder
+     */
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        IRichSpout spout = new NewKafkaSourcedSpoutProvider().getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
+        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+        // Ask the subclass for the parser; a hard-coded HdfsAuditLogParserBolt here would make
+        // getParserBolt() dead code. The parser must declare the "f1" field grouped on below.
+        BoltDeclarer parserDeclarer = builder.setBolt("parserBolt", getParserBolt(), numOfParserTasks);
+
+        // Default partitioning applies unless eagleProps.useDefaultPartition is present and false.
+        boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition")
+            || config.getBoolean("eagleProps.useDefaultPartition");
+        if (useDefaultPartition) {
+            parserDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+        } else {
+            parserDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
+        }
+
+        builder.setBolt("sensitivityJoin", new FileSensitivityDataJoinBolt(config), numOfSensitivityJoinTasks)
+            .fieldsGrouping("parserBolt", new Fields("f1"));
+
+        builder.setBolt("ipZoneJoin", new IPZoneDataJoinBolt(config), numOfIPZoneJoinTasks)
+            .fieldsGrouping("sensitivityJoin", new Fields("user"));
+
+        // Same reasoning as for the parser: the sink stream name is the subclass's choice.
+        StormStreamSink sinkBolt = environment.getStreamSink(getSinkStreamName(), config);
+        builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks)
+            .fieldsGrouping("ipZoneJoin", new Fields("user"));
+        return builder.createTopology();
+    }
+
+    /** Returns the bolt that parses raw audit log lines; it must emit a field named {@code f1}. */
+    public abstract BaseRichBolt getParserBolt();
+
+    /** Returns the stream name passed to {@code StormEnvironment#getStreamSink}. */
+    public abstract String getSinkStreamName();
+
+    /**
+     * Creates the partition strategy used for the custom grouping of the parser bolt.
+     *
+     * <p>The partition refresh interval and the Kafka statistic range are configured in minutes;
+     * each defaults to 60 when its key is absent.
+     *
+     * @param config configuration with the Eagle service settings and {@code dataSourceConfig.topic}
+     * @return a strategy built from {@link DataDistributionDaoImpl} and {@link GreedyPartitionAlgorithm}
+     */
+    public static PartitionStrategy createStrategy(Config config) {
+        // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao
+        String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
+        int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+        String username = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        String password = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+
+        String refreshKey = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+        int partitionRefreshIntervalInMin = config.hasPath(refreshKey) ? config.getInt(refreshKey) : 60;
+        String rangeKey = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+        int kafkaStatisticRangeInMin = config.hasPath(rangeKey) ? config.getInt(rangeKey) : 60;
+        return new PartitionStrategyImpl(
+            dao,
+            algorithm,
+            partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE,
+            kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
new file mode 100644
index 0000000..6cbbde6
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
+import org.apache.eagle.security.auditlog.util.SimplifyPath;
+import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Bolt that adds a {@code sensitivityType} entry to every parsed HDFS audit event.
+ *
+ * <p>The lookup table is the cached result of {@link FileSensitivityPollingJob}, fetched from
+ * {@link ExternalDataCache} for each tuple. Emitted tuples have the fields {@code user} and
+ * {@code message}.
+ */
+public class FileSensitivityDataJoinBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinBolt.class);
+    private final Config config;
+    private OutputCollector collector;
+
+    public FileSensitivityDataJoinBolt(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Keeps the collector and starts the {@link ExternalDataJoiner} that polls sensitivity data.
+     *
+     * @throws IllegalStateException if the joiner cannot be created or started
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        // start hdfs sensitivity data polling
+        try {
+            ExternalDataJoiner joiner = new ExternalDataJoiner(
+                FileSensitivityPollingJob.class,
+                config,
+                context.getThisComponentId() + "." + context.getThisTaskIndex());
+            joiner.start();
+        } catch (Exception ex) {
+            LOG.error("Fail bringing up quartz scheduler", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    /**
+     * Copies the incoming event, sets {@code sensitivityType} ({@code "NA"} when no entry matches)
+     * and emits {@code (user, event)}.
+     *
+     * <p>The input tuple is acked in every case; if processing fails the error is logged and
+     * nothing is emitted for that tuple.
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(0);
+            Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+            Map<String, FileSensitivityAPIEntity> map =
+                (Map<String, FileSensitivityAPIEntity>) ExternalDataCache.getInstance()
+                    .getJobResult(FileSensitivityPollingJob.class);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Receive map: " + map + "event: " + event);
+            }
+
+            FileSensitivityAPIEntity e = lookupSensitivity(map, (String) event.get("src"));
+            event.put("sensitivityType", e == null ? "NA" : e.getSensitivityType());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("After file sensitivity lookup: " + event);
+            }
+            collector.emit(Arrays.asList(event.get("user"), event));
+        } catch (Exception ex) {
+            LOG.error("error joining data, ignore it", ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    /**
+     * Returns the entry whose key is fully matched by the simplified {@code src} path, with the
+     * path compiled as a case-insensitive regular expression.
+     *
+     * @param map sensitivity entries whose keys are matched against the path; may be {@code null}
+     * @param src value of the event's {@code src} entry; may be {@code null}
+     * @return the first match in the map's iteration order, or {@code null} if either argument
+     *     is {@code null}, nothing matches, or the path is not a valid regular expression
+     */
+    private static FileSensitivityAPIEntity lookupSensitivity(
+            Map<String, FileSensitivityAPIEntity> map, String src) {
+        if (map == null || src == null) {
+            return null;
+        }
+        String simplifiedPath = new SimplifyPath().build(src);
+        Pattern pattern;
+        try {
+            // Loop-invariant, so compile once rather than once per map key.
+            pattern = Pattern.compile(simplifiedPath, Pattern.CASE_INSENSITIVE);
+        } catch (PatternSyntaxException ex) {
+            // Paths may contain regex metacharacters. Letting this escape to the catch-all in
+            // execute() would drop the whole event, so report "no match" instead.
+            LOG.warn("Path {} is not a valid pattern, no sensitivity lookup done: {}",
+                simplifiedPath, ex.getDescription());
+            return null;
+        }
+        for (Map.Entry<String, FileSensitivityAPIEntity> entry : map.entrySet()) {
+            if (pattern.matcher(entry.getKey()).matches()) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
deleted file mode 100644
index 33d29d0..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.security.auditlog; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob; -import org.apache.eagle.security.auditlog.util.SimplifyPath; -import org.apache.eagle.security.entity.FileSensitivityAPIEntity; -import org.apache.eagle.security.util.ExternalDataCache; -import org.apache.eagle.security.util.ExternalDataJoiner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.Map; -import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class FileSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> { - private static final Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinExecutor.class); - private Config config; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - // start IPZone data polling - try{ - ExternalDataJoiner joiner = new ExternalDataJoiner(FileSensitivityPollingJob.class, config, "1"); - joiner.start(); - }catch(Exception ex){ - LOG.error("Fail bring up quartz scheduler", ex); - throw new IllegalStateException(ex); - } - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){ - Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1); - Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); - Map<String, FileSensitivityAPIEntity> map = (Map<String, FileSensitivityAPIEntity>) ExternalDataCache.getInstance().getJobResult(FileSensitivityPollingJob.class); - FileSensitivityAPIEntity e = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Receive map: " + map + "event: " + event); - } - - 
String src = (String)event.get("src"); - if(map != null && src != null) { - String simplifiedPath = new SimplifyPath().build(src); - for (String fileDir : map.keySet()) { - Pattern pattern = Pattern.compile(simplifiedPath,Pattern.CASE_INSENSITIVE); - Matcher matcher = pattern.matcher(fileDir); - boolean isMatched = matcher.matches(); - if (isMatched) { - e = map.get(fileDir); - break; - } - } - } - event.put("sensitivityType", e == null ? "NA" : e.getSensitivityType()); - if(LOG.isDebugEnabled()) { - LOG.debug("After file sensitivity lookup: " + event); - } - // LOG.info(">>>> After file sensitivity lookup: " + event); - outputCollector.collect(new Tuple2(event.get("user"), event)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java new file mode 100644 index 0000000..fcf9d4f --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java @@ -0,0 +1,34 @@ +/* + * + * * + * * * Licensed to the Apache Software Foundation (ASF) under one or more + * * * contributor license agreements. See the NOTICE file distributed with + * * * this work for additional information regarding copyright ownership. + * * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * * (the "License"); you may not use this file except in compliance with + * * * the License. 
You may obtain a copy of the License at
+ * * * <p/>
+ * * * http://www.apache.org/licenses/LICENSE-2.0
+ * * * <p/>
+ * * * Unless required by applicable law or agreed to in writing, software
+ * * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * * See the License for the specific language governing permissions and
+ * * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Application provider that hands out new {@link HdfsAuditLogApplication} instances (since 8/11/16).
+ */
+public class HdfsAuditLogAppProvider extends AbstractApplicationProvider<HdfsAuditLogApplication> {
+    @Override
+    public HdfsAuditLogApplication getApplication() {
+        return new HdfsAuditLogApplication();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
new file mode 100644
index 0000000..791572b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * *
+ * * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * * contributor license agreements. See the NOTICE file distributed with
+ * * * this work for additional information regarding copyright ownership.
+ * * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * * (the "License"); you may not use this file except in compliance with
+ * * * the License. You may obtain a copy of the License at
+ * * * <p/>
+ * * * http://www.apache.org/licenses/LICENSE-2.0
+ * * * <p/>
+ * * * Unless required by applicable law or agreed to in writing, software
+ * * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * * See the License for the specific language governing permissions and
+ * * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Concrete HDFS audit log application: supplies {@link HdfsAuditLogParserBolt} as the parser
+ * bolt and {@code hdfs_audit_log_stream} as the sink stream name.
+ *
+ * <p>Since 8/11/16.
+ */
+public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
+    /** Returns a new {@link HdfsAuditLogParserBolt}. */
+    @Override
+    public BaseRichBolt getParserBolt() {
+        return new HdfsAuditLogParserBolt();
+    }
+
+    /** Returns the sink stream name, {@code hdfs_audit_log_stream}. */
+    @Override
+    public String getSinkStreamName() {
+        return "hdfs_audit_log_stream";
+    }
+
+    /** Runs the application with the configuration returned by {@code ConfigFactory.load()}. */
+    public static void main(String[] args) {
+        Config config = ConfigFactory.load();
+        new HdfsAuditLogApplication().run(config);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
deleted file mode 100644
index 08ab993..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.security.auditlog; - -import java.util.Map; -import java.util.Properties; -import java.util.TreeMap; - -import org.apache.eagle.security.hdfs.HDFSAuditLogObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; -import org.apache.eagle.security.hdfs.HDFSAuditLogParser; -import org.apache.eagle.security.hdfs.HDFSAuditLogObject; - -public class HdfsAuditLogKafkaDeserializer implements SpoutKafkaMessageDeserializer{ - private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogKafkaDeserializer.class); - private Properties props; - - public HdfsAuditLogKafkaDeserializer(Properties props){ - this.props = props; - } - - /** - * the steps for deserializing message from kafka - * 1. convert byte array to string - * 2. 
parse string to eagle entity - */ - @Override - public Object deserialize(byte[] arg0) { - String logLine = new String(arg0); - - HDFSAuditLogParser parser = new HDFSAuditLogParser(); - HDFSAuditLogObject entity = null; - try{ - entity = parser.parse(logLine); - }catch(Exception ex){ - LOG.error("Failing parse audit log message", ex); - } - if(entity == null){ - LOG.warn("Event ignored as it can't be correctly parsed, the log is ", logLine); - return null; - } - Map<String, Object> map = new TreeMap<String, Object>(); - map.put("src", entity.src); - map.put("dst", entity.dst); - map.put("host", entity.host); - map.put("timestamp", entity.timestamp); - map.put("allowed", entity.allowed); - map.put("user", entity.user); - map.put("cmd", entity.cmd); - - return map; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java deleted file mode 100644 index a7f207e..0000000 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.security.auditlog; - - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.stream.application.TopologyExecutable; - -public class HdfsAuditLogMonitoringTopology implements TopologyExecutable { - @Override - public void submit(String topology, Config config) { - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config); - KafkaSourcedSpoutProvider provider = HdfsAuditLogProcessorMain.createProvider(env.getConfig()); - Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled"); - if (balancePartition) { - HdfsAuditLogProcessorMain.execWithBalancedPartition(env, provider); - } else { - HdfsAuditLogProcessorMain.execWithDefaultPartition(env, provider); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java new file mode 100644 index 0000000..5ea5950 --- 
/dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Bolt that parses one raw HDFS audit log line per tuple and emits the parsed fields as a
+ * single map under the output field {@code f1}.
+ *
+ * <p>Since 8/10/16.
+ */
+public class HdfsAuditLogParserBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /**
+     * Parses the line in field 0 and emits a map with the keys {@code src}, {@code dst},
+     * {@code host}, {@code timestamp}, {@code allowed}, {@code user} and {@code cmd}.
+     *
+     * <p>Once the line has been read the tuple is always acked. A line that cannot be parsed
+     * is logged and nothing is emitted for it.
+     */
+    @Override
+    public void execute(Tuple input) {
+        String logLine = input.getString(0);
+        try {
+            HDFSAuditLogObject entity = new HDFSAuditLogParser().parse(logLine);
+            if (entity == null) {
+                // NOTE(review): parse() presumably can return null for a line it does not
+                // recognise - confirm. Without this guard that case shows up as an NPE stack trace.
+                LOG.warn("Event ignored as it can't be correctly parsed, the log is {}", logLine);
+                return;
+            }
+            Map<String, Object> map = new TreeMap<String, Object>();
+            map.put("src", entity.src);
+            map.put("dst", entity.dst);
+            map.put("host", entity.host);
+            map.put("timestamp", entity.timestamp);
+            map.put("allowed", entity.allowed);
+            map.put("user", entity.user);
+            map.put("cmd", entity.cmd);
+            collector.emit(Arrays.<Object>asList(map));
+        } catch (Exception ex) {
+            LOG.error("Failing parse audit log message", ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
deleted file mode 100644
index 60b0e36..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements.
See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.security.auditlog; - -import backtype.storm.spout.SchemeAsMultiScheme; -import com.typesafe.config.Config; -import org.apache.commons.lang3.time.DateUtils; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.partition.DataDistributionDao; -import org.apache.eagle.partition.PartitionAlgorithm; -import org.apache.eagle.partition.PartitionStrategy; -import org.apache.eagle.partition.PartitionStrategyImpl; -import org.apache.eagle.security.partition.DataDistributionDaoImpl; -import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class HdfsAuditLogProcessorMain { - public static PartitionStrategy createStrategy(Config config) { - // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao - String host = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); - String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD); - String topic = config.getString("dataSourceConfig.topic"); - DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic); - PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm(); - String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin"; - Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60; - String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin"; - Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? 
config.getInt(key2) : 60; - PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE); - return strategy; - } - - public static KafkaSourcedSpoutProvider createProvider(Config config) { - String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>)tmp; - if(tmp == null) return null; - return Arrays.asList(map.get("user"), tmp); - } - }; - - KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() { - @Override - public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { - return new SchemeAsMultiScheme(scheme); - } - }; - return provider; - } - - @SuppressWarnings("unchecked") - public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) { - StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0)); - //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0)); - //source.streamUnion(reassembler) - source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0)) - .flatMap(new IPZoneDataJoinExecutor()) - .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor"); - env.execute(); - } - - @SuppressWarnings("unchecked") - public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) { - PartitionStrategy strategy = createStrategy(env.getConfig()); - StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy); - //StreamProducer reassembler = 
source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0)); - //source.streamUnion(reassembler) - source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0)) - .flatMap(new IPZoneDataJoinExecutor()) - .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor"); - env.execute(); - } - - public static void main(String[] args) throws Exception{ - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); - Config config = env.getConfig(); - KafkaSourcedSpoutProvider provider = createProvider(env.getConfig()); - Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled"); - if (balancePartition) { - execWithBalancedPartition(env, provider); - } else { - execWithDefaultPartition(env, provider); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java new file mode 100644 index 0000000..d02f959 --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.security.auditlog; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor2; +import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob; +import org.apache.eagle.security.auditlog.timer.IPZonePollingJob; +import org.apache.eagle.security.entity.IPZoneEntity; +import org.apache.eagle.security.util.ExternalDataCache; +import org.apache.eagle.security.util.ExternalDataJoiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +public class IPZoneDataJoinBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinBolt.class); + private Config config; + private OutputCollector collector; + + public IPZoneDataJoinBolt(Config config){ + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + // start ipzone data polling + try{ + ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, context.getThisComponentId() + "." 
+ context.getThisTaskIndex()); + joiner.start(); + }catch(Exception ex){ + LOG.error("Fail bring up quartz scheduler", ex); + throw new IllegalStateException(ex); + } + } + + @Override + public void execute(Tuple input) { + try { + Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1); + Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy + Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class); + IPZoneEntity e = null; + if (map != null) { + e = map.get(event.get("host")); + } + event.put("securityZone", e == null ? "NA" : e.getSecurityZone()); + if (LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event); + collector.emit(Arrays.asList(event.get("user"), event)); + }catch(Exception ex){ + LOG.error("error joining data, ignore it", ex); + }finally { + collector.ack(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("user", "message")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java deleted file mode 100644 index d633dcd..0000000 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.security.auditlog; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.security.auditlog.timer.IPZonePollingJob; -import org.apache.eagle.security.entity.IPZoneEntity; -import org.apache.eagle.security.util.ExternalDataCache; -import org.apache.eagle.security.util.ExternalDataJoiner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.Map; -import java.util.TreeMap; - -public class IPZoneDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> { - private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinExecutor.class); - private Config config; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - // start IPZone data polling - try{ - ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, "1"); - joiner.start(); - }catch(Exception ex){ - LOG.error("Fail bring up quartz scheduler", ex); - throw new IllegalStateException(ex); - } - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){ - Map<String, Object> 
toBeCopied = (Map<String, Object>)input.get(1); - Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy - Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class); - IPZoneEntity e = null; - if(map != null){ - e = map.get(event.get("host")); - } - event.put("securityZone", e == null ? "NA" : e.getSecurityZone()); - if(LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event); - outputCollector.collect(new Tuple2(event.get("user"), event)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java index a4fed79..375edc7 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java @@ -16,10 +16,14 @@ */ package org.apache.eagle.security.auditlog.timer; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.security.service.HdfsSensitivityEntity; +import org.apache.eagle.security.service.IMetadataServiceClient; +import org.apache.eagle.security.service.MetadataServiceClientImpl; import org.apache.eagle.security.util.ExternalDataCache; import org.apache.eagle.security.entity.FileSensitivityAPIEntity; import org.quartz.Job; @@ -29,9 +33,6 @@ import 
org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import com.google.common.base.Function; import com.google.common.collect.Maps; @@ -43,15 +44,15 @@ public class FileSensitivityPollingJob implements Job{ throws JobExecutionException { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); try{ - List<FileSensitivityAPIEntity> ipZones = load(jobDataMap); - if(ipZones == null){ + Collection<HdfsSensitivityEntity> sensitivityAPIEntities = load(jobDataMap); + if(sensitivityAPIEntities == null){ LOG.warn("File sensitivity information is empty"); return; } - Map<String, FileSensitivityAPIEntity> map = Maps.uniqueIndex(ipZones, new Function<FileSensitivityAPIEntity, String>(){ + Map<String, HdfsSensitivityEntity> map = Maps.uniqueIndex(sensitivityAPIEntities, new Function<HdfsSensitivityEntity, String>(){ @Override - public String apply(FileSensitivityAPIEntity input) { - return input.getTags().get("filedir"); + public String apply(HdfsSensitivityEntity input) { + return input.getFiledir(); } }); ExternalDataCache.getInstance().setJobResult(getClass(), map); @@ -60,7 +61,7 @@ public class FileSensitivityPollingJob implements Job{ } } - private List<FileSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception{ + private Collection<HdfsSensitivityEntity> load(JobDataMap jobDataMap) throws Exception{ Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE); String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST); Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString()); @@ -68,15 +69,7 @@ public class FileSensitivityPollingJob implements Job{ String password = map.containsKey(EagleConfigConstants.PASSWORD) ? 
(String)map.get(EagleConfigConstants.PASSWORD) : null; // load from eagle database LOG.info("Load file sensitivity information from eagle service " + eagleServiceHost + ":" + eagleServicePort); - IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password); - String query = "FileSensitivityService[]{*}"; - GenericServiceAPIResponseEntity<FileSensitivityAPIEntity> response = client.search() - .pageSize(Integer.MAX_VALUE) - .query(query) - .send(); - client.close(); - if(response.getException() != null) - throw new IllegalStateException(response.getException()); - return response.getObj(); + IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest"); + return client.listHdfsSensitivities(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java index 2f7efc8..dc80eb9 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java @@ -16,10 +16,13 @@ */ package org.apache.eagle.security.auditlog.timer; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.security.service.IMetadataServiceClient; +import org.apache.eagle.security.service.MetadataServiceClientImpl; import 
org.apache.eagle.security.util.ExternalDataCache; import org.quartz.Job; import org.quartz.JobDataMap; @@ -29,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.security.entity.IPZoneEntity; +import org.apache.eagle.security.service.IPZoneEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import com.google.common.base.Function; @@ -44,7 +47,7 @@ public class IPZonePollingJob implements Job{ throws JobExecutionException { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); try{ - List<IPZoneEntity> ipZones = load(jobDataMap); + Collection<IPZoneEntity> ipZones = load(jobDataMap); if(ipZones == null){ LOG.warn("Ipzone information is empty"); return; @@ -52,7 +55,7 @@ public class IPZonePollingJob implements Job{ Map<String, IPZoneEntity> map = Maps.uniqueIndex(ipZones, new Function<IPZoneEntity, String>(){ @Override public String apply(IPZoneEntity input) { - return input.getTags().get("iphost"); + return input.getIphost(); } }); ExternalDataCache.getInstance().setJobResult(getClass(), map); @@ -61,7 +64,7 @@ public class IPZonePollingJob implements Job{ } } - private List<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{ + private Collection<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{ Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE); String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST); Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString()); @@ -69,15 +72,7 @@ public class IPZonePollingJob implements Job{ String password = map.containsKey(EagleConfigConstants.PASSWORD) ? 
(String)map.get(EagleConfigConstants.PASSWORD) : null; // load from eagle database LOG.info("Load ip zone information from eagle service " + eagleServiceHost + ":" + eagleServicePort); - IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password); - String query = "IPZoneService[]{*}"; - GenericServiceAPIResponseEntity<IPZoneEntity> response = client.search() - .pageSize(Integer.MAX_VALUE) - .query(query) - .send(); - client.close(); - if(response.getException() != null) - throw new IllegalStateException(response.getException()); - return response.getObj(); + IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest"); + return client.listIPZones(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml new file mode 100644 index 0000000..dadab98 --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -0,0 +1,247 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ /* + ~ * + ~ * * Licensed to the Apache Software Foundation (ASF) under one or more + ~ * * contributor license agreements. See the NOTICE file distributed with + ~ * * this work for additional information regarding copyright ownership. 
+ ~ * * The ASF licenses this file to You under the Apache License, Version 2.0 + ~ * * (the "License"); you may not use this file except in compliance with + ~ * * the License. You may obtain a copy of the License at + ~ * * <p/> + ~ * * http://www.apache.org/licenses/LICENSE-2.0 + ~ * * <p/> + ~ * * Unless required by applicable law or agreed to in writing, software + ~ * * distributed under the License is distributed on an "AS IS" BASIS, + ~ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ * * See the License for the specific language governing permissions and + ~ * * limitations under the License. + ~ * + ~ */ + --> + +<application> + <type>HdfsAuditLogApplication</type> + <name>Hdfs Audit Log Monitoring Application</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.security.auditlog.HdfsAuditLogApplication</appClass> + <viewPath>/apps/example</viewPath> + <configuration> + <property> + <name>dataSourceConfig.topic</name> + <displayName>dataSourceConfig.topic</displayName> + <value>hdfs_audit_log</value> + <description>data source topic</description> + </property> + <property> + <name>dataSourceConfig.zkConnection</name> + <displayName>dataSourceConfig.zkConnection</displayName> + <value>server.eagle.apache.org</value> + <description>zk connection</description> + </property> + <property> + <name>dataSourceConfig.zkConnectionTimeoutMS</name> + <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName> + <value>15000</value> + <description>zk connection timeout in milliseconds</description> + </property> + <property> + <name>dataSourceConfig.fetchSize</name> + <displayName>dataSourceConfig.fetchSize</displayName> + <value>1048586</value> + <description>kafka fetch size</description> + </property> + <property> + <name>dataSourceConfig.transactionZKServers</name> + <displayName>dataSourceConfig.transactionZKServers</displayName> + <value>server.eagle.apache.org</value> + <description>zookeeper server for 
offset transaction</description> + </property> + <property> + <name>dataSourceConfig.transactionZKPort</name> + <displayName>dataSourceConfig.transactionZKPort</displayName> + <value>2181</value> + <description>zookeeper server port for offset transaction</description> + </property> + <property> + <name>dataSourceConfig.transactionZKRoot</name> + <displayName>dataSourceConfig.transactionZKRoot</displayName> + <value>/consumers</value> + <description>offset transaction root</description> + </property> + <property> + <name>dataSourceConfig.consumerGroupId</name> + <displayName>dataSourceConfig.consumerGroupId</displayName> + <value>eagle.hdfsaudit.consumer</value> + <description>kafka consumer group Id</description> + </property> + <property> + <name>dataSourceConfig.transactionStateUpdateMS</name> + <displayName>dataSourceConfig.transactionStateUpdateMS</displayName> + <value>2000</value> + <description>zk transaction state update interval in milliseconds</description> + </property> + <property> + <name>dataSourceConfig.schemeCls</name> + <displayName>dataSourceConfig.schemeCls</displayName> + <value>storm.kafka.StringScheme</value> + <description>scheme class</description> + </property> + <property> + <name>dataSourceConfig.transactionZKPort</name> + <displayName>dataSourceConfig.transactionZKPort</displayName> + <value>2181</value> + <description>zookeeper server port for offset transaction</description> + </property> + <property> + <name>dataSourceConfig.transactionZKPort</name> + <displayName>dataSourceConfig.transactionZKPort</displayName> + <value>2181</value> + <description>zookeeper server port for offset transaction</description> + </property> + <property> + <name>topology.numOfSpoutTasks</name> + <displayName>topology.numOfSpoutTasks</displayName> + <value>2</value> + <description>number of spout tasks</description> + </property> + <property> + <name>topology.numOfParserTasks</name> + <displayName>topology.numOfParserTasks</displayName> + <value>2</value> + <description>number of parser
tasks</description> + </property> + <property> + <name>topology.numOfJoinTasks</name> + <displayName>topology.numOfJoinTasks</displayName> + <value>2</value> + <description>number of external join tasks</description> + </property> + <property> + <name>topology.numOfSinkTasks</name> + <displayName>topology.numOfSinkTasks</displayName> + <value>2</value> + <description>number of sink tasks</description> + </property> + <property> + <name>eagleProps.dataJoinPollIntervalSec</name> + <displayName>eagleProps.dataJoinPollIntervalSec</displayName> + <value>30</value> + <description>interval in seconds for polling</description> + </property> + <property> + <name>eagleProps.eagleService.host</name> + <displayName>eagleProps.eagleService.host</displayName> + <value>localhost</value> + <description>eagle service host</description> + </property> + <property> + <name>eagleProps.eagleService.port</name> + <displayName>eagleProps.eagleService.port</displayName> + <value>8080</value> + <description>eagle service port</description> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <displayName>eagleProps.eagleService.username</displayName> + <value>admin</value> + <description>eagle service username</description> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <displayName>eagleProps.eagleService.password</displayName> + <value>secret</value> + <description>eagle service password</description> + </property> + <property> + <name>dataSinkConfig.topic</name> + <displayName>dataSinkConfig.topic</displayName> + <value>hdfs_audit_log_parsed</value> + <description>topic for kafka data sink</description> + </property> + <property> + <name>dataSinkConfig.brokerList</name> + <displayName>dataSinkConfig.brokerList</displayName> + <value>sandbox.hortonworks.com:6667</value> + <description>kafka broker list</description> + </property> + <property> + <name>dataSinkConfig.serializerClass</name> + 
<displayName>dataSinkConfig.serializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message value</description> + </property> + <property> + <name>dataSinkConfig.keySerializerClass</name> + <displayName>dataSinkConfig.keySerializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message key</description> + </property> + + <!-- properties for hdfs file system access and attribute resolver--> + <property> + <name>fs.defaultFS</name> + <displayName>fs.defaultFS</displayName> + <value>hdfs://server.eagle.apache.org:8020</value> + <description>hdfs endpoint</description> + </property> + </configuration> + <streams> + <stream> + <streamId>hdfs_audit_log_stream</streamId> + <description>Hdfs Audit Log Stream</description> + <validate>true</validate> + <timeseries>true</timeseries> + <columns> + <column> + <name>action</name> + <type>string</type> + </column> + <column> + <name>host</name> + <type>string</type> + </column> + <column> + <name>status</name> + <type>string</type> + </column> + <column> + <name>timestamp</name> + <type>long</type> + </column> + </columns> + </stream> + </streams> + <docs> + <install> +# Step 1: Create source kafka topic named "${site}_example_source_topic" + +./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1 + +# Step 2: Set up data collector to flow data into kafka topic + +./bin/logstash -f log_collector.conf + +## `log_collector.conf` sample as follows: + +input { + +} +filter { + +} +output{ + +} + +# Step 3: start application + +# Step 4: monitor with featured portal or alert with policies + </install> + <uninstall> +# Step 1: stop and uninstall application +# Step 2: delete kafka topic named "${site}_example_source_topic" +# Step 3: stop logstash + </uninstall> + </docs> +</application>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..42cf62b --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,37 @@ +# +# /* +# * +# * * Licensed to the Apache Software Foundation (ASF) under one or more +# * * contributor license agreements. See the NOTICE file distributed with +# * * this work for additional information regarding copyright ownership. +# * * The ASF licenses this file to You under the Apache License, Version 2.0 +# * * (the "License"); you may not use this file except in compliance with +# * * the License. You may obtain a copy of the License at +# * * <p/> +# * * http://www.apache.org/licenses/LICENSE-2.0 +# * * <p/> +# * * Unless required by applicable law or agreed to in writing, software +# * * distributed under the License is distributed on an "AS IS" BASIS, +# * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * * See the License for the specific language governing permissions and +# * * limitations under the License. +# * +# */ +# + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf index 3c3572e..efa6467 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf @@ -14,56 +14,42 @@ # limitations under the License. 
{ - "envContextConfig" : { - "env" : "storm", - "mode" : "local", - "topologyName" : "auditLogProcessTopology", - "stormConfigFile" : "security-auditlog-storm.yaml", - "parallelismConfig" : { - "kafkaMsgConsumer" : 1, - "hdfsAuditLogAlertExecutor*" : 1 - } + "appId" : "HdfsAuditLogApp", + "mode" : "LOCAL", + "siteId" : "testsite", + "topology" : { + "numOfTotalWorkers" : 2, + "numOfSpoutTasks" : 2, + "numOfParserTasks" : 2, + "numOfSensitivityJoinTasks" : 2, + "numOfIPZoneJoinTasks" : 2, + "numOfSinkTasks" : 2 }, "dataSourceConfig": { - "topic" : "sandbox_hdfs_audit_log", - "zkConnection" : "sandbox.hortonworks.com:2181", + "topic" : "hdfs_audit_log", + "zkConnection" : "server.eagle.apache.org:2181", "zkConnectionTimeoutMS" : 15000, "consumerGroupId" : "EagleConsumer", "fetchSize" : 1048586, - "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer", - "transactionZKServers" : "sandbox.hortonworks.com", + "transactionZKServers" : "server.eagle.apache.org", "transactionZKPort" : 2181, "transactionZKRoot" : "/consumers", - "transactionStateUpdateMS" : 2000 - }, - "alertExecutorConfigs" : { - "hdfsAuditLogAlertExecutor" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } + "transactionStateUpdateMS" : 2000, + "schemeCls" : "storm.kafka.StringScheme" }, "eagleProps" : { - "site" : "sandbox", - "application": "hdfsAuditLog", "dataJoinPollIntervalSec" : 30, - "mailHost" : "mailHost.com", - "mailSmtpPort":"25", - "mailDebug" : "true", - "balancePartitionEnabled" : true, - #"partitionRefreshIntervalInMin" : 60, - #"kafkaStatisticRangeInMin" : 60, "eagleService": { "host": "localhost", - "port": 38080, + "port": 9090, "username": "admin", "password": "secret" - }, - "readHdfsUserCommandPatternFrom" : "file" + } }, - "dynamicConfigSource" : { - "enabled" : true, - "initDelayMillis" : 0, - "delayMillis" : 30000 + "dataSinkConfig": { + "topic" : "hdfs_audit_log_parsed", + 
"brokerList" : "server.eagle.apache.org:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties index 4a22987..e442c46 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties @@ -13,17 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=DEBUG, stdout, DRFA +log4j.rootLogger=INFO, stdout, DRFA eagle.log.dir=./logs eagle.log.file=eagle.log -#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG -#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG -log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG -#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG -# standard output log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml 
deleted file mode 100644 index a68a323..0000000 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -topology.workers: 1 -topology.acker.executors: 1 -topology.tasks: 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java index a19e9b6..753eb41 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java @@ -22,6 +22,8 @@ package org.apache.eagle.security.auditlog; import com.typesafe.config.Config; import 
com.typesafe.config.ConfigFactory; import org.apache.eagle.datastream.Collector; +import org.apache.eagle.security.hdfs.HDFSAuditLogObject; +import org.apache.eagle.security.hdfs.HDFSAuditLogParser; import org.junit.Assert; import org.junit.Test; import scala.Tuple2; @@ -32,9 +34,18 @@ import java.util.*; * Created by yonzhang on 11/24/15. */ public class TestUserCommandReassembler { - private Map<String, Object> parseEvent(String log){ - HdfsAuditLogKafkaDeserializer deserializer = new HdfsAuditLogKafkaDeserializer(null); - return (Map<String, Object>)deserializer.deserialize(log.getBytes()); + private Map parseEvent(String log) throws Exception{ + HDFSAuditLogParser deserializer = new HDFSAuditLogParser(); + HDFSAuditLogObject entity = deserializer.parse(log); + Map<String, Object> map = new TreeMap<String, Object>(); + map.put("src", entity.src); + map.put("dst", entity.dst); + map.put("host", entity.host); + map.put("timestamp", entity.timestamp); + map.put("allowed", entity.allowed); + map.put("user", entity.user); + map.put("cmd", entity.cmd); + return map; } /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog new file mode 100644 index 0000000..361304d --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog @@ -0,0 +1,17 @@ +2015-04-24 12:49:16,145 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc +2015-04-24 12:49:16,192 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc +2015-04-24 12:49:20,518 INFO 
FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc +2015-04-24 12:49:20,570 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc +2015-04-24 12:49:20,587 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/ dst=null perm=null proto=rpc +2015-04-24 12:49:20,664 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/tmp dst=null perm=hdfs:hdfs:rwxr-xr-x proto=rpc +2015-04-24 12:49:20,677 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user dst=null perm=null proto=rpc +2015-04-24 12:49:20,686 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/user/ambari-qa dst=null perm=hdfs:hdfs:rwxr-xr-x proto=rpc +2015-04-24 12:49:24,828 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc +2015-04-24 12:49:24,915 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=setPermission src=/tmp dst=null perm=hdfs:hdfs:rwxrwxrwx proto=rpc +2015-04-24 12:49:29,375 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc +2015-04-24 12:49:29,453 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=setPermission src=/user/ambari-qa dst=null perm=hdfs:hdfs:rwxrwx--- proto=rpc +2015-04-24 12:49:33,542 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc +2015-04-24 12:49:37,844 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc +2015-04-24 12:49:37,929 INFO FSNamesystem.audit: allowed=true ugi=hdfs 
(auth:SIMPLE) ip=/10.0.2.15 cmd=setOwner src=/user/ambari-qa dst=null perm=ambari-qa:hdfs:rwxrwx--- proto=rpc +2015-04-24 12:51:31,798 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/apps/hbase/data dst=null perm=null proto=rpc +2015-04-24 12:51:31,863 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/apps/hbase/staging dst=null perm=null proto=rpc http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java index b2a2671..a0a230a 100644 --- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java +++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java @@ -16,6 +16,8 @@ */ package org.apache.eagle.service.security.hdfs.resolver; +import com.typesafe.config.Config; +import org.apache.eagle.metadata.service.ApplicationEntityService; import org.apache.eagle.service.alert.resolver.AttributeResolvable; import org.apache.eagle.service.alert.resolver.AttributeResolveException; import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException; @@ -33,6 +35,10 @@ import java.util.regex.Pattern; public class HDFSCommandResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> { private final static Logger LOG = LoggerFactory.getLogger(HDFSCommandResolver.class); + public HDFSCommandResolver(ApplicationEntityService entityService, Config 
eagleServerConfig){ + + } + private final static String [] cmdStrs = {"open", "create", "append", "delete", "listfileinfo", "rename", "mkdirs", "listStatus", "setReplication", "setOwner", "setPermission", "setTimes", "setXAttr", "removeXAttr", "getXAttrs", "contentSummary", "createEncryptionZone", "checkAccess"}; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java index 4326c93..370d9a3 100644 --- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java +++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java @@ -18,16 +18,19 @@ package org.apache.eagle.service.security.hdfs.resolver; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.typesafe.config.Config; -import org.apache.eagle.security.resolver.MetadataAccessConfigRepo; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.service.ApplicationEntityService; import org.apache.eagle.service.alert.resolver.AttributeResolvable; import org.apache.eagle.service.alert.resolver.AttributeResolveException; import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException; import org.apache.eagle.service.alert.resolver.GenericAttributeResolveRequest; +import org.apache.eagle.service.security.hdfs.rest.HDFSResourceWebResource; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; @@ -45,6 +48,12 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceConstants; */ public class HDFSResourceResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> { private final static Logger LOG = LoggerFactory.getLogger(HDFSResourceResolver.class); + private ApplicationEntityService entityService; + + public HDFSResourceResolver(ApplicationEntityService entityService, Config eagleServerConfig){ + this.entityService = entityService; + } + /** * HDFS Resource Resolve API * @@ -54,10 +63,9 @@ public class HDFSResourceResolver implements AttributeResolvable<GenericAttribu public List<String> resolve(GenericAttributeResolveRequest request) throws AttributeResolveException { List<String> result = new ArrayList<>(); - MetadataAccessConfigRepo repo = new MetadataAccessConfigRepo(); try { - Config config = repo.getConfig(HDFSResourceConstants.HDFS_APPLICATION, request.getSite().trim()); - Configuration conf = repo.convert(config); + Map<String, Object> config = getAppConfig(request.getSite(), HDFSResourceWebResource.HDFS_APPLICATION); + Configuration conf = convert(config); HDFSFileSystem fileSystem = new HDFSFileSystem(conf); String query = request.getQuery().trim(); List<FileStatus> fileStatuses = null; @@ -86,6 +94,19 @@ public class HDFSResourceResolver implements AttributeResolvable<GenericAttribu } } + private Map<String, Object> getAppConfig(String site, String appType){ + ApplicationEntity entity = entityService.getBySiteIdAndAppType(site, appType); + return entity.getConfiguration(); + } + + private Configuration convert(Map<String, Object> originalConfig) throws Exception { + Configuration config = new Configuration(); + for (Map.Entry<String, Object> entry : originalConfig.entrySet()) { + config.set(entry.getKey().toString(), entry.getValue().toString()); + } + return config; + } + /** * Validate the Passed Request Object * It 
should have Site Id and File Path http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java index f1d8808..5f3ec54 100644 --- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java +++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java @@ -16,6 +16,13 @@ */ package org.apache.eagle.service.security.hdfs.resolver; +import com.google.inject.Inject; +import com.typesafe.config.Config; +import org.apache.eagle.metadata.service.ApplicationEntityService; +import org.apache.eagle.security.service.HBaseSensitivityEntity; +import org.apache.eagle.security.service.HdfsSensitivityEntity; +import org.apache.eagle.security.service.ISecurityMetadataDAO; +import org.apache.eagle.security.service.MetadataDaoFactory; import org.apache.eagle.service.alert.resolver.AttributeResolvable; import org.apache.eagle.service.alert.resolver.AttributeResolveException; import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException; @@ -24,16 +31,17 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceSensitivityService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.regex.Pattern; public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> { 
private final static Logger LOG = LoggerFactory.getLogger(HDFSSensitivityTypeResolver.class); - private HDFSResourceSensitivityService dao = new HDFSResourceSensitivityService(); - private Map<String, Map<String, String>> maps = dao.getAllFileSensitivityMap(); + private ISecurityMetadataDAO dao; + @Inject + public HDFSSensitivityTypeResolver(ApplicationEntityService entityService, Config eagleServerConfig){ + dao = MetadataDaoFactory.getMetadataDAO(eagleServerConfig); + } private final static String SENSITIVETYPE_ATTRIBUTE_RESOLVE_FORMAT_HINT = "Sensitive type should be composed of a-z, A-Z, 0-9 or -"; @@ -41,6 +49,7 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA String query = request.getQuery().trim(); String site = request.getSite().trim(); List<String> res = new ArrayList<>(); + Map<String, Map<String, String>> maps = getAllSensitivities(); Map<String, String> map = maps.get(site); if(map == null) { @@ -72,4 +81,16 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA public Class<GenericAttributeResolveRequest> getRequestClass() { return GenericAttributeResolveRequest.class; } + + private Map<String, Map<String, String>> getAllSensitivities(){ + Map<String, Map<String, String>> all = new HashMap<>(); + Collection<HdfsSensitivityEntity> entities = dao.listHdfsSensitivities(); + for(HdfsSensitivityEntity entity : entities){ + if(!all.containsKey(entity.getSite())){ + all.put(entity.getSite(), new HashMap<>()); + } + all.get(entity.getSite()).put(entity.getFiledir(), entity.getSensitivityType()); + } + return all; + } }
