Repository: incubator-eagle Updated Branches: refs/heads/develop 502c7e37f -> 15e1c8335
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java deleted file mode 100644 index ad06bd4..0000000 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.security.hive.jobrunning; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.security.hive.ql.HiveQLParserContent; -import org.apache.eagle.security.hive.ql.Parser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; - -/** - * parse hive query log - */ -public class HiveQueryParserExecutor extends JavaStormStreamExecutor2<String, Map> { - private static final long serialVersionUID = -5878930561335302957L; - private static final Logger LOG = LoggerFactory.getLogger(HiveQueryParserExecutor.class); - - private Config config; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init(){ - - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){ - /** - * hiveQueryLog includes the following key value pair - * "hive.current.database" -> <database name> - * "hive.query.string" -> <hive query statement> - * "mapreduce.job.user.name" -> <user name> - * TODO we need hive job start and end time - */ - String user = (String)input.get(0); - @SuppressWarnings("unchecked") - Map<String, Object> hiveQueryLog = (Map<String, Object>)input.get(1); - //if(LOG.isDebugEnabled()) LOG.debug("Receive hive query log: " + hiveQueryLog); - - String query = null; - String db = null; - String userName = null; - long timestamp = -1; - for (Entry<String, Object> entry : hiveQueryLog.entrySet()) { - switch (entry.getKey()) { - case "hive.query.string": - if (entry.getValue() != null) { - query = entry.getValue().toString(); - } - break; - case "hive.current.database": - if (entry.getValue() != null) { - db = entry.getValue().toString(); - } - break; - case "mapreduce.job.user.name": - if 
(entry.getValue() != null) { - userName = entry.getValue().toString(); - } - break; - case "mapreduce.job.cache.files.timestamps": - if (entry.getValue() != null) { - String timestampString = (String) entry.getValue(); - String[] timestampArray = timestampString.split("\\s*,\\s*"); - /* Get timestamp of start time. */ - timestamp = Long.parseLong(timestampArray[0]); - } - break; - } - } - - HiveQLParserContent parserContent = null; - Parser queryParser = new Parser(); - try { - parserContent = queryParser.run(query); - } catch (Exception ex) { - LOG.error("Failed running hive query parser.", ex); - //throw new IllegalStateException(ex); - } - if(parserContent == null) { - LOG.warn("Event ignored as it can't be correctly parsed, the query log is " + query); - return; - } - if(parserContent.getTableColumnMap().size() == 0) { - LOG.warn("Unsupported command for parsing " + query); - return; - } - /** - * Generate "resource" field: /db/table/column - * "resource" -> </db/table/column1,/db/table/column2,...> - */ - StringBuilder resources = new StringBuilder(); - String prefix = ","; - String connector = "/"; - for (Entry<String, Set<String>> entry : parserContent.getTableColumnMap().entrySet()) { - String table = entry.getKey(); - Set<String> colSet = entry.getValue(); - /** - * If colSet is empty, it means no column is accessed in the table. - * So column is not added to the event stream. - * Only /db/table - */ - if (colSet.isEmpty()) { - resources.append(connector).append(db).append(connector).append(table).append(prefix); - } else { - for (String col : colSet) { - resources.append(connector).append(db).append(connector).append(table); - if (col != null && col.length() > 0) { - resources.append(connector).append(col); - } - resources.append(prefix); - } - } - } - /* Remove the last prefix: "," */ - resources.setLength(resources.length() - 1); - - /* <event> has to be SortedMap. 
*/ - Map<String, Object> event = new TreeMap<String, Object>(); - event.put("user", userName); - event.put("command", parserContent.getOperation()); - event.put("timestamp", timestamp); - event.put("resource", resources.toString()); - LOG.info("HiveQL Parser event stream. " + event); - - outputCollector.collect(new Tuple2(user, event)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java deleted file mode 100644 index 29c4fd2..0000000 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.security.hive.jobrunning; - -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.jobrunning.common.JobConstants; -import org.apache.eagle.jobrunning.common.JobConstants.ResourceType; -import org.apache.eagle.jobrunning.storm.JobRunningContentFilter; -import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -public class JobConfigurationAdaptorExecutor extends JavaStormStreamExecutor2<String, Map> { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(JobConfigurationAdaptorExecutor.class); - private JobRunningContentFilter filter; - - @Override - public void prepareConfig(Config config) { - } - - @Override - public void init() { - filter = new JobRunningContentFilterImpl(); - } - - private Map<String, Object> convertMap(Map<String, String> configs) { - Map<String, Object> map = new HashMap<String, Object>(); - for (Entry<String, String> config : configs.entrySet()) { - map.put(config.getKey(), config.getValue()); - } - return map; - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){ - String user = (String)input.get(0); - String jobId = (String)input.get(1); - ResourceType type = (ResourceType)input.get(2); - if (type.equals(ResourceType.JOB_CONFIGURATION)) { - Map<String, String> configs = (Map<String, String>)input.get(3); - if (filter.acceptJobConf(configs)) { - if(LOG.isDebugEnabled()) { - LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(JobConstants.HIVE_QUERY_STRING)); - } else { - LOG.info("Got a hive job, jobID: " + jobId); - } - - Map<String, Object> map = convertMap(configs); - 
outputCollector.collect(new Tuple2(user, map)); - } - else { - LOG.info("skip non hive job, jobId: " + jobId); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java new file mode 100644 index 0000000..3f0b95b --- /dev/null +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.security.hive.jobrunning; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.jobrunning.common.JobConstants; +import org.apache.eagle.jobrunning.common.JobConstants.ResourceType; +import org.apache.eagle.jobrunning.storm.JobRunningContentFilter; +import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +public class JobFilterBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(JobFilterBolt.class); + private OutputCollector collector; + private JobRunningContentFilter filter; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + filter = new JobRunningContentFilterImpl(); + } + + private Map<String, Object> convertMap(Map<String, String> configs) { + Map<String, Object> map = new HashMap<String, Object>(); + for (Entry<String, String> config : configs.entrySet()) { + map.put(config.getKey(), config.getValue()); + } + return map; + } + + @Override + public void execute(Tuple input) { + String user = input.getString(0); + String jobId = input.getString(1); + ResourceType type = (ResourceType)input.getValue(2); + if (type.equals(ResourceType.JOB_CONFIGURATION)) { + Map<String, String> configs = (Map<String, String>)input.getValue(3); + if (filter.acceptJobConf(configs)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(JobConstants.HIVE_QUERY_STRING)); + } else { + LOG.info("Got a hive job, jobID: " + jobId); + } + + Map<String, Object> map = 
convertMap(configs); + collector.emit(Arrays.asList(user, map)); + } + else { + LOG.info("skip non hive job, jobId: " + jobId); + } + } + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("user", "message")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java new file mode 100644 index 0000000..57da65b --- /dev/null +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.security.hive.sensitivity; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor2; +import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl; +import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity; +import org.apache.eagle.security.util.ExternalDataCache; +import org.apache.eagle.security.util.ExternalDataJoiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class HiveResourceSensitivityDataJoinBolt extends BaseRichBolt { + private final static Logger LOG = LoggerFactory.getLogger(HiveResourceSensitivityDataJoinBolt.class); + private OutputCollector collector; + private Config config; + + public HiveResourceSensitivityDataJoinBolt(Config config){ + this.config = config; + } + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + // start hive resource data polling + try { + ExternalDataJoiner joiner = new ExternalDataJoiner( + HiveResourceSensitivityPollingJob.class, config, context.getThisComponentId() + "." 
+ context.getThisTaskIndex()); + joiner.start(); + } catch(Exception ex){ + LOG.error("Fail to bring up quartz scheduler.", ex); + throw new IllegalStateException(ex); + } + } + + @Override + public void execute(Tuple input) { + String user = input.getString(0); + Map<String, Object> event = (Map<String, Object>)input.getValue(1); + Map<String, HiveResourceSensitivityAPIEntity> map = + (Map<String, HiveResourceSensitivityAPIEntity>) ExternalDataCache + .getInstance() + .getJobResult(HiveResourceSensitivityPollingJob.class); + + String resource = (String)event.get("resource"); + List<String> resourceList = Arrays.asList(resource.split("\\s*,\\s*")); + HiveResourceSensitivityAPIEntity sensitivityEntity = null; + + // Check if hive resource contains sensitive data. + for (String s : resourceList) { + if (map != null) { + sensitivityEntity = null; + for (String r : map.keySet()) { + Pattern pattern = Pattern.compile(r,Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(s); + boolean isMatched = matcher.matches(); + if (isMatched) { + sensitivityEntity = map.get(r); + break; + } + } + } + Map<String, Object> newEvent = new TreeMap<String, Object>(event); + newEvent.put("sensitivityType", sensitivityEntity == null ? 
+ "NA" : sensitivityEntity.getSensitivityType()); + newEvent.put("resource", s); + if(LOG.isDebugEnabled()) { + LOG.debug("After hive resource sensitivity lookup: " + newEvent); + } + LOG.info("After hive resource sensitivity lookup: " + newEvent); + collector.emit(Arrays.asList(user, newEvent)); + collector.ack(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("user", "message")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java deleted file mode 100644 index d26abea..0000000 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.security.hive.sensitivity; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity; -import org.apache.eagle.security.util.ExternalDataCache; -import org.apache.eagle.security.util.ExternalDataJoiner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class HiveResourceSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> { - private final static Logger LOG = LoggerFactory.getLogger( - HiveResourceSensitivityDataJoinExecutor.class); - private Config config; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - // start hive resource data polling - try { - ExternalDataJoiner joiner = new ExternalDataJoiner( - HiveResourceSensitivityPollingJob.class, config, "1"); - joiner.start(); - } catch(Exception ex){ - LOG.error("Fail to bring up quartz scheduler.", ex); - throw new IllegalStateException(ex); - } - } - - @Override - @SuppressWarnings("unchecked") - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){ - String user = (String)input.get(0); - Map<String, Object> event = (Map<String, Object>)input.get(1); - Map<String, HiveResourceSensitivityAPIEntity> map = - (Map<String, HiveResourceSensitivityAPIEntity>) ExternalDataCache - .getInstance() - .getJobResult(HiveResourceSensitivityPollingJob.class); - - String resource = (String)event.get("resource"); - List<String> resourceList = Arrays.asList(resource.split("\\s*,\\s*")); - 
HiveResourceSensitivityAPIEntity sensitivityEntity = null; - - // Check if hive resource contains sensitive data. - for (String s : resourceList) { - if (map != null) { - sensitivityEntity = null; - for (String r : map.keySet()) { - Pattern pattern = Pattern.compile(r,Pattern.CASE_INSENSITIVE); - Matcher matcher = pattern.matcher(s); - boolean isMatched = matcher.matches(); - if (isMatched) { - sensitivityEntity = map.get(r); - break; - } - } - } - Map<String, Object> newEvent = new TreeMap<String, Object>(event); - newEvent.put("sensitivityType", sensitivityEntity == null ? - "NA" : sensitivityEntity.getSensitivityType()); - newEvent.put("resource", s); - if(LOG.isDebugEnabled()) { - LOG.debug("After hive resource sensitivity lookup: " + newEvent); - } - LOG.info("After hive resource sensitivity lookup: " + newEvent); - outputCollector.collect(new Tuple2( - user, - newEvent)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java index b7d9e9c..061ef19 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java @@ -20,6 +20,9 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; import org.apache.eagle.common.config.EagleConfigConstants; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import 
org.apache.eagle.security.service.HiveSensitivityEntity; +import org.apache.eagle.security.service.IMetadataServiceClient; +import org.apache.eagle.security.service.MetadataServiceClientImpl; import org.apache.eagle.security.util.ExternalDataCache; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -31,6 +34,7 @@ import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -43,18 +47,18 @@ public class HiveResourceSensitivityPollingJob implements Job { throws JobExecutionException { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); try { - List<HiveResourceSensitivityAPIEntity> + Collection<HiveSensitivityEntity> hiveResourceSensitivity = load(jobDataMap); if(hiveResourceSensitivity == null) { LOG.warn("Hive resource sensitivity information is empty"); return; } - Map<String, HiveResourceSensitivityAPIEntity> map = Maps.uniqueIndex( + Map<String, HiveSensitivityEntity> map = Maps.uniqueIndex( hiveResourceSensitivity, - new Function<HiveResourceSensitivityAPIEntity, String>() { + new Function<HiveSensitivityEntity, String>() { @Override - public String apply(HiveResourceSensitivityAPIEntity input) { - return input.getTags().get("hiveResource"); + public String apply(HiveSensitivityEntity input) { + return input.getHiveResource(); } }); ExternalDataCache.getInstance().setJobResult(getClass(), map); @@ -63,7 +67,7 @@ public class HiveResourceSensitivityPollingJob implements Job { } } - private List<HiveResourceSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception { + private Collection<HiveSensitivityEntity> load(JobDataMap jobDataMap) throws Exception { Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE); String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST); Integer eagleServicePort = 
Integer.parseInt(map.get(EagleConfigConstants.PORT).toString()); @@ -74,13 +78,7 @@ public class HiveResourceSensitivityPollingJob implements Job { LOG.info("Load hive resource sensitivity information from eagle service " + eagleServiceHost + ":" + eagleServicePort); - IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password); - String query = "HiveResourceSensitivityService[]{*}"; - GenericServiceAPIResponseEntity<HiveResourceSensitivityAPIEntity> response = - client.search().pageSize(Integer.MAX_VALUE).query(query).send(); - client.close(); - if (response.getException() != null) - throw new IllegalStateException(response.getException()); - return response.getObj(); + IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest"); + return client.listHiveSensitivities(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml b/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml new file mode 100644 index 0000000..931bee6 --- /dev/null +++ b/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml @@ -0,0 +1,218 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. 
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<application> + <type>HiveQueryMonitoringApplication</type> + <name>Hdfs Audit Log Monitoring Application</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.security.auditlog.HdfsAuditLogApplication</appClass> + <viewPath>/apps/example</viewPath> + <configuration> + <property> + <name>dataSourceConfig.zkQuorum</name> + <displayName>dataSourceConfig.zkQuorum</displayName> + <value>server.eagle.apache.org:2181</value> + <description>zookeeper quorum for storing hive job processing status</description> + </property> + <property> + <name>dataSourceConfig.zkRoot</name> + <displayName>dataSourceConfig.zkRoot</displayName> + <value>/jobrunning</value> + <description>zookeeper znode path for storing hive job processing status</description> + </property> + <property> + <name>dataSourceConfig.zkSessionTimeoutMs</name> + <displayName>dataSourceConfig.zkSessionTimeoutMs</displayName> + <value>15000</value> + <description>zk connection timeout in milliseconds</description> + </property> + <property> + <name>dataSourceConfig.zkRetryTimes</name> + <displayName>dataSourceConfig.zkRetryTimes</displayName> + <value>3</value> + <description>retry times when zookeeper fails</description> + </property> + <property> + <name>dataSourceConfig.zkRetryInterval</name> + <displayName>dataSourceConfig.zkRetryInterval</displayName> + <value>2000</value> + <description>interval for retrying 
when zookeeper fails</description> + </property> + <property> + <name>dataSourceConfig.RMEndPoints</name> + <displayName>dataSourceConfig.RMEndPoints</displayName> + <value>http://server.eagle.apache.org:8088/</value> + <description>resource manager endpoint</description> + </property> + <property> + <name>dataSourceConfig.HSEndPoint</name> + <displayName>dataSourceConfig.HSEndPoint</displayName> + <value>http://server.eagle.apache.org:19888/</value> + <description>history server endpoint</description> + </property> + <property> + <name>dataSourceConfig.partitionerCls</name> + <displayName>dataSourceConfig.partitionerCls</displayName> + <value>org.apache.eagle.job.DefaultJobPartitionerImpl</value> + <description>partition class for job</description> + </property> + <property> + <name>topology.numOfSpoutTasks</name> + <displayName>topology.numOfSpoutTasks</displayName> + <value>2</value> + <description>number of spout tasks</description> + </property> + <property> + <name>topology.numOfParserTasks</name> + <displayName>topology.numOfParserTasks</displayName> + <value>2</value> + <description>number of parser tasks</description> + </property> + <property> + <name>topology.numOfJoinTasks</name> + <displayName>topology.numOfJoinTasks</displayName> + <value>2</value> + <description>number of external join tasks</description> + </property> + <property> + <name>topology.numOfSinkTasks</name> + <displayName>topology.numOfSinkTasks</displayName> + <value>2</value> + <description>number of sink tasks</description> + </property> + <property> + <name>eagleProps.dataJoinPollIntervalSec</name> + <displayName>eagleProps.dataJoinPollIntervalSec</displayName> + <value>30</value> + <description>interval in seconds for polling</description> + </property> + <property> + <name>eagleProps.eagleService.host</name> + <displayName>eagleProps.eagleService.host</displayName> + <value>localhost</value> + <description>eagle service host</description> + </property> + <property> + 
<name>eagleProps.eagleService.port</name> + <displayName>eagleProps.eagleService.port</displayName> + <value>8080</value> + <description>eagle service port</description> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <displayName>eagleProps.eagleService.username</displayName> + <value>admin</value> + <description>eagle service username</description> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <displayName>eagleProps.eagleService.password</displayName> + <value>secret</value> + <description>eagle service password</description> + </property> + <property> + <name>dataSinkConfig.topic</name> + <displayName>dataSinkConfig.topic</displayName> + <value>hive_query_parsed</value> + <description>topic for kafka data sink</description> + </property> + <property> + <name>dataSinkConfig.brokerList</name> + <displayName>dataSinkConfig.brokerList</displayName> + <value>server.eagle.apache.org:6667</value> + <description>kafka broker list</description> + </property> + <property> + <name>dataSinkConfig.serializerClass</name> + <displayName>dataSinkConfig.serializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message value</description> + </property> + <property> + <name>dataSinkConfig.keySerializerClass</name> + <displayName>dataSinkConfig.keySerializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message key</description> + </property> + + <!-- properties for hdfs file system access and attribute resolver--> + <property> + <name>fs.defaultFS</name> + <displayName>fs.defaultFS</displayName> + <value>hdfs://server.eagle.apache.org:8020</value> + <description>hdfs endpoint</description> + </property> + </configuration> + <streams> + <stream> + <streamId>hdfs_audit_log_stream</streamId> + <description>Hdfs Audit Log Stream</description> + <validate>true</validate> + <timeseries>true</timeseries> + 
 <columns> + <column> + <name>action</name> + <type>string</type> + </column> + <column> + <name>host</name> + <type>string</type> + </column> + <column> + <name>status</name> + <type>string</type> + </column> + <column> + <name>timestamp</name> + <type>long</type> + </column> + </columns> + </stream> + </streams> + <docs> + <install> +# Step 1: Create source kafka topic named "${site}_example_source_topic" + +./bin/kafka-topics.sh --create --topic ${site}_example_source_topic --replication-factor 1 --partitions 1 + +# Step 2: Set up data collector to flow data into the kafka topic + +./bin/logstash -f log_collector.conf + +## `log_collector.conf` sample as follows: + +input { + +} +filter { + +} +output{ + +} + +# Step 3: start application + +# Step 4: monitor with featured portal or alert with policies + </install> + <uninstall> +# Step 1: stop and uninstall application +# Step 2: delete kafka topic named "${site}_example_source_topic" +# Step 3: stop logstash + </uninstall> + </docs> +</application> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..fdd2754 --- /dev/null +++ b/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.security.hive.jobrunning.HiveQueryMonitoringAppProvider http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/resources/application.conf b/eagle-security/eagle-security-hive/src/main/resources/application.conf index 22be461..f21b4a0 100644 --- a/eagle-security/eagle-security-hive/src/main/resources/application.conf +++ b/eagle-security/eagle-security-hive/src/main/resources/application.conf @@ -14,50 +14,39 @@ # limitations under the License. 
{ - "envContextConfig" : { - "env" : "storm", - "mode" : "local", - "topologyName" : "hiveQueryRunningTopology", - "stormConfigFile" : "hive.storm.yaml", - "parallelismConfig" : { - "msgConsumer" : 1 - } + "appId" : "HiveQueryMonitoringApp", + "mode" : "LOCAL", + "siteId" : "testsite", + "topology" : { + "numOfSpoutTasks" : 2, + "numOfFilterTasks" : 2, + "numOfParserTasks" : 2, + "numOfJoinTasks" : 2, + "numOfSinkTasks" : 2 }, "dataSourceConfig": { - "flavor" : "stormrunning", - "zkQuorum" : "localhost:2181", + "zkQuorum" : "server.eagle.apache.org:2181", "zkRoot" : "/jobrunning", "zkSessionTimeoutMs" : 15000, "zkRetryTimes" : 3, "zkRetryInterval" : 2000, - "RMEndPoints" : "http://localhost:8088/", - "HSEndPoint" : "http://localhost:19888/", + "RMEndPoints" : "http://server.eagle.apache.org:8088/", + "HSEndPoint" : "http://server.eagle.apache.org:19888/", "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl", }, "eagleProps" : { - "site" : "sandbox", - "application" : "hiveQueryLog", - "mailHost" : "mailHost.com", - "mailSmtpPort":"25", - "mailDebug" : "true", + "dataJoinPollIntervalSec" : 30, "eagleService": { "host": "localhost", - "port": 38080, + "port": 9090, "username": "admin", "password": "secret" } }, - "alertExecutorConfigs" : { - "hiveAccessAlertByRunningJob" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner", - "needValidation" : "true" - } - }, - "dynamicConfigSource" : { - "enabled" : true, - "initDelayMillis" : 0, - "delayMillis" : 30000, - "ignoreDeleteFromSource" : true + "dataSinkConfig": { + "topic" : "hive_query_parsed", + "brokerList" : "server.eagle.apache.org:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml 
---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml b/eagle-server-assembly/src/main/conf/configuration.yml new file mode 100644 index 0000000..c671ade --- /dev/null +++ b/eagle-server-assembly/src/main/conf/configuration.yml @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +server: + applicationConnectors: + - type: http + port: 9090 + adminConnectors: + - type: http + port: 9091 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml~HEAD ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~HEAD b/eagle-server-assembly/src/main/conf/configuration.yml~HEAD deleted file mode 100644 index c671ade..0000000 --- a/eagle-server-assembly/src/main/conf/configuration.yml~HEAD +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -server: - applicationConnectors: - - type: http - port: 9090 - adminConnectors: - - type: http - port: 9091 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop b/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop deleted file mode 100644 index c671ade..0000000 --- a/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-server: - applicationConnectors: - - type: http - port: 9090 - adminConnectors: - - type: http - port: 9091
