Repository: incubator-eagle Updated Branches: refs/heads/develop 43d229eec -> 27513f7b7
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java index 083c9f8..eac807e 100644 --- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java +++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java @@ -18,6 +18,7 @@ package org.apache.eagle.service.security.hdfs.rest; import java.util.ArrayList; import java.util.List; +import java.util.Map; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -26,9 +27,14 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import com.google.inject.Inject; import com.typesafe.config.Config; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.service.ApplicationEntityService; import org.apache.eagle.security.entity.FileStatusEntity; import org.apache.eagle.security.resolver.MetadataAccessConfigRepo; +import org.apache.eagle.security.service.ISecurityMetadataDAO; +import org.apache.eagle.security.service.MetadataDaoFactory; import org.apache.eagle.service.common.EagleExceptionWrapper; import org.apache.eagle.service.security.hdfs.HDFSResourceConstants; import org.apache.eagle.service.security.hdfs.HDFSResourceSensitivityDataJoiner; @@ -44,9 +50,17 @@ import org.apache.eagle.service.security.hdfs.HDFSFileSystem; * REST Web Service to browse files and Paths in HDFS */ @Path(HDFSResourceConstants.HDFS_RESOURCE) -public class HDFSResourceWebResource -{ +public class HDFSResourceWebResource { private static Logger LOG = LoggerFactory.getLogger(HDFSResourceWebResource.class); + final public static String HDFS_APPLICATION = "HdfsAuditLogApplication"; + private ApplicationEntityService entityService; + private ISecurityMetadataDAO dao; + + @Inject + public HDFSResourceWebResource(ApplicationEntityService entityService, Config eagleServerConfig){ + this.entityService = entityService; + dao = MetadataDaoFactory.getMetadataDAO(eagleServerConfig); + } @GET @Consumes(MediaType.APPLICATION_JSON) @@ -56,13 +70,12 @@ public class HDFSResourceWebResource LOG.info("Starting HDFS Resource Browsing. Query Parameters ==> Site :"+site+" Path : "+filePath ); HDFSResourceWebResponse response = new HDFSResourceWebResponse(); HDFSResourceWebRequestValidator validator = new HDFSResourceWebRequestValidator(); - MetadataAccessConfigRepo repo = new MetadataAccessConfigRepo(); List<FileStatusEntity> result = new ArrayList<>(); List<FileStatus> fileStatuses = null; try { validator.validate(site, filePath); // First Step would be validating Request - Config config = repo.getConfig(HDFSResourceConstants.HDFS_APPLICATION, site); - Configuration conf = repo.convert(config); + Map<String, Object> config = getAppConfig(site, HDFS_APPLICATION); + Configuration conf = convert(config); HDFSFileSystem fileSystem = new HDFSFileSystem(conf); fileStatuses = fileSystem.browse(filePath); // Join with File Sensitivity Info @@ -76,4 +89,17 @@ public class HDFSResourceWebResource response.setObj(result); return response; } + + private Map<String, Object> getAppConfig(String site, String appType){ + ApplicationEntity entity = entityService.getBySiteIdAndAppType(site, appType); + return entity.getConfiguration(); + } + + private Configuration convert(Map<String, Object> originalConfig) throws Exception { + Configuration config = new Configuration(); + for (Map.Entry<String, Object> entry : originalConfig.entrySet()) { + config.set(entry.getKey().toString(), entry.getValue().toString()); + } + return config; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java new file mode 100644 index 0000000..3319164 --- /dev/null +++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java @@ -0,0 +1,44 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 + * * <p/> + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.eagle.security.auditlog; + +import backtype.storm.topology.base.BaseRichBolt; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +/** + * Since 8/11/16. + */ +public class MapRFSAuditLogApplication extends AbstractHdfsAuditLogApplication { + @Override + public BaseRichBolt getParserBolt() { + return new MapRFSAuditLogParserBolt(); + } + + @Override + public String getSinkStreamName() { + return "mapr_audit_log_stream"; + } + + public static void main(String[] args){ + Config config = ConfigFactory.load(); + MapRFSAuditLogApplication app = new MapRFSAuditLogApplication(); + app.run(config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java new file mode 100644 index 0000000..37e55c6 --- /dev/null +++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java @@ -0,0 +1,76 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 + * * <p/> + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.security.auditlog; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.security.hdfs.MAPRFSAuditLogObject; +import org.apache.eagle.security.hdfs.MAPRFSAuditLogParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +/** + * Since 8/11/16. + */ +public class MapRFSAuditLogParserBolt extends BaseRichBolt { + private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class); + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String logLine = input.getString(0); + + MAPRFSAuditLogParser parser = new MAPRFSAuditLogParser(); + MAPRFSAuditLogObject entity = null; + try{ + entity = parser.parse(logLine); + Map<String, Object> map = new TreeMap<String, Object>(); + map.put("src", entity.src); + map.put("dst", entity.dst); + map.put("host", entity.host); + map.put("timestamp", entity.timestamp); + map.put("status", entity.status); + map.put("user", entity.user); + map.put("cmd", entity.cmd); + map.put("volume", entity.volume); + collector.emit(Arrays.asList(map)); + }catch(Exception ex) { + LOG.error("Failing parse audit log message", ex); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogProcessorMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogProcessorMain.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogProcessorMain.java deleted file mode 100644 index b824e8a..0000000 --- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogProcessorMain.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.security.auditlog; - -import backtype.storm.spout.SchemeAsMultiScheme; -import com.typesafe.config.Config; -import org.apache.commons.lang3.time.DateUtils; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.partition.DataDistributionDao; -import org.apache.eagle.partition.PartitionAlgorithm; -import org.apache.eagle.partition.PartitionStrategy; -import org.apache.eagle.partition.PartitionStrategyImpl; -import org.apache.eagle.security.partition.DataDistributionDaoImpl; -import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class MapRFSAuditLogProcessorMain { - - public static PartitionStrategy createStrategy(Config config) { - // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); - String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD); - String topic = config.getString("dataSourceConfig.topic"); - DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic); - PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm(); - String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin"; - Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60; - String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin"; - Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60; - PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE); - return strategy; - } - - public static KafkaSourcedSpoutProvider createProvider(Config config) { - String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>)tmp; - if(tmp == null) return null; - return Arrays.asList(map.get("user"), tmp); - } - }; - - KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() { - @Override - public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { - return new SchemeAsMultiScheme(scheme); - } - }; - return provider; - } - - @SuppressWarnings("unchecked") - public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) { - StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0)); - //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0)); - //source.streamUnion(reassembler) - source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0)) - .flatMap(new IPZoneDataJoinExecutor()) - .alertWithConsumer("maprFSAuditLogEventStream", "maprFSAuditLogAlertExecutor"); - env.execute(); - } - - @SuppressWarnings("unchecked") - public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) { - PartitionStrategy strategy = createStrategy(env.getConfig()); - StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy); - //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0)); - //source.streamUnion(reassembler) - source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0)) - .flatMap(new IPZoneDataJoinExecutor()) - .alertWithConsumer("maprFSAuditLogEventStream", "maprFSAuditLogAlertExecutor"); - env.execute(); - } - - public static void main(String[] args) throws Exception{ - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); - Config config = env.getConfig(); - KafkaSourcedSpoutProvider provider = createProvider(env.getConfig()); - Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled"); - if (balancePartition) { - execWithBalancedPartition(env, provider); - } else { - execWithDefaultPartition(env, provider); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/log4j.properties b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/log4j.properties index 07f8402..12f215c 100644 --- a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/log4j.properties +++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/log4j.properties @@ -20,7 +20,7 @@ eagle.log.file=eagle.log #log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG -#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG +#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinBoltUG log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG #log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG # standard output http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-server-assembly/src/main/conf/configuration.yml ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml b/eagle-server-assembly/src/main/conf/configuration.yml deleted file mode 100644 index c671ade..0000000 --- a/eagle-server-assembly/src/main/conf/configuration.yml +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -server: - applicationConnectors: - - type: http - port: 9090 - adminConnectors: - - type: http - port: 9091 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-server-assembly/src/main/conf/configuration.yml~HEAD ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~HEAD b/eagle-server-assembly/src/main/conf/configuration.yml~HEAD new file mode 100644 index 0000000..c671ade --- /dev/null +++ b/eagle-server-assembly/src/main/conf/configuration.yml~HEAD @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +server: + applicationConnectors: + - type: http + port: 9090 + adminConnectors: + - type: http + port: 9091 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop b/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop new file mode 100644 index 0000000..c671ade --- /dev/null +++ b/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +server: + applicationConnectors: + - type: http + port: 9090 + adminConnectors: + - type: http + port: 9091 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-server/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml index 20819ff..924cd6d 100644 --- a/eagle-server/pom.xml +++ b/eagle-server/pom.xml @@ -145,6 +145,18 @@ <artifactId>eagle-security-hbase-web</artifactId> <version>${project.version}</version> </dependency> + + <!-- hdfs audit log monitoring --> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-security-hdfs-auditlog</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-security-hdfs-web</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-server/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf index 111b7ea..9363445 100644 --- a/eagle-server/src/main/resources/application.conf +++ b/eagle-server/src/main/resources/application.conf @@ -43,7 +43,7 @@ "store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore" "jdbc":{ "username": "root" - "password": null + "password": "" "driverClassName":"com.mysql.jdbc.Driver" "url":"jdbc:mysql://server.eagle.apache.org:3306/eagle" }
