/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.weblog.pre;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodConstants;
import gov.nasa.jpl.mudrod.weblog.structure.ApacheAccessLog;
import gov.nasa.jpl.mudrod.weblog.structure.FtpLog;
import org.apache.spark.api.java.JavaRDD;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Supports ability to parse and process FTP and HTTP log files
 */
public class ImportLogFile extends LogAbstract {

  private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class);

  /**
   *
   */
  private static final long serialVersionUID = 1L;

  /**
   * Apache "combined" access-log pattern: IP, identd, user, [timestamp],
   * "request", status, bytes (or "-"), "referer", "user agent".
   */
  String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] " + "\"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\"";

  /** Number of capture groups expected from {@link #logEntryPattern}. */
  public static final int NUM_FIELDS = 9;
  Pattern p = Pattern.compile(logEntryPattern);
  // NOTE(review): shared mutable matcher makes this class non-thread-safe;
  // it is reassigned on every call to parseSingleLineHTTP.
  transient Matcher matcher;

  /** Month abbreviations in calendar order; index + 1 is the month number. */
  private static final String[] MONTH_ABBREVIATIONS = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };

  /**
   * Constructor supporting a number of parameters documented below.
   *
   * @param props a {@link java.util.Map} containing K,V of type String, String
   *              respectively.
   * @param es    the {@link ESDriver} used to persist log files.
   * @param spark the {@link SparkDriver} used to process input log files.
   */
  public ImportLogFile(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Imports the raw HTTP and FTP logs into Elasticsearch and logs timing.
   *
   * @return always {@code null}
   */
  @Override
  public Object execute() {
    LOG.info("Starting Log Import {}", props.getProperty(MudrodConstants.TIME_SUFFIX));
    startTime = System.currentTimeMillis();
    readFile();
    endTime = System.currentTimeMillis();
    LOG.info("Log Import complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
    es.refreshIndex();
    return null;
  }

  /**
   * Utility function to aid String to Number formatting such that three letter
   * months such as 'Jan' are converted to the Gregorian integer equivalent.
   * Only the first month abbreviation found (searched in calendar order) is
   * replaced, mirroring the original if/else-if chain.
   *
   * @param time the input {@link java.lang.String} containing a month
   *             abbreviation.
   * @return the input with the month abbreviation replaced by its 1-based
   *         month number; the input unchanged if no abbreviation is present.
   */
  public String switchtoNum(String time) {
    for (int month = 0; month < MONTH_ABBREVIATIONS.length; month++) {
      String abbreviation = MONTH_ABBREVIATIONS[month];
      if (time.contains(abbreviation)) {
        return time.replace(abbreviation, Integer.toString(month + 1));
      }
    }
    return time;
  }

  /**
   * Locates the HTTP and FTP log files for the configured time suffix inside
   * the data directory and dispatches to sequential or parallel processing
   * based on {@link MudrodConstants#PROCESS_TYPE} (default "parallel").
   */
  public void readFile() {

    String httplogpath = null;
    String ftplogpath = null;

    File directory = new File(props.getProperty(MudrodConstants.DATA_DIR));
    File[] fList = directory.listFiles();
    if (fList == null) {
      // listFiles() returns null when the path does not exist or is not a
      // directory; previously this caused a NullPointerException.
      LOG.error("Data directory {} cannot be listed, please check your configuration.", directory.getAbsolutePath());
      return;
    }
    for (File file : fList) {
      if (file.isFile() && file.getName().contains(props.getProperty(MudrodConstants.TIME_SUFFIX))) {
        if (file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
          httplogpath = file.getAbsolutePath();
        }

        if (file.getName().contains(props.getProperty(MudrodConstants.FTP_PREFIX))) {
          ftplogpath = file.getAbsolutePath();
        }
      }
    }

    if (httplogpath == null || ftplogpath == null) {
      LOG.error("WWW file or FTP logs cannot be found, please check your data directory.");
      return;
    }

    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE, "parallel");
    if ("sequential".equals(processingType)) {
      readFileInSequential(httplogpath, ftplogpath);
    } else if ("parallel".equals(processingType)) {
      readFileInParallel(httplogpath, ftplogpath);
    }
  }

  /**
   * Read the FTP or HTTP log path with the intention of processing lines from
   * log files, one line at a time through the Elasticsearch bulk processor.
   *
   * @param httplogpath path to the parent directory containing http logs
   * @param ftplogpath  path to the parent directory containing ftp logs
   */
  public void readFileInSequential(String httplogpath, String ftplogpath) {
    es.createBulkProcessor();
    try {
      readLogFile(httplogpath, "http", logIndex, httpType);
      readLogFile(ftplogpath, "FTP", logIndex, ftpType);

    } catch (IOException e) {
      LOG.error("Error whilst reading log file.", e);
    }
    es.destroyBulkProcessor();
  }

  /**
   * Read the FTP or HTTP log path with the intention of processing lines from
   * log files using Spark RDDs.
   *
   * @param httplogpath path to the parent directory containing http logs
   * @param ftplogpath  path to the parent directory containing ftp logs
   */
  public void readFileInParallel(String httplogpath, String ftplogpath) {

    importHttpfile(httplogpath);
    importFtpfile(ftplogpath);
  }

  /**
   * Parses HTTP logs in parallel and writes the resulting JSON documents to
   * the {@code httpType} mapping of the log index.
   *
   * @param httplogpath path to the HTTP log file
   */
  public void importHttpfile(String httplogpath) {
    // import http logs
    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(s -> ApacheAccessLog.parseFromLogLine(s)).filter(ApacheAccessLog::checknull);

    JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType);
  }

  /**
   * Parses FTP logs in parallel and writes the resulting JSON documents to
   * the {@code ftpType} mapping of the log index.
   *
   * @param ftplogpath path to the FTP log file
   */
  public void importFtpfile(String ftplogpath) {
    // import ftp logs
    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(s -> FtpLog.parseFromLogLine(s)).filter(FtpLog::checknull);

    JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType);
  }

  /**
   * Process a log path on local file system which contains the relevant
   * parameters as below.
   *
   * @param fileName the {@link java.lang.String} path to the log directory on file
   *                 system
   * @param protocol whether to process 'http' or 'FTP'
   * @param index    the index name to write logs to
   * @param type     one of the available protocols from which Mudrod logs are obtained.
   * @throws IOException if there is an error reading anything from the fileName provided.
   */
  public void readLogFile(String fileName, String protocol, String index, String type) throws IOException {
    int count = 0;
    // try-with-resources guarantees the reader is closed even if the
    // BufferedReader constructor or a parse step throws (the previous
    // version created the FileReader outside the try block).
    // NOTE(review): FileReader uses the platform default charset — TODO
    // confirm the logs' encoding and pass an explicit charset if needed.
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
      String line = br.readLine();
      while (line != null) {
        if ("FTP".equals(protocol)) {
          parseSingleLineFTP(line, index, type);
        } else {
          parseSingleLineHTTP(line, index, type);
        }
        count++;
        line = br.readLine();
      }
    } catch (FileNotFoundException e) {
      LOG.error("File not found.", e);
    } catch (IOException e) {
      LOG.error("Error reading input directory.", e);
    }
    LOG.info("Num of {} entries:\t{}", protocol, count);
  }

  /**
   * Parse a single FTP log entry
   *
   * @param log   a single log line
   * @param index the index name we wish to persist the log line to
   * @param type  one of the available protocols from which Mudrod logs are obtained.
   */
  public void parseSingleLineFTP(String log, String index, String type) {
    // split once instead of re-splitting the line for every field
    String[] fields = log.split(" +");
    String ip = fields[6];

    String time = fields[1] + ":" + fields[2] + ":" + fields[3] + ":" + fields[4];

    time = switchtoNum(time);
    // created per call because SimpleDateFormat is not thread-safe
    SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy");
    Date date = null;
    try {
      date = formatter.parse(time);
    } catch (ParseException e) {
      LOG.error("Error whilst parsing the date.", e);
    }
    String bytes = fields[7];

    String request = fields[8].toLowerCase();

    // skip housekeeping downloads that are not real user activity
    if (!request.contains("/misc/") && !request.contains("readme")) {
      IndexRequest ir;
      try {
        ir = new IndexRequest(index, type)
            .source(jsonBuilder().startObject().field("LogType", "ftp").field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject());
        es.getBulkProcessor().add(ir);
      } catch (NumberFormatException e) {
        LOG.error("Error whilst processing numbers", e);
      } catch (IOException e) {
        LOG.error("IOError whilst adding to the bulk processor.", e);
      }
    }

  }

  /**
   * Parse a single HTTP log entry
   *
   * @param log   a single log line
   * @param index the index name we wish to persist the log line to
   * @param type  one of the available protocols from which Mudrod logs are obtained.
   */
  public void parseSingleLineHTTP(String log, String index, String type) {
    matcher = p.matcher(log);
    if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
      return;
    }
    String time = matcher.group(4);
    time = switchtoNum(time);
    SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    Date date = null;
    try {
      date = formatter.parse(time);
    } catch (ParseException e) {
      LOG.error("Error whilst attempting to parse date.", e);
    }

    String bytes = matcher.group(7);
    if ("-".equals(bytes)) {
      bytes = "0";
    }

    String request = matcher.group(5).toLowerCase();
    String agent = matcher.group(9);
    // NOTE(review): a CrawlerDetection instance is created for every log
    // line — consider hoisting if profiling shows this to be expensive.
    CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, this.spark);
    if (!crawlerDe.checkKnownCrawler(agent)) {
      // drop requests for static assets and API noise
      boolean tag = false;
      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" };
      for (int i = 0; i < mimeTypes.length; i++) {
        if (request.contains(mimeTypes[i])) {
          tag = true;
          break;
        }
      }

      if (!tag) {
        executeBulkRequest(index, type, matcher, date, bytes);
      }
    }
  }

  /**
   * Builds the Elasticsearch document for one HTTP log entry and hands it to
   * the bulk processor. (The previous version threaded a useless null
   * IndexRequest parameter through this private helper.)
   */
  private void executeBulkRequest(String index, String type, Matcher matcher, Date date, String bytes) {
    try {
      IndexRequest ir = new IndexRequest(index, type).source(
          jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6))
              .field("Bytes", Integer.parseInt(bytes)).field("Referer", matcher.group(8)).field("Browser", matcher.group(9)).endObject());

      es.getBulkProcessor().add(ir);
    } catch (NumberFormatException e) {
      LOG.error("Error whilst processing numbers", e);
    } catch (IOException e) {
      LOG.error("IOError whilst adding to the bulk processor.", e);
    }
  }

  @Override
  public Object execute(Object o) {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java new file mode 100644 index 0000000..5b8ed9b --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java @@ -0,0 +1,228 @@ +package gov.nasa.jpl.mudrod.weblog.pre; + +import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; +import gov.nasa.jpl.mudrod.weblog.partition.KGreedyPartitionSolver; +import gov.nasa.jpl.mudrod.weblog.partition.ThePartitionProblemSolver; +import gov.nasa.jpl.mudrod.weblog.partition.logPartitioner; +import org.apache.commons.io.IOUtils; +import org.apache.spark.Partition; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class LogAbstract extends DiscoveryStepAbstract { + 
+ /** + * + */ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LogAbstract.class); + + public String logIndex = null; + public String httpType = null; + public String ftpType = null; + public String cleanupType = null; + public String sessionStats = null; + public int partition = 96; + + public LogAbstract(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + if (props != null) { + initLogIndex(); + } + } + + protected void initLogIndex() { + logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX); + httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX); + ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX); + cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX); + sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX); + + InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS); + InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS); + JSONObject settingsJSON = null; + JSONObject mappingJSON = null; + + try { + settingsJSON = new JSONObject(IOUtils.toString(settingsStream)); + } catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch settings!", e1); + } + + try { + mappingJSON = new JSONObject(IOUtils.toString(mappingsStream)); + } catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch mappings!", e1); + } + + try { + if (settingsJSON != null && mappingJSON != null) { + this.es.putMapping(logIndex, settingsJSON.toString(), mappingJSON.toString()); + } + } catch (IOException e) { + LOG.error("Error entering Elasticsearch Mappings!", e); + } + } + + @Override + public Object execute() { + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + public JavaRDD<String> getUserRDD(String... 
type) { + Map<String, Double> userDocs = getUserDocs(type); + return parallizeUsers(userDocs); + } + + public List<String> getUsers(String type) { + + Terms users = this.getUserTerms(type); + List<String> userList = new ArrayList<>(); + for (Terms.Bucket entry : users.getBuckets()) { + String ip = (String) entry.getKey(); + userList.add(ip); + } + + return userList; + } + + public Terms getUserTerms(String... type) { + + int docCount = es.getDocCount(logIndex, type); + + SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0) + .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet(); + return sr.getAggregations().get("Users"); + } + + public Map<String, Double> getUserDocs(String... type) { + + Terms users = this.getUserTerms(type); + Map<String, Double> userList = new HashMap<>(); + for (Terms.Bucket entry : users.getBuckets()) { + String ip = (String) entry.getKey(); + Long count = entry.getDocCount(); + userList.put(ip, Double.valueOf(count)); + } + + return userList; + } + + public Map<String, Long> getUserDailyDocs() { + + int docCount = es.getDocCount(logIndex, httpType); + + AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC); + + SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0) + .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet(); + Terms users = sr.getAggregations().get("Users"); + Map<String, Long> userList = new HashMap<>(); + for (Terms.Bucket user : users.getBuckets()) { + String ip = (String) user.getKey(); + + System.out.println(ip); + + Histogram agg = user.getAggregations().get("by_day"); + List<? 
extends Histogram.Bucket> dateList = agg.getBuckets(); + int size = dateList.size(); + for (int i = 0; i < size; i++) { + Long count = dateList.get(i).getDocCount(); + String date = dateList.get(i).getKey().toString(); + + System.out.println(date); + System.out.println(count); + } + } + + return userList; + } + + protected void checkUserPartition(JavaRDD<String> userRDD) { + System.out.println("hhhhh"); + List<Partition> partitios = userRDD.partitions(); + System.out.println(partitios.size()); + int[] partitionIds = new int[partitios.size()]; + for (int i = 0; i < partitios.size(); i++) { + int index = partitios.get(i).index(); + partitionIds[i] = index; + } + + List<String>[] userIPs = userRDD.collectPartitions(partitionIds); + for (int i = 0; i < userIPs.length; i++) { + List<String> iuser = userIPs[i]; + System.out.println(i + " partition"); + System.out.println(iuser.toString()); + } + } + + public JavaRDD<String> parallizeUsers(Map<String, Double> userDocs) { + + // prepare list for parallize + List<Tuple2<String, Double>> list = new ArrayList<>(); + for (String user : userDocs.keySet()) { + list.add(new Tuple2<String, Double>(user, userDocs.get(user))); + } + + // group users + ThePartitionProblemSolver solution = new KGreedyPartitionSolver(); + Map<String, Integer> userGroups = solution.solve(userDocs, this.partition); + + JavaPairRDD<String, Double> pairRdd = spark.sc.parallelizePairs(list); + JavaPairRDD<String, Double> userPairRDD = pairRdd.partitionBy(new logPartitioner(userGroups, this.partition)); + + // repartitioned user RDD + return userPairRDD.keys(); + } + + public Terms getSessionTerms() { + + int docCount = es.getDocCount(this.logIndex, this.cleanupType); + + SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery()) + .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet(); + + Terms Sessions = 
sr.getAggregations().get("Sessions"); + return Sessions; + } + + public List<String> getSessions() { + + Terms sessions = this.getSessionTerms(); + List<String> sessionList = new ArrayList<>(); + for (Terms.Bucket entry : sessions.getBuckets()) { + if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) { + String session = (String) entry.getKey(); + sessionList.add(session); + } + } + + return sessionList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java new file mode 100644 index 0000000..12d7386 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java @@ -0,0 +1,54 @@ +package gov.nasa.jpl.mudrod.weblog.pre; + +import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.weblog.structure.RankingTrainData; +import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor; +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class RankingTrainDataGenerator extends DiscoveryStepAbstract { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RankingTrainDataGenerator.class); + + public RankingTrainDataGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + // TODO Auto-generated constructor stub + } + + @Override + public Object execute() { + // TODO Auto-generated method stub + LOG.info("Starting generate ranking train data."); + startTime = 
System.currentTimeMillis(); + + String rankingTrainFile = "E:\\Mudrod_input_data\\Testing_Data_4_1monthLog+Meta+Onto\\traing.txt"; + try { + SessionExtractor extractor = new SessionExtractor(); + JavaRDD<RankingTrainData> rankingTrainDataRDD = extractor.extractRankingTrainData(this.props, this.es, this.spark); + + JavaRDD<String> rankingTrainData_JsonRDD = rankingTrainDataRDD.map(f -> f.toJson()); + + rankingTrainData_JsonRDD.coalesce(1, true).saveAsTextFile(rankingTrainFile); + + } catch (Exception e) { + e.printStackTrace(); + } + + endTime = System.currentTimeMillis(); + LOG.info("Ranking train data generation complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + // TODO Auto-generated method stub + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java new file mode 100644 index 0000000..01b57c0 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package gov.nasa.jpl.mudrod.weblog.pre; + +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Supports ability to remove raw logs after processing is finished + */ +public class RemoveRawLog extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RemoveRawLog.class); + + public RemoveRawLog(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting raw log removal."); + startTime = System.currentTimeMillis(); + es.deleteAllByQuery(logIndex, httpType, QueryBuilders.matchAllQuery()); + es.deleteAllByQuery(logIndex, ftpType, QueryBuilders.matchAllQuery()); + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Raw log removal complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java new file mode 100644 index 0000000..b95e30b --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java @@ -0,0 +1,452 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.weblog.pre; + +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; +import gov.nasa.jpl.mudrod.weblog.structure.Session; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.VoidFunction; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * 
Supports ability to generate user session by time threshold and referrer + */ +public class SessionGenerator extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionGenerator.class); + + public SessionGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting Session Generation."); + startTime = System.currentTimeMillis(); + generateSession(); + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Session generating complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + public void generateSession() { + try { + es.createBulkProcessor(); + genSessionByReferer(Integer.parseInt(props.getProperty("timegap"))); + es.destroyBulkProcessor(); + + es.createBulkProcessor(); + combineShortSessions(Integer.parseInt(props.getProperty("timegap"))); + es.destroyBulkProcessor(); + } catch (ElasticsearchException e) { + LOG.error("Error whilst executing bulk processor.", e); + } catch (IOException e) { + LOG.error("Error whilst reading configuration.", e); + } catch (NumberFormatException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void genSessionByReferer(int timeThres) throws InterruptedException, IOException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + genSessionByRefererInSequential(timeThres); + } else if (processingType.equals("parallel")) { + genSessionByRefererInParallel(timeThres); + } + } + + public void combineShortSessions(int timeThres) throws InterruptedException, IOException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + combineShortSessionsInSequential(timeThres); + } else if 
(processingType.equals("parallel")) { + combineShortSessionsInParallel(timeThres); + } + } + + /** + * Method to generate session by time threshold and referrer + * + * @param timeThres value of time threshold (s) + * @throws ElasticsearchException ElasticsearchException + * @throws IOException IOException + */ + public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException { + + Terms users = this.getUserTerms(this.cleanupType); + + int sessionCount = 0; + for (Terms.Bucket entry : users.getBuckets()) { + + String user = (String) entry.getKey(); + Integer sessionNum = genSessionByReferer(es, user, timeThres); + sessionCount += sessionNum; + } + + LOG.info("Initial session count: {}", Integer.toString(sessionCount)); + } + + public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException { + + Terms users = this.getUserTerms(this.cleanupType); + for (Terms.Bucket entry : users.getBuckets()) { + String user = entry.getKey().toString(); + combineShortSessions(es, user, timeThres); + } + } + + /** + * Method to remove invalid logs through IP address + * + * @param es an instantiated es driver + * @param ip invalid IP address + * @throws ElasticsearchException ElasticsearchException + * @throws IOException IOException + */ + public void deleteInvalid(ESDriver es, String ip) throws IOException { + + BoolQueryBuilder filterAll = new BoolQueryBuilder(); + filterAll.must(QueryBuilders.termQuery("IP", ip)); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid"); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if 
(scrollResp.getHits().getHits().length == 0) { + break; + } + } + } + + /** + * Method to update a Elasticsearch record/document by id, field, and value + * + * @param es + * @param index index name is Elasticsearch + * @param type type name + * @param id ID of the document that needs to be updated + * @param field1 field of the document that needs to be updated + * @param value1 value of the document that needs to be changed to + * @throws ElasticsearchException + * @throws IOException + */ + private void update(ESDriver es, String index, String type, String id, String field1, Object value1) throws IOException { + UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); + es.getBulkProcessor().add(ur); + } + + public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException { + + JavaRDD<String> userRDD = getUserRDD(this.cleanupType); + + int sessionCount = 0; + sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<Integer> call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + List<Integer> sessionNums = new ArrayList<>(); + while (arg0.hasNext()) { + String s = arg0.next(); + Integer sessionNum = genSessionByReferer(tmpES, s, timeThres); + sessionNums.add(sessionNum); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + return sessionNums.iterator(); + } + }).reduce(new Function2<Integer, Integer, Integer>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }); + + LOG.info("Initial Session count: {}", Integer.toString(sessionCount)); + } + + public int genSessionByReferer(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { + + String 
startTime = null; + int sessionCountIn = 0; + + BoolQueryBuilder filterSearch = new BoolQueryBuilder(); + filterSearch.must(QueryBuilders.termQuery("IP", user)); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100) + .execute().actionGet(); + + Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>(); + String request = ""; + String referer = ""; + String logType = ""; + String id = ""; + String ip = user; + String indexUrl = "http://podaac.jpl.nasa.gov/"; + DateTime time = null; + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + + while (scrollResp.getHits().getHits().length != 0) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + request = (String) result.get("RequestUrl"); + referer = (String) result.get("Referer"); + logType = (String) result.get("LogType"); + time = fmt.parseDateTime((String) result.get("Time")); + id = hit.getId(); + + if ("PO.DAAC".equals(logType)) { + if ("-".equals(referer) || referer.equals(indexUrl) || !referer.contains(indexUrl)) { + sessionCountIn++; + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + + } else { + int count = sessionCountIn; + int rollbackNum = 0; + while (true) { + Map<String, DateTime> requests = sessionReqs.get(ip + "@" + count); + if (requests == null) { + sessionReqs.put(ip + "@" + count, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + count).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); + + break; + } + ArrayList<String> keys = new ArrayList<>(requests.keySet()); + boolean bFindRefer = false; + + for (int i = keys.size() - 1; i >= 0; i--) { + rollbackNum++; 
+ if (keys.get(i).equalsIgnoreCase(referer)) { + bFindRefer = true; + // threshold,if time interval > 10* + // click num, start a new session + if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) < timeThres * rollbackNum) { + sessionReqs.get(ip + "@" + count).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); + } else { + sessionCountIn++; + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + } + + break; + } + } + + if (bFindRefer) { + break; + } + + count--; + if (count < 0) { + sessionCountIn++; + + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, props.getProperty(MudrodConstants.ES_INDEX_NAME), this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + + break; + } + } + } + } else if ("ftp".equals(logType)) { + + // may affect computation efficiency + Map<String, DateTime> requests = sessionReqs.get(ip + "@" + sessionCountIn); + if (requests == null) { + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + } else { + ArrayList<String> keys = new ArrayList<>(requests.keySet()); + int size = keys.size(); + if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(size - 1)), time).getSeconds()) > timeThres) { + sessionCountIn += 1; + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + } + } + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + } + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + } + + return sessionCountIn; + } + + public void 
combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException { + + JavaRDD<String> userRDD = getUserRDD(this.cleanupType); + + userRDD.foreachPartition(new VoidFunction<Iterator<String>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public void call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + while (arg0.hasNext()) { + String s = arg0.next(); + combineShortSessions(tmpES, s, timeThres); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + } + }); + } + + public void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { + + BoolQueryBuilder filterSearch = new BoolQueryBuilder(); + filterSearch.must(QueryBuilders.termQuery("IP", user)); + + String[] indexArr = new String[] { logIndex }; + String[] typeArr = new String[] { cleanupType }; + int docCount = es.getDocCount(indexArr, typeArr, filterSearch); + + if (docCount < 3) { + deleteInvalid(es, user); + return; + } + + BoolQueryBuilder filterCheck = new BoolQueryBuilder(); + filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-")); + SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet(); + + long numInvalid = checkReferer.getHits().getTotalHits(); + double invalidRate = numInvalid / docCount; + + if (invalidRate >= 0.8) { + deleteInvalid(es, user); + return; + } + + StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); + SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch) + .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet(); + + Terms sessions = 
srSession.getAggregations().get("Sessions"); + + List<Session> sessionList = new ArrayList<>(); + for (Terms.Bucket session : sessions.getBuckets()) { + Stats agg = session.getAggregations().get("Stats"); + Session sess = new Session(props, es, agg.getMinAsString(), agg.getMaxAsString(), session.getKey().toString()); + sessionList.add(sess); + } + + Collections.sort(sessionList); + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + String last = null; + String lastnewID = null; + String lastoldID = null; + String current = null; + for (Session s : sessionList) { + current = s.getEndTime(); + if (last != null) { + if (Seconds.secondsBetween(fmt.parseDateTime(last), fmt.parseDateTime(current)).getSeconds() < timeThres) { + if (lastnewID == null) { + s.setNewID(lastoldID); + } else { + s.setNewID(lastnewID); + } + + QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID())); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + if (lastnewID == null) { + update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastoldID); + } else { + update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastnewID); + } + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + } + ; + } + lastoldID = s.getID(); + lastnewID = s.getNewID(); + last = current; + } + + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java new file mode 100644 index 0000000..7e7640f --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java @@ -0,0 +1,312 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.weblog.pre; + +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; +import gov.nasa.jpl.mudrod.weblog.structure.RequestUrl; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import 
org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports ability to post-process session, including summarizing statistics + * and filtering + */ +public class SessionStatistic extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionStatistic.class); + + public SessionStatistic(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting Session Summarization."); + startTime = System.currentTimeMillis(); + try { + processSession(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Session Summarization complete. 
Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + public void processSession() throws InterruptedException, IOException, ExecutionException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + processSessionInSequential(); + } else if (processingType.equals("parallel")) { + processSessionInParallel(); + } + } + + public void processSessionInSequential() throws IOException, InterruptedException, ExecutionException { + es.createBulkProcessor(); + Terms Sessions = this.getSessionTerms(); + int session_count = 0; + for (Terms.Bucket entry : Sessions.getBuckets()) { + if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) { + String sessionid = entry.getKey().toString(); + int sessionNum = processSession(es, sessionid); + session_count += sessionNum; + } + } + LOG.info("Final Session count: {}", Integer.toString(session_count)); + es.destroyBulkProcessor(); + } + + /** + * Extract the dataset ID from a long request + * + * @param request raw log request + * @return dataset ID + */ + public String findDataset(String request) { + String pattern1 = "/dataset/"; + String pattern2; + if (request.contains("?")) { + pattern2 = "?"; + } else { + pattern2 = " "; + } + + Pattern p = Pattern.compile(Pattern.quote(pattern1) + "(.*?)" + Pattern.quote(pattern2)); + Matcher m = p.matcher(request); + if (m.find()) { + return m.group(1); + } + return null; + } + + public void processSessionInParallel() throws InterruptedException, IOException { + + List<String> sessions = this.getSessions(); + JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition); + + int sessionCount = 0; + sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { + @Override + public Iterator<Integer> call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + List<Integer> sessionNums = new 
ArrayList<Integer>(); + sessionNums.add(0); + while (arg0.hasNext()) { + String s = arg0.next(); + Integer sessionNum = processSession(tmpES, s); + sessionNums.add(sessionNum); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + return sessionNums.iterator(); + } + }).reduce(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }); + + LOG.info("Final Session count: {}", Integer.toString(sessionCount)); + } + + public int processSession(ESDriver es, String sessionId) throws IOException, InterruptedException, ExecutionException { + + String inputType = cleanupType; + String outputType = sessionStats; + + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + String min = null; + String max = null; + DateTime start = null; + DateTime end = null; + int duration = 0; + float request_rate = 0; + + int session_count = 0; + Pattern pattern = Pattern.compile("get (.*?) http/*"); + + StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); + + BoolQueryBuilder filter_search = new BoolQueryBuilder(); + filter_search.must(QueryBuilders.termQuery("SessionID", sessionId)); + + SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filter_search).addAggregation(statsAgg).execute().actionGet(); + + Stats agg = sr.getAggregations().get("Stats"); + min = agg.getMinAsString(); + max = agg.getMaxAsString(); + start = fmt.parseDateTime(min); + end = fmt.parseDateTime(max); + + duration = Seconds.secondsBetween(start, end).getSeconds(); + + int searchDataListRequest_count = 0; + int searchDataRequest_count = 0; + int searchDataListRequest_byKeywords_count = 0; + int ftpRequest_count = 0; + int keywords_num = 0; + + String IP = null; + String keywords = ""; + String views = ""; + String downloads = ""; + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(inputType).setScroll(new 
TimeValue(60000)).setQuery(filter_search).setSize(100).execute().actionGet(); + + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + + String request = (String) result.get("Request"); + String logType = (String) result.get("LogType"); + IP = (String) result.get("IP"); + Matcher matcher = pattern.matcher(request.trim().toLowerCase()); + while (matcher.find()) { + request = matcher.group(1); + } + + String datasetlist = "/datasetlist?"; + String dataset = "/dataset/"; + if (request.contains(datasetlist)) { + searchDataListRequest_count++; + + RequestUrl requestURL = new RequestUrl(); + String infoStr = requestURL.getSearchInfo(request) + ","; + String info = es.customAnalyzing(props.getProperty("indexName"), infoStr); + + if (!info.equals(",")) { + if (keywords.equals("")) { + keywords = keywords + info; + } else { + String[] items = info.split(","); + String[] keywordList = keywords.split(","); + for (int m = 0; m < items.length; m++) { + if (!Arrays.asList(keywordList).contains(items[m])) { + keywords = keywords + items[m] + ","; + } + } + } + } + + } + if (request.startsWith(dataset)) { + searchDataRequest_count++; + if (findDataset(request) != null) { + String view = findDataset(request); + + if ("".equals(views)) { + views = view; + } else { + if (views.contains(view)) { + + } else { + views = views + "," + view; + } + } + } + } + if ("ftp".equals(logType)) { + ftpRequest_count++; + String download = ""; + String requestLowercase = request.toLowerCase(); + if (requestLowercase.endsWith(".jpg") == false && requestLowercase.endsWith(".pdf") == false && requestLowercase.endsWith(".txt") == false && requestLowercase.endsWith(".gif") == false) { + download = request; + } + + if ("".equals(downloads)) { + downloads = download; + } else { + if (downloads.contains(download)) { + + } else { + downloads = downloads + "," + download; + } + } + } + + } + + scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + // Break condition: No hits are returned + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + if (!keywords.equals("")) { + keywords_num = keywords.split(",").length; + } + + if (searchDataListRequest_count != 0 && searchDataListRequest_count <= Integer.parseInt(props.getProperty("searchf")) && searchDataRequest_count != 0 && searchDataRequest_count <= Integer + .parseInt(props.getProperty("viewf")) && ftpRequest_count <= Integer.parseInt(props.getProperty("downloadf"))) { + String sessionURL = props.getProperty("SessionPort") + props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType; + session_count = 1; + + IndexRequest ir = new IndexRequest(logIndex, outputType).source( + jsonBuilder().startObject().field("SessionID", sessionId).field("SessionURL", sessionURL).field("Duration", duration).field("Number of Keywords", keywords_num).field("Time", min) + .field("End_time", max).field("searchDataListRequest_count", searchDataListRequest_count).field("searchDataListRequest_byKeywords_count", searchDataListRequest_byKeywords_count) + .field("searchDataRequest_count", searchDataRequest_count).field("keywords", es.customAnalyzing(logIndex, keywords)).field("views", views).field("downloads", downloads) + .field("request_rate", request_rate).field("Comments", "").field("Validation", 0).field("Produceby", 0).field("Correlation", 0).field("IP", IP).endObject()); + + es.getBulkProcessor().add(ir); + } + + return session_count; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java new file mode 100644 index 0000000..5a3a1c3 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes Preprocessing for all functionality required by the + * {@link gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine} + */ +package gov.nasa.jpl.mudrod.weblog.pre; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java new file mode 100644 index 0000000..2b8f45f --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.weblog.process; + +import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer; +import gov.nasa.jpl.mudrod.ssearch.ClickstreamImporter; +import gov.nasa.jpl.mudrod.utils.LinkageTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.Properties; + +/** + * Supports ability to calculate term similarity based on click stream + */ +public class ClickStreamAnalyzer extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ClickStreamAnalyzer.class); + + public ClickStreamAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of executing click stream analyzer + */ + @Override + public Object execute() { + LOG.info("Starting ClickStreamAnalyzer..."); + startTime = System.currentTimeMillis(); + try { + String clickstream_matrixFile = props.getProperty("clickstreamMatrix"); + File f = new File(clickstream_matrixFile); + if (f.exists()) { + SVDAnalyzer svd = new SVDAnalyzer(props, es, spark); + svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), Integer.parseInt(props.getProperty("clickstreamSVDDimension")), props.getProperty("clickstreamSVDMatrix_tmp")); + List<LinkageTriple> tripleList = 
svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp")); + svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("clickStreamLinkageType")); + + // Store click stream in ES for the ranking use + ClickstreamImporter cs = new ClickstreamImporter(props, es, spark); + cs.importfromCSVtoES(); + } + } catch (Exception e) { + LOG.error("Encountered an error during execution of ClickStreamAnalyzer.", e); + } + + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("ClickStreamAnalyzer complete. Time elapsed: {}s", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java new file mode 100644 index 0000000..248756b --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package gov.nasa.jpl.mudrod.weblog.process; + +import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; +import gov.nasa.jpl.mudrod.semantics.SemanticAnalyzer; +import gov.nasa.jpl.mudrod.utils.LinkageTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * Supports ability to calculate term similarity based on user history + */ +public class UserHistoryAnalyzer extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(UserHistoryAnalyzer.class); + + public UserHistoryAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of executing user history analyzer + */ + @Override + public Object execute() { + LOG.info("Starting UserHistoryAnalyzer..."); + startTime = System.currentTimeMillis(); + + SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark); + List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty("userHistoryMatrix")); + sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)); + + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("UserHistoryAnalyzer complete. 
Time elapsed: {}s", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java new file mode 100644 index 0000000..e96fd3c --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes web log processing classes. 
+ */ +package gov.nasa.jpl.mudrod.weblog.process; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java new file mode 100644 index 0000000..1051384 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.weblog.structure; + +import com.google.gson.Gson; +import gov.nasa.jpl.mudrod.weblog.pre.CrawlerDetection; + +import java.io.IOException; +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class represents an Apache access log line. See + * http://httpd.apache.org/docs/2.2/logs.html for more details. 
+ */ +public class ApacheAccessLog extends WebLog implements Serializable { + + // double Bytes; + String Response; + String Referer; + String Browser; + + @Override + public double getBytes() { + return this.Bytes; + } + + public String getBrowser() { + return this.Browser; + } + + public String getResponse() { + return this.Response; + } + + public String getReferer() { + return this.Referer; + } + + public ApacheAccessLog() { + + } + + public static String parseFromLogLine(String log) throws IOException, ParseException { + + String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\""; + final int NUM_FIELDS = 9; + Pattern p = Pattern.compile(logEntryPattern); + Matcher matcher; + + String lineJson = "{}"; + matcher = p.matcher(log); + if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { + return lineJson; + } + + String time = matcher.group(4); + time = SwithtoNum(time); + SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); + Date date = formatter.parse(time); + + String bytes = matcher.group(7); + + if (bytes.equals("-")) { + bytes = "0"; + } + + String request = matcher.group(5).toLowerCase(); + String agent = matcher.group(9); + CrawlerDetection crawlerDe = new CrawlerDetection(); + if (crawlerDe.checkKnownCrawler(agent)) { + return lineJson; + } else { + + boolean tag = false; + String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" }; + for (int i = 0; i < mimeTypes.length; i++) { + if (request.contains(mimeTypes[i])) { + tag = true; + return lineJson; + } + } + + if (tag == false) { + ApacheAccessLog accesslog = new ApacheAccessLog(); + accesslog.LogType = "PO.DAAC"; + accesslog.IP = matcher.group(1); + accesslog.Request = matcher.group(5); + accesslog.Response = matcher.group(6); + accesslog.Bytes = Double.parseDouble(bytes); + 
accesslog.Referer = matcher.group(8); + accesslog.Browser = matcher.group(9); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); + accesslog.Time = df.format(date); + + Gson gson = new Gson(); + lineJson = gson.toJson(accesslog); + + return lineJson; + } + } + + lineJson = "{}"; + return lineJson; + } + + public static boolean checknull(WebLog s) { + if (s == null) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java new file mode 100644 index 0000000..76e8d7a --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java @@ -0,0 +1,188 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.weblog.structure; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.io.Serializable; + +/** + * ClassName: ClickStream Function: user click stream data related operations. 
+ */
+public class ClickStream implements Serializable {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // keywords: query words related to the click behaviour
+  private String keywords;
+  // viewDataset: the dataset name user viewed
+  private String viewDataset;
+  // downloadDataset: the dataset name user downloaded
+  private String downloadDataset;
+  // sessionID: session ID
+  private String sessionID;
+  // type: session type name
+  private String type;
+
+  /**
+   * Creates a new instance of ClickStream.
+   *
+   * @param keywords the query user searched
+   * @param viewDataset the dataset name user viewed
+   * @param download true if the user downloaded the data set after viewing
+   *          it, false otherwise
+   */
+  public ClickStream(String keywords, String viewDataset, boolean download) {
+    this.keywords = keywords;
+    this.viewDataset = viewDataset;
+    this.downloadDataset = "";
+    if (download) {
+      this.downloadDataset = viewDataset;
+    }
+  }
+
+  public ClickStream() {
+    //default constructor
+  }
+
+  /**
+   * getSessionID: Get the ID of the session this click belongs to.
+   *
+   * @return session id
+   */
+  public String getSessionID() {
+    return sessionID;
+  }
+
+  /**
+   * setKeyWords: Set the query user searched.
+   *
+   * @param query search words
+   */
+  public void setKeyWords(String query) {
+    this.keywords = query;
+  }
+
+  /**
+   * setViewDataset: Set the data set name user viewed
+   *
+   * @param dataset short name of data set
+   */
+  public void setViewDataset(String dataset) {
+    this.viewDataset = dataset;
+  }
+
+  /**
+   * setDownloadDataset: Set the data set name user downloaded
+   *
+   * @param dataset short name of data set
+   */
+  public void setDownloadDataset(String dataset) {
+    this.downloadDataset = dataset;
+  }
+
+  /**
+   * getKeyWords: Get the query user searched
+   *
+   * @return data set name
+   */
+  public String getKeyWords() {
+    return this.keywords;
+  }
+
+  /**
+   * getViewDataset: Get the data set user viewed
+   *
+   * @return data set name
+   */
+  public String getViewDataset() {
+    return this.viewDataset;
+  }
+
+  /**
+   * isDownload: Show whether the data is downloaded in the session.
+   * An empty downloadDataset means no download happened.
+   *
+   * @return True or False
+   */
+  public Boolean isDownload() {
+    if ("".equals(this.downloadDataset)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * setSessionId: Set ID of session
+   *
+   * @param sessionID session id
+   */
+  public void setSessionId(String sessionID) {
+    this.sessionID = sessionID;
+  }
+
+  /**
+   * setType: Set session type name
+   *
+   * @param type session type name in elasticsearch
+   */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * Output click stream info in string format
+   *
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return "Query: " + keywords + " || View Dataset: " + viewDataset + " || Download Dataset: " + downloadDataset;
+  }
+
+  /**
+   * toJson: Output click stream info in Json format.
+   * NOTE(review): the result ends with "},", so by itself it is not valid
+   * JSON — presumably callers concatenate fragments into an array and trim
+   * the final comma; TODO confirm.
+   *
+   * @return session in string format
+   */
+  public String toJson() {
+    String jsonQuery = "{";
+    jsonQuery += "\"query\":\"" + this.keywords + "\",";
+    jsonQuery += "\"viewdataset\":\"" + this.viewDataset + "\",";
+    jsonQuery += "\"downloaddataset\":\"" + this.downloadDataset + "\",";
+    jsonQuery += "\"sessionId\":\"" + this.sessionID + "\",";
+    jsonQuery += "\"type\":\"" + this.type + "\"";
+    jsonQuery += "},";
+    return jsonQuery;
+  }
+
+  /**
+   * parseFromTextLine: Convert string to click stream data
+   *
+   * @param logline http log line
+   * @return {@link ClickStream} populated from the JSON line; may be null
+   *         when logline is not valid JSON
+   */
+  public static ClickStream parseFromTextLine(String logline) {
+    JSONObject jsonData = null;
+    ClickStream data = null;
+    try {
+      jsonData = new JSONObject(logline);
+      data = new ClickStream();
+      data.setKeyWords(jsonData.getString("query"));
+      data.setViewDataset(jsonData.getString("viewdataset"));
+      data.setDownloadDataset(jsonData.getString("downloaddataset"));
+
+    } catch (JSONException e) {
+      // NOTE(review): exception is only printed; prefer an SLF4J logger as
+      // FtpLog does, and document that callers must handle the null return.
+      e.printStackTrace();
+    }
+
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java
new file mode 100644
index 0000000..f416eb4
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+/**
+ * Simple holder for a geographic coordinate pair.
+ */
+public class Coordinates {
+  /*
+   * public String lat; public String lon;
+   */
+  // Comma-separated "lat,lon" string; filled by GeoIp.toLocation.
+  public String latlon;
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java
new file mode 100644
index 0000000..5ddc717
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import gov.nasa.jpl.mudrod.weblog.pre.ImportLogFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * This class represents an FTP access log line.
+ */ +public class FtpLog extends WebLog implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class); + + public static String parseFromLogLine(String log) { + + try { + String ip = log.split(" +")[6]; + + String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + log.split(" +")[3] + ":" + log.split(" +")[4]; + + time = SwithtoNum(time); + SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy"); + Date date = formatter.parse(time); + + String bytes = log.split(" +")[7]; + + String request = log.split(" +")[8].toLowerCase(); + + if (!request.contains("/misc/") && !request.contains("readme")) { + FtpLog ftplog = new FtpLog(); + ftplog.LogType = "ftp"; + ftplog.IP = ip; + ftplog.Request = request; + ftplog.Bytes = Double.parseDouble(bytes); + + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); + ftplog.Time = df.format(date); + + return new Gson().toJson(ftplog); + } + } catch (Exception e) { + LOG.warn("Error parsing ftp log line [{}]. Skipping this line.", log, e); + } + return "{}"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java new file mode 100644 index 0000000..778224b --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import gov.nasa.jpl.mudrod.utils.HttpRequest;
+
+/**
+ * ClassName: GeoIp Function: convert IP to geo location
+ */
+public class GeoIp {
+
+  /**
+   * toLocation: convert ip to location
+   *
+   * @param ip ip address
+   * @return coordinates
+   */
+  public Coordinates toLocation(String ip) {
+    // Blocking HTTP call to the external geobytes.com lookup service.
+    String url = "http://getcitydetails.geobytes.com/GetCityDetails?fqcn=" + ip;
+    HttpRequest http = new HttpRequest();
+    String response = http.getRequest(url);
+    JsonParser parser = new JsonParser();
+    JsonElement jobSon = parser.parse(response);
+    JsonObject responseObject = jobSon.getAsJsonObject();
+
+    Coordinates co = new Coordinates();
+    // NOTE(review): if the service reply lacks these keys, get(...) returns
+    // null and toString() throws NPE; malformed replies also throw from
+    // parse(). No error handling here — confirm callers tolerate exceptions.
+    String lon = responseObject.get("geobyteslongitude").toString().replace("\"", "");
+    String lat = responseObject.get("geobyteslatitude").toString().replace("\"", "");
+    co.latlon = lat + "," + lon;
+    return co;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java
new file mode 100644
index 0000000..7ea17c0
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java
@@ -0,0 +1,147 @@
+// NOTE(review): this file lacks the Apache license header present in the
+// sibling files of this package.
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * ClassName: train data extracted from web logs for training ranking weights.
+ */
+public class RankingTrainData implements Serializable {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // sessionID: session ID
+  private String sessionID;
+  // index: elasticsearch index name the session came from
+  private String index;
+  // query: query words related to the click
+  private String query;
+  // datasetA: the higher-ranked dataset of the pair
+  private String highRankDataset;
+  // datasetB: the lower-ranked dataset of the pair
+  private String lowRankDataset;
+
+  // Optional extra key/value pairs appended verbatim to the JSON output.
+  private Map<String, String> filter;
+
+  /**
+   * Creates a new instance of RankingTrainData.
+   *
+   * @param query the user query string
+   * @param highRankDataset the dataset name for the highest ranked dataset
+   * @param lowRankDataset the dataset name for the lowest ranked dataset
+   */
+  public RankingTrainData(String query, String highRankDataset, String lowRankDataset) {
+    this.query = query;
+    this.highRankDataset = highRankDataset;
+    this.lowRankDataset = lowRankDataset;
+  }
+
+  public RankingTrainData() {
+    //default constructor
+  }
+
+  /**
+   * getSessionID: Get the ID of the session this pair was extracted from.
+   *
+   * @return session id
+   */
+  public String getSessionID() {
+    return sessionID;
+  }
+
+  /**
+   * setQuery: Set the query user searched.
+   *
+   * @param query search words
+   */
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  /**
+   * getQuery: Get the query user searched
+   *
+   * @return query string
+   */
+  public String getQuery() {
+    return this.query;
+  }
+
+  /**
+   * setHighRankDataset: Set the higher-ranked data set name
+   *
+   * @param dataset short name of data set
+   */
+  public void setHighRankDataset(String dataset) {
+    this.highRankDataset = dataset;
+  }
+
+  /**
+   * setLowRankDataset: Set the lower-ranked data set name
+   *
+   * @param dataset short name of data set
+   */
+  public void setLowRankDataset(String dataset) {
+    this.lowRankDataset = dataset;
+  }
+
+  /**
+   * getLowRankDataset: Get the lower-ranked data set name
+   *
+   * @return data set name
+   */
+  public String getLowRankDataset() {
+    return this.lowRankDataset;
+  }
+
+  /**
+   * setSessionId: Set ID of session
+   *
+   * @param sessionID session id
+   */
+  public void setSessionId(String sessionID) {
+    this.sessionID = sessionID;
+  }
+
+  /**
+   * setIndex: Set the elasticsearch index name
+   *
+   * @param index session type name in elasticsearch
+   */
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * setFilter: Set extra key/value pairs to include in the JSON output.
+   *
+   * @param filter map of extra attributes; may be null
+   */
+  public void setFilter(Map<String, String> filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * Output ranking train data in string format
+   *
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return "query:" + query + "|| highRankDataset:" + highRankDataset + "|| lowRankDataset:" + lowRankDataset;
+  }
+
+  /**
+   * toJson: Output ranking train data in Json format.
+   * NOTE(review): the result ends with "},", so by itself it is not valid
+   * JSON — presumably callers concatenate fragments and trim the final
+   * comma; TODO confirm. Filter entries are emitted in the iteration order
+   * of the supplied Map implementation.
+   *
+   * @return session in string format
+   */
+  public String toJson() {
+    String jsonQuery = "{";
+    jsonQuery += "\"query\":\"" + this.query + "\",";
+    jsonQuery += "\"highRankDataset\":\"" + this.highRankDataset + "\",";
+    jsonQuery += "\"lowRankDataset\":\"" + this.lowRankDataset + "\",";
+
+    if (this.filter != null) {
+      for (String key : filter.keySet()) {
+        jsonQuery += "\"" + key + "\":\"" + filter.get(key) + "\",";
+      }
+    }
+
+    jsonQuery += "\"sessionId\":\"" + this.sessionID + "\",";
+    jsonQuery += "\"index\":\"" + this.index + "\"";
+    jsonQuery += "},";
+    return jsonQuery;
+  }
+}
