http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java deleted file mode 100644 index 4397873..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java +++ /dev/null @@ -1,142 +0,0 @@ -package gov.nasa.jpl.mudrod.weblog.partition; - -import java.util.*; - -public class KGreedyPartitionSolver implements ThePartitionProblemSolver { - - public boolean bsorted = false; - - public KGreedyPartitionSolver() { - // default constructor - } - - public KGreedyPartitionSolver(boolean bsorted) { - this.bsorted = true; - } - - @Override - public Map<String, Integer> solve(Map<String, Double> labelNums, int k) { - List<Double> lista = null; - List<String> months = null; - - if (!this.bsorted) { - LinkedHashMap sortedMap = this.sortMapByValue(labelNums); - lista = new ArrayList(sortedMap.values()); - months = new ArrayList(sortedMap.keySet()); - } else { - lista = new ArrayList(labelNums.values()); - months = new ArrayList(labelNums.keySet()); - } - - List<List<Double>> parts = new ArrayList<>(); - List<List<String>> splitMonths = new ArrayList<>(); - - for (int i = 0; i < k; i++) { - List<Double> part = new ArrayList(); - parts.add(part); - - List<String> monthList = new ArrayList(); - splitMonths.add(monthList); - } - - int j = 0; - for (Double lista1 : lista) { - - Double minimalSum = -1.0; - int position = 0; - for (int i = 0; i < parts.size(); i++) { - List<Double> part = parts.get(i); - if (minimalSum == -1) { - minimalSum = Suma(part); - position = i; - } else if (Suma(part) < minimalSum) { - minimalSum = Suma(part); - position = i; - } - } - - List<Double> part = 
parts.get(position); - part.add(lista1); - parts.set(position, part); - - List<String> month = splitMonths.get(position); - month.add(months.get(j)); - splitMonths.set(position, month); - j++; - } - - /* for(int i=0; i<splitMonths.size(); i++){ - System.out.println("group:" + i); - printStrList(splitMonths.get(i)); - } - - for(int i=0; i<parts.size(); i++){ - print(parts.get(i)); - }*/ - - Map<String, Integer> LabelGroups = new HashMap<String, Integer>(); - for (int i = 0; i < splitMonths.size(); i++) { - List<String> list = splitMonths.get(i); - for (int m = 0; m < list.size(); m++) { - LabelGroups.put(list.get(m), i); - } - } - - return LabelGroups; - } - - public LinkedHashMap<String, Double> sortMapByValue(Map passedMap) { - List mapKeys = new ArrayList(passedMap.keySet()); - List mapValues = new ArrayList(passedMap.values()); - Collections.sort(mapValues, Collections.reverseOrder()); - Collections.sort(mapKeys, Collections.reverseOrder()); - - LinkedHashMap sortedMap = new LinkedHashMap(); - - Iterator valueIt = mapValues.iterator(); - while (valueIt.hasNext()) { - Object val = valueIt.next(); - Iterator keyIt = mapKeys.iterator(); - - while (keyIt.hasNext()) { - Object key = keyIt.next(); - String comp1 = passedMap.get(key).toString(); - String comp2 = val.toString(); - - if (comp1.equals(comp2)) { - passedMap.remove(key); - mapKeys.remove(key); - sortedMap.put((String) key, (Double) val); - break; - } - - } - - } - return sortedMap; - } - - private Double Suma(List<Double> part) { - Double ret = 0.0; - for (int i = 0; i < part.size(); i++) { - ret += part.get(i); - } - return ret; - } - - private void print(List<Double> list) { - /*for (int i = 0; i < list.size(); i++) { - System.out.print(list.get(i)+","); - }*/ - System.out.print("sum is:" + Suma(list)); - System.out.println(); - } - - private void printStrList(List<String> list) { - for (int i = 0; i < list.size(); i++) { - System.out.print(list.get(i) + ","); - } - System.out.println(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java deleted file mode 100644 index 11aaed3..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java +++ /dev/null @@ -1,8 +0,0 @@ -package gov.nasa.jpl.mudrod.weblog.partition; - -import java.util.Map; - -public interface ThePartitionProblemSolver { - - public Map<String, Integer> solve(Map<String, Double> labelNums, int k); -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java deleted file mode 100644 index 4c299dd..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java +++ /dev/null @@ -1,33 +0,0 @@ -package gov.nasa.jpl.mudrod.weblog.partition; - -import org.apache.spark.Partitioner; - -import java.util.Map; - -public class logPartitioner extends Partitioner { - - int num; - Map<String, Integer> UserGroups; - - public logPartitioner(int num) { - this.num = num; - } - - public logPartitioner(Map<String, Integer> UserGroups, int num) { - this.UserGroups = UserGroups; - this.num = num; - } - - @Override - public int getPartition(Object arg0) { - // TODO Auto-generated method stub - String user = (String) arg0; - return UserGroups.get(user); - } - - @Override - public int numPartitions() { - // TODO Auto-generated method stub - return num; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java deleted file mode 100644 index 34323df..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix; -import gov.nasa.jpl.mudrod.utils.MatrixUtil; -import gov.nasa.jpl.mudrod.weblog.structure.ClickStream; -import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Properties; - -/** - * Supports ability to extract click stream data based on session processing results - */ -public class ClickStreamGenerator extends DiscoveryStepAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ClickStreamGenerator.class); - - public ClickStreamGenerator(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting ClickStreamGenerator..."); - startTime = System.currentTimeMillis(); - - String clickstremMatrixFile = props.getProperty("clickstreamMatrix"); - try { - SessionExtractor extractor = new SessionExtractor(); - JavaRDD<ClickStream> clickstreamRDD = extractor.extractClickStreamFromES(this.props, this.es, this.spark); - int weight = Integer.parseInt(props.getProperty("downloadWeight")); - JavaPairRDD<String, List<String>> metaddataQueryRDD = extractor.bulidDataQueryRDD(clickstreamRDD, weight); - LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metaddataQueryRDD); - - MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, clickstremMatrixFile); - } catch (Exception e) { - LOG.error("Encountered error within ClickStreamGenerator: {}", e); - } - - endTime = System.currentTimeMillis(); - 
LOG.info("ClickStreamGenerator complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); - return null; - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java deleted file mode 100644 index 80bf33b..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.joda.time.DateTime; -import org.joda.time.Seconds; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * An {@link DiscoveryStepAbstract} - * implementation which detects a known list of Web crawlers which may may be - * present within, and pollute various logs acting as input to Mudrod. 
- */ -public class CrawlerDetection extends LogAbstract { - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(CrawlerDetection.class); - - public static final String CRAWLER = "crawler"; - public static final String GOOGLE_BOT = "googlebot"; - public static final String BING_BOT = "bingbot"; - public static final String YAHOO_BOT = "slurp"; - public static final String YACY_BOT = "yacybot"; - public static final String ROGER_BOT = "rogerbot"; - public static final String YANDEX_BOT = "yandexbot"; - - public static final String NO_AGENT_BOT = "-"; - public static final String PERL_BOT = "libwww-perl/"; - public static final String APACHE_HHTP = "apache-httpclient/"; - public static final String JAVA_CLIENT = "java/"; - public static final String CURL = "curl/"; - - /** - * Paramterized constructor to instantiate a configured instance of - * {@link CrawlerDetection} - * - * @param props populated {@link java.util.Properties} object - * @param es {@link ESDriver} object to use in - * crawler detection preprocessing. - * @param spark {@link SparkDriver} object to use in - * crawler detection preprocessing. - */ - public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - public CrawlerDetection() { - super(null, null, null); - } - - @Override - public Object execute() { - LOG.info("Starting Crawler detection {}.", httpType); - startTime = System.currentTimeMillis(); - try { - checkByRate(); - } catch (InterruptedException | IOException e) { - LOG.error("Encountered an error whilst detecting Web crawlers.", e); - } - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("Crawler detection complete. 
Time elapsed {} seconds", (endTime - startTime) / 1000); - return null; - } - - /** - * Check known crawler through crawler agent name list - * - * @param agent name of a log line - * @return 1 if the log is initiated by crawler, 0 otherwise - */ - public boolean checkKnownCrawler(String agent) { - agent = agent.toLowerCase(); - if (agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || agent.contains(BING_BOT) || agent.contains(APACHE_HHTP) || agent.contains(PERL_BOT) || agent.contains(YAHOO_BOT) || agent - .contains(YANDEX_BOT) || agent.contains(NO_AGENT_BOT) || agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || agent.contains(JAVA_CLIENT) || agent.contains(CURL)) { - return true; - } else { - return false; - } - } - - public void checkByRate() throws InterruptedException, IOException { - String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); - if (processingType.equals("sequential")) { - checkByRateInSequential(); - } else if (processingType.equals("parallel")) { - checkByRateInParallel(); - } - } - - /** - * Check crawler by request sending rate, which is read from configruation - * file - * - * @throws InterruptedException InterruptedException - * @throws IOException IOException - */ - public void checkByRateInSequential() throws InterruptedException, IOException { - es.createBulkProcessor(); - - int rate = Integer.parseInt(props.getProperty("sendingrate")); - - Terms users = this.getUserTerms(this.httpType); - LOG.info("Original User count: {}", Integer.toString(users.getBuckets().size())); - - int userCount = 0; - for (Terms.Bucket entry : users.getBuckets()) { - String user = entry.getKey().toString(); - int count = checkByRate(es, user); - userCount += count; - } - es.destroyBulkProcessor(); - LOG.info("User count: {}", Integer.toString(userCount)); - } - - void checkByRateInParallel() throws InterruptedException, IOException { - - JavaRDD<String> userRDD = getUserRDD(this.httpType); - LOG.info("Original User count: {}", 
userRDD.count()); - - int userCount = 0; - userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); - List<Integer> realUserNums = new ArrayList<>(); - while (iterator.hasNext()) { - String s = iterator.next(); - Integer realUser = checkByRate(tmpES, s); - realUserNums.add(realUser); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - return realUserNums.iterator(); - }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b); - - LOG.info("User count: {}", Integer.toString(userCount)); - } - - private int checkByRate(ESDriver es, String user) { - - int rate = Integer.parseInt(props.getProperty("sendingrate")); - Pattern pattern = Pattern.compile("get (.*?) http/*"); - Matcher matcher; - - BoolQueryBuilder filterSearch = new BoolQueryBuilder(); - filterSearch.must(QueryBuilders.termQuery("IP", user)); - - AggregationBuilder aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC); - SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet(); - - Histogram agg = checkRobot.getAggregations().get("by_minute"); - - List<? 
extends Histogram.Bucket> botList = agg.getBuckets(); - long maxCount = botList.get(0).getDocCount(); - if (maxCount >= rate) { - return 0; - } else { - DateTime dt1 = null; - int toLast = 0; - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setScroll(new TimeValue(60000)).setQuery(filterSearch).setSize(100).execute().actionGet(); - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - String logtype = (String) result.get("LogType"); - if (logtype.equals("PO.DAAC")) { - String request = (String) result.get("Request"); - matcher = pattern.matcher(request.trim().toLowerCase()); - boolean find = false; - while (matcher.find()) { - request = matcher.group(1); - result.put("RequestUrl", "http://podaac.jpl.nasa.gov" + request); - find = true; - } - if (!find) { - result.put("RequestUrl", request); - } - } else { - result.put("RequestUrl", result.get("Request")); - } - - DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); - DateTime dt2 = fmt.parseDateTime((String) result.get("Time")); - - if (dt1 == null) { - toLast = 0; - } else { - toLast = Math.abs(Seconds.secondsBetween(dt1, dt2).getSeconds()); - } - result.put("ToLast", toLast); - IndexRequest ir = new IndexRequest(logIndex, cleanupType).source(result); - - es.getBulkProcessor().add(ir); - dt1 = dt2; - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - - } - - return 1; - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java deleted file mode 100644 index d5dc102..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.*; - -/** - * Supports ability to generate search history (queries) for each individual - * user (IP) - */ -public class HistoryGenerator extends LogAbstract { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(HistoryGenerator.class); - - public HistoryGenerator(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting HistoryGenerator..."); - startTime = 
System.currentTimeMillis(); - - generateBinaryMatrix(); - - endTime = System.currentTimeMillis(); - LOG.info("HistoryGenerator complete. Time elapsed {} seconds", (endTime - startTime) / 1000); - return null; - } - - /** - * Method to generate a binary user*query matrix (stored in temporary .csv - * file) - */ - public void generateBinaryMatrix() { - try { - - System.out.println(props.getProperty("userHistoryMatrix")); - File file = new File(props.getProperty("userHistoryMatrix")); - if (file.exists()) { - file.delete(); - } - - file.createNewFile(); - - FileWriter fw = new FileWriter(file.getAbsoluteFile()); - BufferedWriter bw = new BufferedWriter(fw); - bw.write("Num" + ","); - - // step 1: write first row of csv - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - - String[] logIndices = logIndexList.toArray(new String[0]); - String[] statictypeArray = new String[] { this.sessionStats }; - int docCount = es.getDocCount(logIndices, statictypeArray); - - SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet(); - Terms ips = sr.getAggregations().get("IPs"); - List<String> ipList = new ArrayList<>(); - for (Terms.Bucket entry : ips.getBuckets()) { - if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { // filter - // out - // less - // active users/ips - ipList.add(entry.getKey().toString()); - } - } - bw.write(String.join(",", ipList) + "\n"); - - // step 2: step the rest rows of csv - SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - 
.addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount))); - - SearchResponse sr2 = sr2Builder.execute().actionGet(); - Terms keywords = sr2.getAggregations().get("KeywordAgg"); - - for (Terms.Bucket keyword : keywords.getBuckets()) { - - Map<String, Integer> ipMap = new HashMap<>(); - Terms ipAgg = keyword.getAggregations().get("IPAgg"); - - int distinctUser = ipAgg.getBuckets().size(); - if (distinctUser > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { - bw.write(keyword.getKey() + ","); - for (Terms.Bucket IP : ipAgg.getBuckets()) { - - ipMap.put(IP.getKey().toString(), 1); - } - for (int i = 0; i < ipList.size(); i++) { - if (ipMap.containsKey(ipList.get(i))) { - bw.write(ipMap.get(ipList.get(i)) + ","); - } else { - bw.write("0,"); - } - } - bw.write("\n"); - } - } - - bw.close(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java deleted file mode 100644 index c55082a..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.weblog.structure.ApacheAccessLog; -import gov.nasa.jpl.mudrod.weblog.structure.FtpLog; -import org.apache.spark.api.java.JavaRDD; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -/** - * Supports ability to parse and process FTP and HTTP log files - */ -public class ImportLogFile extends LogAbstract { - - private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class); - - /** - * - */ - private static final long serialVersionUID = 1L; - - String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] " + "\"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\""; - - public static final int NUM_FIELDS = 9; - Pattern p = Pattern.compile(logEntryPattern); - transient Matcher matcher; - - /** - * Constructor supporting a number of parameters documented below. 
- * - * @param props a {@link java.util.Map} containing K,V of type String, String - * respectively. - * @param es the {@link ESDriver} used to persist log - * files. - * @param spark the {@link SparkDriver} used to process - * input log files. - */ - public ImportLogFile(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting Log Import {}", props.getProperty(MudrodConstants.TIME_SUFFIX)); - startTime = System.currentTimeMillis(); - readFile(); - endTime = System.currentTimeMillis(); - LOG.info("Log Import complete. Time elapsed {} seconds", (endTime - startTime) / 1000); - es.refreshIndex(); - return null; - } - - /** - * Utility function to aid String to Number formatting such that three letter - * months such as 'Jan' are converted to the Gregorian integer equivalent. - * - * @param time the input {@link java.lang.String} to convert to int. - * @return the converted Month as an int. - */ - public String switchtoNum(String time) { - String newTime = time; - if (newTime.contains("Jan")) { - newTime = newTime.replace("Jan", "1"); - } else if (newTime.contains("Feb")) { - newTime = newTime.replace("Feb", "2"); - } else if (newTime.contains("Mar")) { - newTime = newTime.replace("Mar", "3"); - } else if (newTime.contains("Apr")) { - newTime = newTime.replace("Apr", "4"); - } else if (newTime.contains("May")) { - newTime = newTime.replace("May", "5"); - } else if (newTime.contains("Jun")) { - newTime = newTime.replace("Jun", "6"); - } else if (newTime.contains("Jul")) { - newTime = newTime.replace("Jul", "7"); - } else if (newTime.contains("Aug")) { - newTime = newTime.replace("Aug", "8"); - } else if (newTime.contains("Sep")) { - newTime = newTime.replace("Sep", "9"); - } else if (newTime.contains("Oct")) { - newTime = newTime.replace("Oct", "10"); - } else if (newTime.contains("Nov")) { - newTime = newTime.replace("Nov", "11"); - } else if (newTime.contains("Dec")) { - newTime 
= newTime.replace("Dec", "12"); - } - return newTime; - } - - public void readFile() { - - String httplogpath = null; - String ftplogpath = null; - - File directory = new File(props.getProperty(MudrodConstants.DATA_DIR)); - File[] fList = directory.listFiles(); - for (File file : fList) { - if (file.isFile() && file.getName().contains(props.getProperty(MudrodConstants.TIME_SUFFIX))) - { - if (file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) - { - httplogpath = file.getAbsolutePath(); - } - - if (file.getName().contains(props.getProperty(MudrodConstants.FTP_PREFIX))) - { - ftplogpath = file.getAbsolutePath(); - } - } - } - - if(httplogpath == null || ftplogpath == null) - { - LOG.error("WWW file or FTP logs cannot be found, please check your data directory."); - return; - } - - String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE, "parallel"); - if (processingType.equals("sequential")) { - readFileInSequential(httplogpath, ftplogpath); - } else if (processingType.equals("parallel")) { - readFileInParallel(httplogpath, ftplogpath); - } - } - - /** - * Read the FTP or HTTP log path with the intention of processing lines from - * log files. - * - * @param httplogpath path to the parent directory containing http logs - * @param ftplogpath path to the parent directory containing ftp logs - */ - public void readFileInSequential(String httplogpath, String ftplogpath) { - es.createBulkProcessor(); - try { - readLogFile(httplogpath, "http", logIndex, httpType); - readLogFile(ftplogpath, "FTP", logIndex, ftpType); - - } catch (IOException e) { - LOG.error("Error whilst reading log file.", e); - } - es.destroyBulkProcessor(); - } - - /** - * Read the FTP or HTTP log path with the intention of processing lines from - * log files. 
- * - * @param httplogpath path to the parent directory containing http logs - * @param ftplogpath path to the parent directory containing ftp logs - */ - public void readFileInParallel(String httplogpath, String ftplogpath) { - - importHttpfile(httplogpath); - importFtpfile(ftplogpath); - } - - public void importHttpfile(String httplogpath) { - // import http logs - JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(s -> ApacheAccessLog.parseFromLogLine(s)).filter(ApacheAccessLog::checknull); - - JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType); - } - - public void importFtpfile(String ftplogpath) { - // import ftp logs - JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(s -> FtpLog.parseFromLogLine(s)).filter(FtpLog::checknull); - - JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType); - } - - /** - * Process a log path on local file system which contains the relevant - * parameters as below. - * - * @param fileName the {@link java.lang.String} path to the log directory on file - * system - * @param protocol whether to process 'http' or 'FTP' - * @param index the index name to write logs to - * @param type one of the available protocols from which Mudrod logs are obtained. - * @throws IOException if there is an error reading anything from the fileName provided. 
- */ - public void readLogFile(String fileName, String protocol, String index, String type) throws IOException { - BufferedReader br = new BufferedReader(new FileReader(fileName)); - int count = 0; - try { - String line = br.readLine(); - while (line != null) { - if ("FTP".equals(protocol)) { - parseSingleLineFTP(line, index, type); - } else { - parseSingleLineHTTP(line, index, type); - } - line = br.readLine(); - count++; - } - } catch (FileNotFoundException e) { - LOG.error("File not found.", e); - } catch (IOException e) { - LOG.error("Error reading input directory.", e); - } finally { - br.close(); - LOG.info("Num of {} entries:\t{}", protocol, count); - } - } - - /** - * Parse a single FTP log entry - * - * @param log a single log line - * @param index the index name we wish to persist the log line to - * @param type one of the available protocols from which Mudrod logs are obtained. - */ - public void parseSingleLineFTP(String log, String index, String type) { - String ip = log.split(" +")[6]; - - String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + log.split(" +")[3] + ":" + log.split(" +")[4]; - - time = switchtoNum(time); - SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy"); - Date date = null; - try { - date = formatter.parse(time); - } catch (ParseException e) { - LOG.error("Error whilst parsing the date.", e); - } - String bytes = log.split(" +")[7]; - - String request = log.split(" +")[8].toLowerCase(); - - if (!request.contains("/misc/") && !request.contains("readme")) { - IndexRequest ir; - try { - ir = new IndexRequest(index, type) - .source(jsonBuilder().startObject().field("LogType", "ftp").field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject()); - es.getBulkProcessor().add(ir); - } catch (NumberFormatException e) { - LOG.error("Error whilst processing numbers", e); - } catch (IOException e) { - LOG.error("IOError whilst adding to the bulk processor.", 
e); - } - } - - } - - /** - * Parse a single HTTP log entry - * - * @param log a single log line - * @param index the index name we wish to persist the log line to - * @param type one of the available protocols from which Mudrod logs are obtained. - */ - public void parseSingleLineHTTP(String log, String index, String type) { - matcher = p.matcher(log); - if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { - return; - } - String time = matcher.group(4); - time = switchtoNum(time); - SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); - Date date = null; - try { - date = formatter.parse(time); - } catch (ParseException e) { - LOG.error("Error whilst attempting to parse date.", e); - } - - String bytes = matcher.group(7); - if ("-".equals(bytes)) { - bytes = "0"; - } - - String request = matcher.group(5).toLowerCase(); - String agent = matcher.group(9); - CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, this.spark); - if (!crawlerDe.checkKnownCrawler(agent)) { - boolean tag = false; - String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" }; - for (int i = 0; i < mimeTypes.length; i++) { - if (request.contains(mimeTypes[i])) { - tag = true; - break; - } - } - - if (!tag) { - IndexRequest ir = null; - executeBulkRequest(ir, index, type, matcher, date, bytes); - } - } - } - - private void executeBulkRequest(IndexRequest ir, String index, String type, Matcher matcher, Date date, String bytes) { - IndexRequest newIr = ir; - try { - newIr = new IndexRequest(index, type).source( - jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6)) - .field("Bytes", Integer.parseInt(bytes)).field("Referer", matcher.group(8)).field("Browser", matcher.group(9)).endObject()); - - es.getBulkProcessor().add(newIr); 
- } catch (NumberFormatException e) { - LOG.error("Error whilst processing numbers", e); - } catch (IOException e) { - LOG.error("IOError whilst adding to the bulk processor.", e); - } - } - - @Override - public Object execute(Object o) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java deleted file mode 100644 index 5b8ed9b..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java +++ /dev/null @@ -1,228 +0,0 @@ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.weblog.partition.KGreedyPartitionSolver; -import gov.nasa.jpl.mudrod.weblog.partition.ThePartitionProblemSolver; -import gov.nasa.jpl.mudrod.weblog.partition.logPartitioner; -import org.apache.commons.io.IOUtils; -import org.apache.spark.Partition; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; -import 
package gov.nasa.jpl.mudrod.weblog.pre;

import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodConstants;
import gov.nasa.jpl.mudrod.weblog.partition.KGreedyPartitionSolver;
import gov.nasa.jpl.mudrod.weblog.partition.ThePartitionProblemSolver;
import gov.nasa.jpl.mudrod.weblog.partition.logPartitioner;
import org.apache.commons.io.IOUtils;
import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;

/**
 * Base class for web-log pre-processing steps. Resolves the log index/type
 * names from configuration, installs the Elasticsearch mappings, and provides
 * shared helpers for enumerating users and sessions and for partitioning
 * users across Spark workers.
 */
public class LogAbstract extends DiscoveryStepAbstract {

  private static final long serialVersionUID = 1L;

  private static final Logger LOG = LoggerFactory.getLogger(LogAbstract.class);

  public String logIndex = null;
  public String httpType = null;
  public String ftpType = null;
  public String cleanupType = null;
  public String sessionStats = null;
  // default Spark partition count for user-level parallelism
  public int partition = 96;

  public LogAbstract(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
    if (props != null) {
      initLogIndex();
    }
  }

  /**
   * Resolves index/type names from configuration and pushes the bundled
   * settings/mappings JSON to Elasticsearch.
   */
  protected void initLogIndex() {
    logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX);
    httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX);
    ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX);
    cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
    sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);

    JSONObject settingsJSON = null;
    JSONObject mappingJSON = null;

    // try-with-resources: the original never closed these classpath streams.
    try (InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS)) {
      settingsJSON = new JSONObject(IOUtils.toString(settingsStream));
    } catch (JSONException | IOException e1) {
      LOG.error("Error reading Elasticsearch settings!", e1);
    }

    try (InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS)) {
      mappingJSON = new JSONObject(IOUtils.toString(mappingsStream));
    } catch (JSONException | IOException e1) {
      LOG.error("Error reading Elasticsearch mappings!", e1);
    }

    try {
      if (settingsJSON != null && mappingJSON != null) {
        this.es.putMapping(logIndex, settingsJSON.toString(), mappingJSON.toString());
      }
    } catch (IOException e) {
      LOG.error("Error entering Elasticsearch Mappings!", e);
    }
  }

  @Override
  public Object execute() {
    return null;
  }

  @Override
  public Object execute(Object o) {
    return null;
  }

  /**
   * Returns an RDD of user IPs, partitioned so each partition carries a
   * roughly equal document load.
   *
   * @param type document type(s) to count users from
   * @return RDD of user IP strings
   */
  public JavaRDD<String> getUserRDD(String... type) {
    Map<String, Double> userDocs = getUserDocs(type);
    return parallizeUsers(userDocs);
  }

  /**
   * Lists all distinct user IPs seen in the given type.
   *
   * @param type document type to aggregate over
   * @return list of user IPs
   */
  public List<String> getUsers(String type) {
    Terms users = this.getUserTerms(type);
    List<String> userList = new ArrayList<>();
    for (Terms.Bucket entry : users.getBuckets()) {
      String ip = (String) entry.getKey();
      userList.add(ip);
    }

    return userList;
  }

  /**
   * Runs a terms aggregation over the "IP" field.
   *
   * @param type document type(s) to aggregate over
   * @return the "Users" terms aggregation
   */
  public Terms getUserTerms(String... type) {
    // size(docCount) guarantees every distinct IP is returned in one shot
    int docCount = es.getDocCount(logIndex, type);

    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
    return sr.getAggregations().get("Users");
  }

  /**
   * Maps each user IP to its document count.
   *
   * @param type document type(s) to aggregate over
   * @return map of IP to document count
   */
  public Map<String, Double> getUserDocs(String... type) {
    Terms users = this.getUserTerms(type);
    Map<String, Double> userList = new HashMap<>();
    for (Terms.Bucket entry : users.getBuckets()) {
      String ip = (String) entry.getKey();
      Long count = entry.getDocCount();
      userList.put(ip, Double.valueOf(count));
    }

    return userList;
  }

  /**
   * Aggregates per-user daily document counts.
   *
   * NOTE(review): the returned map was never populated in the original code —
   * callers always received an empty map while the per-day counts were only
   * printed. The debug printlns are now proper log statements, but the
   * intended key/value scheme (per user? per user+day?) must be confirmed
   * before this result is relied upon.
   *
   * @return map of user to daily count (currently always empty)
   */
  public Map<String, Long> getUserDailyDocs() {
    int docCount = es.getDocCount(logIndex, httpType);

    AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);

    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
    Terms users = sr.getAggregations().get("Users");
    Map<String, Long> userList = new HashMap<>();
    for (Terms.Bucket user : users.getBuckets()) {
      String ip = (String) user.getKey();
      LOG.debug("user: {}", ip);

      Histogram agg = user.getAggregations().get("by_day");
      List<? extends Histogram.Bucket> dateList = agg.getBuckets();
      for (Histogram.Bucket bucket : dateList) {
        LOG.debug("{} : {}", bucket.getKey(), bucket.getDocCount());
      }
    }

    return userList;
  }

  /**
   * Logs the contents of each partition of the given user RDD (debug aid).
   *
   * @param userRDD RDD of user IPs to inspect
   */
  protected void checkUserPartition(JavaRDD<String> userRDD) {
    List<Partition> partitions = userRDD.partitions();
    LOG.debug("Total partitions: {}", partitions.size());
    int[] partitionIds = new int[partitions.size()];
    for (int i = 0; i < partitions.size(); i++) {
      partitionIds[i] = partitions.get(i).index();
    }

    List<String>[] userIPs = userRDD.collectPartitions(partitionIds);
    for (int i = 0; i < userIPs.length; i++) {
      LOG.debug("partition {}: {}", i, userIPs[i]);
    }
  }

  /**
   * Distributes users over {@link #partition} Spark partitions so that each
   * partition receives approximately the same total document count (greedy
   * k-way partitioning).
   *
   * @param userDocs map of user IP to document count
   * @return repartitioned RDD of user IPs
   */
  public JavaRDD<String> parallizeUsers(Map<String, Double> userDocs) {
    // prepare list for parallelize; entrySet avoids a lookup per key
    List<Tuple2<String, Double>> list = new ArrayList<>();
    for (Map.Entry<String, Double> entry : userDocs.entrySet()) {
      list.add(new Tuple2<>(entry.getKey(), entry.getValue()));
    }

    // group users by balancing their document counts across k partitions
    ThePartitionProblemSolver solution = new KGreedyPartitionSolver();
    Map<String, Integer> userGroups = solution.solve(userDocs, this.partition);

    JavaPairRDD<String, Double> pairRdd = spark.sc.parallelizePairs(list);
    JavaPairRDD<String, Double> userPairRDD = pairRdd.partitionBy(new logPartitioner(userGroups, this.partition));

    // repartitioned user RDD
    return userPairRDD.keys();
  }

  /**
   * Runs a terms aggregation over the "SessionID" field of the cleanup type.
   *
   * @return the "Sessions" terms aggregation
   */
  public Terms getSessionTerms() {
    int docCount = es.getDocCount(this.logIndex, this.cleanupType);

    SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
        .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();

    return sr.getAggregations().get("Sessions");
  }

  /**
   * Lists session IDs that have at least 3 documents and are not marked
   * "invalid".
   *
   * @return list of valid session IDs
   */
  public List<String> getSessions() {
    Terms sessions = this.getSessionTerms();
    List<String> sessionList = new ArrayList<>();
    for (Terms.Bucket entry : sessions.getBuckets()) {
      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
        String session = (String) entry.getKey();
        sessionList.add(session);
      }
    }

    return sessionList;
  }
}

// ===== RankingTrainDataGenerator.java (separate file in the original tree) =====

/**
 * Extracts ranking training data from user sessions and writes it out as a
 * single JSON text file.
 */
public class RankingTrainDataGenerator extends DiscoveryStepAbstract {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(RankingTrainDataGenerator.class);

  public RankingTrainDataGenerator(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public Object execute() {
    LOG.info("Starting generate ranking train data.");
    startTime = System.currentTimeMillis();

    // Output path is now configurable; the original hard-coded Windows path is
    // kept as the default so existing setups keep working.
    String rankingTrainFile = props.getProperty("mudrod.ranking.train.file", "E:\\Mudrod_input_data\\Testing_Data_4_1monthLog+Meta+Onto\\traing.txt");
    try {
      SessionExtractor extractor = new SessionExtractor();
      JavaRDD<RankingTrainData> rankingTrainDataRDD = extractor.extractRankingTrainData(this.props, this.es, this.spark);

      JavaRDD<String> rankingTrainDataJsonRDD = rankingTrainDataRDD.map(f -> f.toJson());

      // coalesce(1) so the result lands in a single output file
      rankingTrainDataJsonRDD.coalesce(1, true).saveAsTextFile(rankingTrainFile);

    } catch (Exception e) {
      LOG.error("Error whilst generating ranking train data.", e);
    }

    endTime = System.currentTimeMillis();
    LOG.info("Ranking train data generation complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
    return null;
  }

  @Override
  public Object execute(Object o) {
    return null;
  }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.weblog.pre;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Pre-processing step that deletes the raw HTTP and FTP log documents from
 * the log index once downstream processing no longer needs them.
 */
public class RemoveRawLog extends LogAbstract {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(RemoveRawLog.class);

  /**
   * @param props populated Mudrod configuration
   * @param es active Elasticsearch driver
   * @param spark active Spark driver
   */
  public RemoveRawLog(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public Object execute() {
    LOG.info("Starting raw log removal.");
    startTime = System.currentTimeMillis();
    // Both raw types are purged with the same match-all delete; http first,
    // then ftp, exactly as before.
    for (String rawType : new String[] { httpType, ftpType }) {
      es.deleteAllByQuery(logIndex, rawType, QueryBuilders.matchAllQuery());
    }
    endTime = System.currentTimeMillis();
    es.refreshIndex();
    LOG.info("Raw log removal complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
    return null;
  }

  @Override
  public Object execute(Object o) {
    return null;
  }
}
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.weblog.structure.Session; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.VoidFunction; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.stats.Stats; -import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; -import org.elasticsearch.search.sort.SortOrder; -import org.joda.time.DateTime; -import org.joda.time.Seconds; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -/** - * 
Supports ability to generate user session by time threshold and referrer - */ -public class SessionGenerator extends LogAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(SessionGenerator.class); - - public SessionGenerator(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting Session Generation."); - startTime = System.currentTimeMillis(); - generateSession(); - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("Session generating complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); - return null; - } - - public void generateSession() { - try { - es.createBulkProcessor(); - genSessionByReferer(Integer.parseInt(props.getProperty("timegap"))); - es.destroyBulkProcessor(); - - es.createBulkProcessor(); - combineShortSessions(Integer.parseInt(props.getProperty("timegap"))); - es.destroyBulkProcessor(); - } catch (ElasticsearchException e) { - LOG.error("Error whilst executing bulk processor.", e); - } catch (IOException e) { - LOG.error("Error whilst reading configuration.", e); - } catch (NumberFormatException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - public void genSessionByReferer(int timeThres) throws InterruptedException, IOException { - String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); - if (processingType.equals("sequential")) { - genSessionByRefererInSequential(timeThres); - } else if (processingType.equals("parallel")) { - genSessionByRefererInParallel(timeThres); - } - } - - public void combineShortSessions(int timeThres) throws InterruptedException, IOException { - String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); - if (processingType.equals("sequential")) { - combineShortSessionsInSequential(timeThres); - } else if 
(processingType.equals("parallel")) { - combineShortSessionsInParallel(timeThres); - } - } - - /** - * Method to generate session by time threshold and referrer - * - * @param timeThres value of time threshold (s) - * @throws ElasticsearchException ElasticsearchException - * @throws IOException IOException - */ - public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException { - - Terms users = this.getUserTerms(this.cleanupType); - - int sessionCount = 0; - for (Terms.Bucket entry : users.getBuckets()) { - - String user = (String) entry.getKey(); - Integer sessionNum = genSessionByReferer(es, user, timeThres); - sessionCount += sessionNum; - } - - LOG.info("Initial session count: {}", Integer.toString(sessionCount)); - } - - public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException { - - Terms users = this.getUserTerms(this.cleanupType); - for (Terms.Bucket entry : users.getBuckets()) { - String user = entry.getKey().toString(); - combineShortSessions(es, user, timeThres); - } - } - - /** - * Method to remove invalid logs through IP address - * - * @param es an instantiated es driver - * @param ip invalid IP address - * @throws ElasticsearchException ElasticsearchException - * @throws IOException IOException - */ - public void deleteInvalid(ESDriver es, String ip) throws IOException { - - BoolQueryBuilder filterAll = new BoolQueryBuilder(); - filterAll.must(QueryBuilders.termQuery("IP", ip)); - - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet(); - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid"); - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if 
(scrollResp.getHits().getHits().length == 0) { - break; - } - } - } - - /** - * Method to update a Elasticsearch record/document by id, field, and value - * - * @param es - * @param index index name is Elasticsearch - * @param type type name - * @param id ID of the document that needs to be updated - * @param field1 field of the document that needs to be updated - * @param value1 value of the document that needs to be changed to - * @throws ElasticsearchException - * @throws IOException - */ - private void update(ESDriver es, String index, String type, String id, String field1, Object value1) throws IOException { - UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); - es.getBulkProcessor().add(ur); - } - - public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException { - - JavaRDD<String> userRDD = getUserRDD(this.cleanupType); - - int sessionCount = 0; - sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Iterator<Integer> call(Iterator<String> arg0) throws Exception { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); - List<Integer> sessionNums = new ArrayList<>(); - while (arg0.hasNext()) { - String s = arg0.next(); - Integer sessionNum = genSessionByReferer(tmpES, s, timeThres); - sessionNums.add(sessionNum); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - return sessionNums.iterator(); - } - }).reduce(new Function2<Integer, Integer, Integer>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); - - LOG.info("Initial Session count: {}", Integer.toString(sessionCount)); - } - - public int genSessionByReferer(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { - - String 
startTime = null; - int sessionCountIn = 0; - - BoolQueryBuilder filterSearch = new BoolQueryBuilder(); - filterSearch.must(QueryBuilders.termQuery("IP", user)); - - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100) - .execute().actionGet(); - - Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>(); - String request = ""; - String referer = ""; - String logType = ""; - String id = ""; - String ip = user; - String indexUrl = "http://podaac.jpl.nasa.gov/"; - DateTime time = null; - DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); - - while (scrollResp.getHits().getHits().length != 0) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - request = (String) result.get("RequestUrl"); - referer = (String) result.get("Referer"); - logType = (String) result.get("LogType"); - time = fmt.parseDateTime((String) result.get("Time")); - id = hit.getId(); - - if ("PO.DAAC".equals(logType)) { - if ("-".equals(referer) || referer.equals(indexUrl) || !referer.contains(indexUrl)) { - sessionCountIn++; - sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); - sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); - - update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); - - } else { - int count = sessionCountIn; - int rollbackNum = 0; - while (true) { - Map<String, DateTime> requests = sessionReqs.get(ip + "@" + count); - if (requests == null) { - sessionReqs.put(ip + "@" + count, new HashMap<String, DateTime>()); - sessionReqs.get(ip + "@" + count).put(request, time); - update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); - - break; - } - ArrayList<String> keys = new ArrayList<>(requests.keySet()); - boolean bFindRefer = false; - - for (int i = keys.size() - 1; i >= 0; i--) { - rollbackNum++; 
- if (keys.get(i).equalsIgnoreCase(referer)) { - bFindRefer = true; - // threshold,if time interval > 10* - // click num, start a new session - if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) < timeThres * rollbackNum) { - sessionReqs.get(ip + "@" + count).put(request, time); - update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); - } else { - sessionCountIn++; - sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); - sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); - update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); - } - - break; - } - } - - if (bFindRefer) { - break; - } - - count--; - if (count < 0) { - sessionCountIn++; - - sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); - sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); - update(es, props.getProperty(MudrodConstants.ES_INDEX_NAME), this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); - - break; - } - } - } - } else if ("ftp".equals(logType)) { - - // may affect computation efficiency - Map<String, DateTime> requests = sessionReqs.get(ip + "@" + sessionCountIn); - if (requests == null) { - sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); - } else { - ArrayList<String> keys = new ArrayList<>(requests.keySet()); - int size = keys.size(); - if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(size - 1)), time).getSeconds()) > timeThres) { - sessionCountIn += 1; - sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); - } - } - sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); - update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); - } - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - } - - return sessionCountIn; - } - - public void 
combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException { - - JavaRDD<String> userRDD = getUserRDD(this.cleanupType); - - userRDD.foreachPartition(new VoidFunction<Iterator<String>>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public void call(Iterator<String> arg0) throws Exception { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); - while (arg0.hasNext()) { - String s = arg0.next(); - combineShortSessions(tmpES, s, timeThres); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - } - }); - } - - public void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { - - BoolQueryBuilder filterSearch = new BoolQueryBuilder(); - filterSearch.must(QueryBuilders.termQuery("IP", user)); - - String[] indexArr = new String[] { logIndex }; - String[] typeArr = new String[] { cleanupType }; - int docCount = es.getDocCount(indexArr, typeArr, filterSearch); - - if (docCount < 3) { - deleteInvalid(es, user); - return; - } - - BoolQueryBuilder filterCheck = new BoolQueryBuilder(); - filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-")); - SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet(); - - long numInvalid = checkReferer.getHits().getTotalHits(); - double invalidRate = numInvalid / docCount; - - if (invalidRate >= 0.8) { - deleteInvalid(es, user); - return; - } - - StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); - SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch) - .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet(); - - Terms sessions = 
srSession.getAggregations().get("Sessions"); - - List<Session> sessionList = new ArrayList<>(); - for (Terms.Bucket session : sessions.getBuckets()) { - Stats agg = session.getAggregations().get("Stats"); - Session sess = new Session(props, es, agg.getMinAsString(), agg.getMaxAsString(), session.getKey().toString()); - sessionList.add(sess); - } - - Collections.sort(sessionList); - DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); - String last = null; - String lastnewID = null; - String lastoldID = null; - String current = null; - for (Session s : sessionList) { - current = s.getEndTime(); - if (last != null) { - if (Seconds.secondsBetween(fmt.parseDateTime(last), fmt.parseDateTime(current)).getSeconds() < timeThres) { - if (lastnewID == null) { - s.setNewID(lastoldID); - } else { - s.setNewID(lastnewID); - } - - QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID())); - - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet(); - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - if (lastnewID == null) { - update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastoldID); - } else { - update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastnewID); - } - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - } - ; - } - lastoldID = s.getID(); - lastnewID = s.getNewID(); - last = current; - } - - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java deleted file mode 100644 index 7e7640f..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.pre; - -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.weblog.structure.RequestUrl; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.stats.Stats; -import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; -import org.joda.time.DateTime; -import org.joda.time.Seconds; -import org.joda.time.format.DateTimeFormatter; -import 
org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -/** - * Supports ability to post-process session, including summarizing statistics - * and filtering - */ -public class SessionStatistic extends LogAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(SessionStatistic.class); - - public SessionStatistic(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting Session Summarization."); - startTime = System.currentTimeMillis(); - try { - processSession(); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("Session Summarization complete. 
Time elapsed {} seconds.", (endTime - startTime) / 1000); - return null; - } - - public void processSession() throws InterruptedException, IOException, ExecutionException { - String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); - if (processingType.equals("sequential")) { - processSessionInSequential(); - } else if (processingType.equals("parallel")) { - processSessionInParallel(); - } - } - - public void processSessionInSequential() throws IOException, InterruptedException, ExecutionException { - es.createBulkProcessor(); - Terms Sessions = this.getSessionTerms(); - int session_count = 0; - for (Terms.Bucket entry : Sessions.getBuckets()) { - if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) { - String sessionid = entry.getKey().toString(); - int sessionNum = processSession(es, sessionid); - session_count += sessionNum; - } - } - LOG.info("Final Session count: {}", Integer.toString(session_count)); - es.destroyBulkProcessor(); - } - - /** - * Extract the dataset ID from a long request - * - * @param request raw log request - * @return dataset ID - */ - public String findDataset(String request) { - String pattern1 = "/dataset/"; - String pattern2; - if (request.contains("?")) { - pattern2 = "?"; - } else { - pattern2 = " "; - } - - Pattern p = Pattern.compile(Pattern.quote(pattern1) + "(.*?)" + Pattern.quote(pattern2)); - Matcher m = p.matcher(request); - if (m.find()) { - return m.group(1); - } - return null; - } - - public void processSessionInParallel() throws InterruptedException, IOException { - - List<String> sessions = this.getSessions(); - JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition); - - int sessionCount = 0; - sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { - @Override - public Iterator<Integer> call(Iterator<String> arg0) throws Exception { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); - List<Integer> sessionNums = new 
ArrayList<Integer>(); - sessionNums.add(0); - while (arg0.hasNext()) { - String s = arg0.next(); - Integer sessionNum = processSession(tmpES, s); - sessionNums.add(sessionNum); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - return sessionNums.iterator(); - } - }).reduce(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); - - LOG.info("Final Session count: {}", Integer.toString(sessionCount)); - } - - public int processSession(ESDriver es, String sessionId) throws IOException, InterruptedException, ExecutionException { - - String inputType = cleanupType; - String outputType = sessionStats; - - DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); - String min = null; - String max = null; - DateTime start = null; - DateTime end = null; - int duration = 0; - float request_rate = 0; - - int session_count = 0; - Pattern pattern = Pattern.compile("get (.*?) http/*"); - - StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); - - BoolQueryBuilder filter_search = new BoolQueryBuilder(); - filter_search.must(QueryBuilders.termQuery("SessionID", sessionId)); - - SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filter_search).addAggregation(statsAgg).execute().actionGet(); - - Stats agg = sr.getAggregations().get("Stats"); - min = agg.getMinAsString(); - max = agg.getMaxAsString(); - start = fmt.parseDateTime(min); - end = fmt.parseDateTime(max); - - duration = Seconds.secondsBetween(start, end).getSeconds(); - - int searchDataListRequest_count = 0; - int searchDataRequest_count = 0; - int searchDataListRequest_byKeywords_count = 0; - int ftpRequest_count = 0; - int keywords_num = 0; - - String IP = null; - String keywords = ""; - String views = ""; - String downloads = ""; - - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(inputType).setScroll(new 
TimeValue(60000)).setQuery(filter_search).setSize(100).execute().actionGet(); - - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - - String request = (String) result.get("Request"); - String logType = (String) result.get("LogType"); - IP = (String) result.get("IP"); - Matcher matcher = pattern.matcher(request.trim().toLowerCase()); - while (matcher.find()) { - request = matcher.group(1); - } - - String datasetlist = "/datasetlist?"; - String dataset = "/dataset/"; - if (request.contains(datasetlist)) { - searchDataListRequest_count++; - - RequestUrl requestURL = new RequestUrl(); - String infoStr = requestURL.getSearchInfo(request) + ","; - String info = es.customAnalyzing(props.getProperty("indexName"), infoStr); - - if (!info.equals(",")) { - if (keywords.equals("")) { - keywords = keywords + info; - } else { - String[] items = info.split(","); - String[] keywordList = keywords.split(","); - for (int m = 0; m < items.length; m++) { - if (!Arrays.asList(keywordList).contains(items[m])) { - keywords = keywords + items[m] + ","; - } - } - } - } - - } - if (request.startsWith(dataset)) { - searchDataRequest_count++; - if (findDataset(request) != null) { - String view = findDataset(request); - - if ("".equals(views)) { - views = view; - } else { - if (views.contains(view)) { - - } else { - views = views + "," + view; - } - } - } - } - if ("ftp".equals(logType)) { - ftpRequest_count++; - String download = ""; - String requestLowercase = request.toLowerCase(); - if (requestLowercase.endsWith(".jpg") == false && requestLowercase.endsWith(".pdf") == false && requestLowercase.endsWith(".txt") == false && requestLowercase.endsWith(".gif") == false) { - download = request; - } - - if ("".equals(downloads)) { - downloads = download; - } else { - if (downloads.contains(download)) { - - } else { - downloads = downloads + "," + download; - } - } - } - - } - - scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - // Break condition: No hits are returned - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - - if (!keywords.equals("")) { - keywords_num = keywords.split(",").length; - } - - if (searchDataListRequest_count != 0 && searchDataListRequest_count <= Integer.parseInt(props.getProperty("searchf")) && searchDataRequest_count != 0 && searchDataRequest_count <= Integer - .parseInt(props.getProperty("viewf")) && ftpRequest_count <= Integer.parseInt(props.getProperty("downloadf"))) { - String sessionURL = props.getProperty("SessionPort") + props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType; - session_count = 1; - - IndexRequest ir = new IndexRequest(logIndex, outputType).source( - jsonBuilder().startObject().field("SessionID", sessionId).field("SessionURL", sessionURL).field("Duration", duration).field("Number of Keywords", keywords_num).field("Time", min) - .field("End_time", max).field("searchDataListRequest_count", searchDataListRequest_count).field("searchDataListRequest_byKeywords_count", searchDataListRequest_byKeywords_count) - .field("searchDataRequest_count", searchDataRequest_count).field("keywords", es.customAnalyzing(logIndex, keywords)).field("views", views).field("downloads", downloads) - .field("request_rate", request_rate).field("Comments", "").field("Validation", 0).field("Produceby", 0).field("Correlation", 0).field("IP", IP).endObject()); - - es.getBulkProcessor().add(ir); - } - - return session_count; - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java deleted file mode 100644 index 5a3a1c3..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes Preprocessing for all functionality required by the - * {@link gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine} - */ -package gov.nasa.jpl.mudrod.weblog.pre; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java deleted file mode 100644 index 2b8f45f..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.process; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer; -import gov.nasa.jpl.mudrod.ssearch.ClickstreamImporter; -import gov.nasa.jpl.mudrod.utils.LinkageTriple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.List; -import java.util.Properties; - -/** - * Supports ability to calculate term similarity based on click stream - */ -public class ClickStreamAnalyzer extends DiscoveryStepAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(ClickStreamAnalyzer.class); - - public ClickStreamAnalyzer(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - /** - * Method of executing click stream analyzer - */ - @Override - public Object execute() { - LOG.info("Starting ClickStreamAnalyzer..."); - startTime = System.currentTimeMillis(); - try { - String clickstream_matrixFile = props.getProperty("clickstreamMatrix"); - File f = new File(clickstream_matrixFile); - if (f.exists()) { - SVDAnalyzer svd = new SVDAnalyzer(props, es, spark); - svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), Integer.parseInt(props.getProperty("clickstreamSVDDimension")), props.getProperty("clickstreamSVDMatrix_tmp")); - List<LinkageTriple> tripleList = 
svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp")); - svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("clickStreamLinkageType")); - - // Store click stream in ES for the ranking use - ClickstreamImporter cs = new ClickstreamImporter(props, es, spark); - cs.importfromCSVtoES(); - } - } catch (Exception e) { - LOG.error("Encountered an error during execution of ClickStreamAnalyzer.", e); - } - - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("ClickStreamAnalyzer complete. Time elapsed: {}s", (endTime - startTime) / 1000); - return null; - } - - @Override - public Object execute(Object o) { - return null; - } -}
