http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java new file mode 100644 index 0000000..22aa09c --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Supports ability to remove raw logs after processing is finished + */ +public class RemoveRawLog extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RemoveRawLog.class); + + public RemoveRawLog(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting raw log removal."); + startTime = System.currentTimeMillis(); + es.deleteAllByQuery(logIndex, httpType, QueryBuilders.matchAllQuery()); + es.deleteAllByQuery(logIndex, ftpType, QueryBuilders.matchAllQuery()); + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Raw log removal complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java new file mode 100644 index 0000000..b1153bf --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java @@ -0,0 +1,452 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.weblog.structure.Session; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.VoidFunction; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports ability to generate user session by time threshold and referrer + */ +public class SessionGenerator extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionGenerator.class); + + public SessionGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + 
public Object execute() { + LOG.info("Starting Session Generation."); + startTime = System.currentTimeMillis(); + generateSession(); + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Session generating complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + public void generateSession() { + try { + es.createBulkProcessor(); + genSessionByReferer(Integer.parseInt(props.getProperty("timegap"))); + es.destroyBulkProcessor(); + + es.createBulkProcessor(); + combineShortSessions(Integer.parseInt(props.getProperty("timegap"))); + es.destroyBulkProcessor(); + } catch (ElasticsearchException e) { + LOG.error("Error whilst executing bulk processor.", e); + } catch (IOException e) { + LOG.error("Error whilst reading configuration.", e); + } catch (NumberFormatException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void genSessionByReferer(int timeThres) throws InterruptedException, IOException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + genSessionByRefererInSequential(timeThres); + } else if (processingType.equals("parallel")) { + genSessionByRefererInParallel(timeThres); + } + } + + public void combineShortSessions(int timeThres) throws InterruptedException, IOException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + combineShortSessionsInSequential(timeThres); + } else if (processingType.equals("parallel")) { + combineShortSessionsInParallel(timeThres); + } + } + + /** + * Method to generate session by time threshold and referrer + * + * @param timeThres value of time threshold (s) + * @throws ElasticsearchException ElasticsearchException + * @throws IOException IOException + */ + public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException { + + Terms users = 
this.getUserTerms(this.cleanupType); + + int sessionCount = 0; + for (Terms.Bucket entry : users.getBuckets()) { + + String user = (String) entry.getKey(); + Integer sessionNum = genSessionByReferer(es, user, timeThres); + sessionCount += sessionNum; + } + + LOG.info("Initial session count: {}", Integer.toString(sessionCount)); + } + + public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException { + + Terms users = this.getUserTerms(this.cleanupType); + for (Terms.Bucket entry : users.getBuckets()) { + String user = entry.getKey().toString(); + combineShortSessions(es, user, timeThres); + } + } + + /** + * Method to remove invalid logs through IP address + * + * @param es an instantiated es driver + * @param ip invalid IP address + * @throws ElasticsearchException ElasticsearchException + * @throws IOException IOException + */ + public void deleteInvalid(ESDriver es, String ip) throws IOException { + + BoolQueryBuilder filterAll = new BoolQueryBuilder(); + filterAll.must(QueryBuilders.termQuery("IP", ip)); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid"); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + } + + /** + * Method to update a Elasticsearch record/document by id, field, and value + * + * @param es + * @param index index name is Elasticsearch + * @param type type name + * @param id ID of the document that needs to be updated + * @param field1 field of the document that needs to be updated + * @param value1 value of the document that needs to be changed to + * @throws 
ElasticsearchException + * @throws IOException + */ + private void update(ESDriver es, String index, String type, String id, String field1, Object value1) throws IOException { + UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); + es.getBulkProcessor().add(ur); + } + + public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException { + + JavaRDD<String> userRDD = getUserRDD(this.cleanupType); + + int sessionCount = 0; + sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<Integer> call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + List<Integer> sessionNums = new ArrayList<>(); + while (arg0.hasNext()) { + String s = arg0.next(); + Integer sessionNum = genSessionByReferer(tmpES, s, timeThres); + sessionNums.add(sessionNum); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + return sessionNums.iterator(); + } + }).reduce(new Function2<Integer, Integer, Integer>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }); + + LOG.info("Initial Session count: {}", Integer.toString(sessionCount)); + } + + public int genSessionByReferer(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { + + String startTime = null; + int sessionCountIn = 0; + + BoolQueryBuilder filterSearch = new BoolQueryBuilder(); + filterSearch.must(QueryBuilders.termQuery("IP", user)); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100) + .execute().actionGet(); + + Map<String, Map<String, DateTime>> sessionReqs = new 
HashMap<>(); + String request = ""; + String referer = ""; + String logType = ""; + String id = ""; + String ip = user; + String indexUrl = "http://podaac.jpl.nasa.gov/"; + DateTime time = null; + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + + while (scrollResp.getHits().getHits().length != 0) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + request = (String) result.get("RequestUrl"); + referer = (String) result.get("Referer"); + logType = (String) result.get("LogType"); + time = fmt.parseDateTime((String) result.get("Time")); + id = hit.getId(); + + if ("PO.DAAC".equals(logType)) { + if ("-".equals(referer) || referer.equals(indexUrl) || !referer.contains(indexUrl)) { + sessionCountIn++; + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + + } else { + int count = sessionCountIn; + int rollbackNum = 0; + while (true) { + Map<String, DateTime> requests = sessionReqs.get(ip + "@" + count); + if (requests == null) { + sessionReqs.put(ip + "@" + count, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + count).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); + + break; + } + ArrayList<String> keys = new ArrayList<>(requests.keySet()); + boolean bFindRefer = false; + + for (int i = keys.size() - 1; i >= 0; i--) { + rollbackNum++; + if (keys.get(i).equalsIgnoreCase(referer)) { + bFindRefer = true; + // threshold,if time interval > 10* + // click num, start a new session + if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) < timeThres * rollbackNum) { + sessionReqs.get(ip + "@" + count).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + count); + } else { + sessionCountIn++; + sessionReqs.put(ip 
+ "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + } + + break; + } + } + + if (bFindRefer) { + break; + } + + count--; + if (count < 0) { + sessionCountIn++; + + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, props.getProperty(MudrodConstants.ES_INDEX_NAME), this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + + break; + } + } + } + } else if ("ftp".equals(logType)) { + + // may affect computation efficiency + Map<String, DateTime> requests = sessionReqs.get(ip + "@" + sessionCountIn); + if (requests == null) { + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + } else { + ArrayList<String> keys = new ArrayList<>(requests.keySet()); + int size = keys.size(); + if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(size - 1)), time).getSeconds()) > timeThres) { + sessionCountIn += 1; + sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>()); + } + } + sessionReqs.get(ip + "@" + sessionCountIn).put(request, time); + update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn); + } + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + } + + return sessionCountIn; + } + + public void combineShortSessionsInParallel(int timeThres) throws InterruptedException, IOException { + + JavaRDD<String> userRDD = getUserRDD(this.cleanupType); + + userRDD.foreachPartition(new VoidFunction<Iterator<String>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public void call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + while (arg0.hasNext()) { + 
String s = arg0.next(); + combineShortSessions(tmpES, s, timeThres); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + } + }); + } + + public void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { + + BoolQueryBuilder filterSearch = new BoolQueryBuilder(); + filterSearch.must(QueryBuilders.termQuery("IP", user)); + + String[] indexArr = new String[] { logIndex }; + String[] typeArr = new String[] { cleanupType }; + int docCount = es.getDocCount(indexArr, typeArr, filterSearch); + + if (docCount < 3) { + deleteInvalid(es, user); + return; + } + + BoolQueryBuilder filterCheck = new BoolQueryBuilder(); + filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-")); + SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet(); + + long numInvalid = checkReferer.getHits().getTotalHits(); + double invalidRate = numInvalid / docCount; + + if (invalidRate >= 0.8) { + deleteInvalid(es, user); + return; + } + + StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); + SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch) + .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet(); + + Terms sessions = srSession.getAggregations().get("Sessions"); + + List<Session> sessionList = new ArrayList<>(); + for (Terms.Bucket session : sessions.getBuckets()) { + Stats agg = session.getAggregations().get("Stats"); + Session sess = new Session(props, es, agg.getMinAsString(), agg.getMaxAsString(), session.getKey().toString()); + sessionList.add(sess); + } + + Collections.sort(sessionList); + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + String last = null; + 
String lastnewID = null; + String lastoldID = null; + String current = null; + for (Session s : sessionList) { + current = s.getEndTime(); + if (last != null) { + if (Seconds.secondsBetween(fmt.parseDateTime(last), fmt.parseDateTime(current)).getSeconds() < timeThres) { + if (lastnewID == null) { + s.setNewID(lastoldID); + } else { + s.setNewID(lastnewID); + } + + QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID())); + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + if (lastnewID == null) { + update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastoldID); + } else { + update(es, logIndex, this.cleanupType, hit.getId(), "SessionID", lastnewID); + } + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + } + ; + } + lastoldID = s.getID(); + lastnewID = s.getNewID(); + last = current; + } + + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java new file mode 100644 index 0000000..6f5c5f7 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java @@ -0,0 +1,312 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.weblog.structure.RequestUrl; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports ability to post-process session, including summarizing statistics + * and 
filtering + */ +public class SessionStatistic extends LogAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionStatistic.class); + + public SessionStatistic(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting Session Summarization."); + startTime = System.currentTimeMillis(); + try { + processSession(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Session Summarization complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + public void processSession() throws InterruptedException, IOException, ExecutionException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + processSessionInSequential(); + } else if (processingType.equals("parallel")) { + processSessionInParallel(); + } + } + + public void processSessionInSequential() throws IOException, InterruptedException, ExecutionException { + es.createBulkProcessor(); + Terms Sessions = this.getSessionTerms(); + int session_count = 0; + for (Terms.Bucket entry : Sessions.getBuckets()) { + if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) { + String sessionid = entry.getKey().toString(); + int sessionNum = processSession(es, sessionid); + session_count += sessionNum; + } + } + LOG.info("Final Session count: {}", Integer.toString(session_count)); + es.destroyBulkProcessor(); + } + + /** + * Extract the dataset ID from a long request + * + * @param request raw log request + * @return dataset ID + */ + public String findDataset(String request) { + String pattern1 = "/dataset/"; + String pattern2; + if 
(request.contains("?")) { + pattern2 = "?"; + } else { + pattern2 = " "; + } + + Pattern p = Pattern.compile(Pattern.quote(pattern1) + "(.*?)" + Pattern.quote(pattern2)); + Matcher m = p.matcher(request); + if (m.find()) { + return m.group(1); + } + return null; + } + + public void processSessionInParallel() throws InterruptedException, IOException { + + List<String> sessions = this.getSessions(); + JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition); + + int sessionCount = 0; + sessionCount = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { + @Override + public Iterator<Integer> call(Iterator<String> arg0) throws Exception { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + List<Integer> sessionNums = new ArrayList<Integer>(); + sessionNums.add(0); + while (arg0.hasNext()) { + String s = arg0.next(); + Integer sessionNum = processSession(tmpES, s); + sessionNums.add(sessionNum); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + return sessionNums.iterator(); + } + }).reduce(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }); + + LOG.info("Final Session count: {}", Integer.toString(sessionCount)); + } + + public int processSession(ESDriver es, String sessionId) throws IOException, InterruptedException, ExecutionException { + + String inputType = cleanupType; + String outputType = sessionStats; + + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + String min = null; + String max = null; + DateTime start = null; + DateTime end = null; + int duration = 0; + float request_rate = 0; + + int session_count = 0; + Pattern pattern = Pattern.compile("get (.*?) 
http/*"); + + StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); + + BoolQueryBuilder filter_search = new BoolQueryBuilder(); + filter_search.must(QueryBuilders.termQuery("SessionID", sessionId)); + + SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filter_search).addAggregation(statsAgg).execute().actionGet(); + + Stats agg = sr.getAggregations().get("Stats"); + min = agg.getMinAsString(); + max = agg.getMaxAsString(); + start = fmt.parseDateTime(min); + end = fmt.parseDateTime(max); + + duration = Seconds.secondsBetween(start, end).getSeconds(); + + int searchDataListRequest_count = 0; + int searchDataRequest_count = 0; + int searchDataListRequest_byKeywords_count = 0; + int ftpRequest_count = 0; + int keywords_num = 0; + + String IP = null; + String keywords = ""; + String views = ""; + String downloads = ""; + + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(inputType).setScroll(new TimeValue(60000)).setQuery(filter_search).setSize(100).execute().actionGet(); + + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + + String request = (String) result.get("Request"); + String logType = (String) result.get("LogType"); + IP = (String) result.get("IP"); + Matcher matcher = pattern.matcher(request.trim().toLowerCase()); + while (matcher.find()) { + request = matcher.group(1); + } + + String datasetlist = "/datasetlist?"; + String dataset = "/dataset/"; + if (request.contains(datasetlist)) { + searchDataListRequest_count++; + + RequestUrl requestURL = new RequestUrl(); + String infoStr = requestURL.getSearchInfo(request) + ","; + String info = es.customAnalyzing(props.getProperty("indexName"), infoStr); + + if (!info.equals(",")) { + if (keywords.equals("")) { + keywords = keywords + info; + } else { + String[] items = info.split(","); + String[] keywordList = keywords.split(","); + for (int m = 0; m 
< items.length; m++) { + if (!Arrays.asList(keywordList).contains(items[m])) { + keywords = keywords + items[m] + ","; + } + } + } + } + + } + if (request.startsWith(dataset)) { + searchDataRequest_count++; + if (findDataset(request) != null) { + String view = findDataset(request); + + if ("".equals(views)) { + views = view; + } else { + if (views.contains(view)) { + + } else { + views = views + "," + view; + } + } + } + } + if ("ftp".equals(logType)) { + ftpRequest_count++; + String download = ""; + String requestLowercase = request.toLowerCase(); + if (requestLowercase.endsWith(".jpg") == false && requestLowercase.endsWith(".pdf") == false && requestLowercase.endsWith(".txt") == false && requestLowercase.endsWith(".gif") == false) { + download = request; + } + + if ("".equals(downloads)) { + downloads = download; + } else { + if (downloads.contains(download)) { + + } else { + downloads = downloads + "," + download; + } + } + } + + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + // Break condition: No hits are returned + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + if (!keywords.equals("")) { + keywords_num = keywords.split(",").length; + } + + if (searchDataListRequest_count != 0 && searchDataListRequest_count <= Integer.parseInt(props.getProperty("searchf")) && searchDataRequest_count != 0 && searchDataRequest_count <= Integer + .parseInt(props.getProperty("viewf")) && ftpRequest_count <= Integer.parseInt(props.getProperty("downloadf"))) { + String sessionURL = props.getProperty("SessionPort") + props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType; + session_count = 1; + + IndexRequest ir = new IndexRequest(logIndex, outputType).source( + jsonBuilder().startObject().field("SessionID", sessionId).field("SessionURL", sessionURL).field("Duration", duration).field("Number of Keywords", 
keywords_num).field("Time", min) + .field("End_time", max).field("searchDataListRequest_count", searchDataListRequest_count).field("searchDataListRequest_byKeywords_count", searchDataListRequest_byKeywords_count) + .field("searchDataRequest_count", searchDataRequest_count).field("keywords", es.customAnalyzing(logIndex, keywords)).field("views", views).field("downloads", downloads) + .field("request_rate", request_rate).field("Comments", "").field("Validation", 0).field("Produceby", 0).field("Correlation", 0).field("IP", IP).endObject()); + + es.getBulkProcessor().add(ir); + } + + return session_count; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java new file mode 100644 index 0000000..5bf7f27 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * This package includes Preprocessing for all functionality required by the + * {@link org.apache.sdap.mudrod.discoveryengine.WeblogDiscoveryEngine} + */ +package org.apache.sdap.mudrod.weblog.pre; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java new file mode 100644 index 0000000..68fad4d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.process; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.semantics.SVDAnalyzer; +import org.apache.sdap.mudrod.ssearch.ClickstreamImporter; +import org.apache.sdap.mudrod.utils.LinkageTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.Properties; + +/** + * Supports ability to calculate term similarity based on click stream + */ +public class ClickStreamAnalyzer extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ClickStreamAnalyzer.class); + + public ClickStreamAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of executing click stream analyzer + */ + @Override + public Object execute() { + LOG.info("Starting ClickStreamAnalyzer..."); + startTime = System.currentTimeMillis(); + try { + String clickstream_matrixFile = props.getProperty("clickstreamMatrix"); + File f = new File(clickstream_matrixFile); + if (f.exists()) { + SVDAnalyzer svd = new SVDAnalyzer(props, es, spark); + svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), Integer.parseInt(props.getProperty("clickstreamSVDDimension")), props.getProperty("clickstreamSVDMatrix_tmp")); + List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp")); + svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("clickStreamLinkageType")); + + // Store click stream in ES for the ranking use + ClickstreamImporter cs = new ClickstreamImporter(props, es, spark); + cs.importfromCSVtoES(); + } + } catch (Exception e) { + LOG.error("Encountered an error during execution of ClickStreamAnalyzer.", e); + } + + endTime = 
System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("ClickStreamAnalyzer complete. Time elapsed: {}s", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java new file mode 100644 index 0000000..d95475c --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.process; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.semantics.SemanticAnalyzer; +import org.apache.sdap.mudrod.utils.LinkageTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * Supports ability to calculate term similarity based on user history + */ +public class UserHistoryAnalyzer extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(UserHistoryAnalyzer.class); + + public UserHistoryAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of executing user history analyzer + */ + @Override + public Object execute() { + LOG.info("Starting UserHistoryAnalyzer..."); + startTime = System.currentTimeMillis(); + + SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark); + List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty("userHistoryMatrix")); + sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)); + + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("UserHistoryAnalyzer complete. 
Time elapsed: {}s", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java new file mode 100644 index 0000000..a6b55f4 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes web log processing classes. 
+ */ +package org.apache.sdap.mudrod.weblog.process; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java new file mode 100644 index 0000000..0127e2d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.structure; + +import com.google.gson.Gson; + +import java.io.IOException; +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.sdap.mudrod.weblog.pre.CrawlerDetection; + +/** + * This class represents an Apache access log line. See + * http://httpd.apache.org/docs/2.2/logs.html for more details. 
+ */ +public class ApacheAccessLog extends WebLog implements Serializable { + + // double Bytes; + String Response; + String Referer; + String Browser; + + @Override + public double getBytes() { + return this.Bytes; + } + + public String getBrowser() { + return this.Browser; + } + + public String getResponse() { + return this.Response; + } + + public String getReferer() { + return this.Referer; + } + + public ApacheAccessLog() { + + } + + public static String parseFromLogLine(String log) throws IOException, ParseException { + + String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\""; + final int NUM_FIELDS = 9; + Pattern p = Pattern.compile(logEntryPattern); + Matcher matcher; + + String lineJson = "{}"; + matcher = p.matcher(log); + if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { + return lineJson; + } + + String time = matcher.group(4); + time = SwithtoNum(time); + SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); + Date date = formatter.parse(time); + + String bytes = matcher.group(7); + + if (bytes.equals("-")) { + bytes = "0"; + } + + String request = matcher.group(5).toLowerCase(); + String agent = matcher.group(9); + CrawlerDetection crawlerDe = new CrawlerDetection(); + if (crawlerDe.checkKnownCrawler(agent)) { + return lineJson; + } else { + + boolean tag = false; + String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" }; + for (int i = 0; i < mimeTypes.length; i++) { + if (request.contains(mimeTypes[i])) { + tag = true; + return lineJson; + } + } + + if (tag == false) { + ApacheAccessLog accesslog = new ApacheAccessLog(); + accesslog.LogType = "PO.DAAC"; + accesslog.IP = matcher.group(1); + accesslog.Request = matcher.group(5); + accesslog.Response = matcher.group(6); + accesslog.Bytes = Double.parseDouble(bytes); + 
accesslog.Referer = matcher.group(8); + accesslog.Browser = matcher.group(9); + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); + accesslog.Time = df.format(date); + + Gson gson = new Gson(); + lineJson = gson.toJson(accesslog); + + return lineJson; + } + } + + lineJson = "{}"; + return lineJson; + } + + public static boolean checknull(WebLog s) { + if (s == null) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java new file mode 100644 index 0000000..2f0c34d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java @@ -0,0 +1,188 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.structure; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.io.Serializable; + +/** + * ClassName: ClickStream Function: user click stream data related operations. 
+ */ +public class ClickStream implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + // keywords: query words related to the click behaviour + private String keywords; + // viewDataset: the dataset name user viewed + private String viewDataset; + // downloadDataset: the dataset name user downloaded + private String downloadDataset; + // sessionID: session ID + private String sessionID; + // type: session type name + private String type; + + /** + * Creates a new instance of ClickStream. + * + * @param keywords the query user searched + * @param viewDataset the dataset name user viewed + * @param download: if user download the data set after viewing it, this parameter is + * true, otherwise, it is false. + */ + public ClickStream(String keywords, String viewDataset, boolean download) { + this.keywords = keywords; + this.viewDataset = viewDataset; + this.downloadDataset = ""; + if (download) { + this.downloadDataset = viewDataset; + } + } + + public ClickStream() { + //default constructor + } + + public String getSessionID() { + return sessionID; + } + + /** + * setKeyWords: Set the query user searched. 
+ * + * @param query search words + */ + public void setKeyWords(String query) { + this.keywords = query; + } + + /** + * setViewDataset:Set the data set name user viewed + * + * @param dataset short name of data set + */ + public void setViewDataset(String dataset) { + this.viewDataset = dataset; + } + + /** + * setDownloadDataset: Set the data set name user downloaded + * + * @param dataset short name of data set + */ + public void setDownloadDataset(String dataset) { + this.downloadDataset = dataset; + } + + /** + * getKeyWords: Get the query user searched + * + * @return data set name + */ + public String getKeyWords() { + return this.keywords; + } + + /** + * getViewDataset: Get the data set user viewed + * + * @return data set name + */ + public String getViewDataset() { + return this.viewDataset; + } + + /** + * isDownload: Show whether the data is downloaded in the session. + * + * @return True or False + */ + public Boolean isDownload() { + if ("".equals(this.downloadDataset)) { + return false; + } + return true; + } + + /** + * setSessionId: Set ID of session + * + * @param sessionID session id + */ + public void setSessionId(String sessionID) { + this.sessionID = sessionID; + } + + /** + * setType: Set session type name + * + * @param type session type name in elasticsearch + */ + public void setType(String type) { + this.type = type; + } + + /** + * Output click stream info in string format + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "Query: " + keywords + " || View Dataset: " + viewDataset + " || Download Dataset: " + downloadDataset; + } + + /** + * toJson: Output click stream info in Json format + * + * @return session in string format + */ + public String toJson() { + String jsonQuery = "{"; + jsonQuery += "\"query\":\"" + this.keywords + "\","; + jsonQuery += "\"viewdataset\":\"" + this.viewDataset + "\","; + jsonQuery += "\"downloaddataset\":\"" + this.downloadDataset + "\","; + jsonQuery += 
"\"sessionId\":\"" + this.sessionID + "\","; + jsonQuery += "\"type\":\"" + this.type + "\""; + jsonQuery += "},"; + return jsonQuery; + } + + /** + * parseFromTextLine: Convert string to click stream data + * + * @param logline http log line + * @return {@link ClickStream} + */ + public static ClickStream parseFromTextLine(String logline) { + JSONObject jsonData = null; + ClickStream data = null; + try { + jsonData = new JSONObject(logline); + data = new ClickStream(); + data.setKeyWords(jsonData.getString("query")); + data.setViewDataset(jsonData.getString("viewdataset")); + data.setDownloadDataset(jsonData.getString("downloaddataset")); + + } catch (JSONException e) { + e.printStackTrace(); + } + + return data; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java new file mode 100644 index 0000000..5e6fd07 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Simple value holder for a geographic coordinate pair.
 */
public class Coordinates {
  // Comma-separated "latitude,longitude" string, e.g. "34.05,-118.25".
  // (Removed commented-out lat/lon fields that were dead code.)
  public String latlon;
}
+ */ +public class FtpLog extends WebLog implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class); + + public static String parseFromLogLine(String log) { + + try { + String ip = log.split(" +")[6]; + + String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + log.split(" +")[3] + ":" + log.split(" +")[4]; + + time = SwithtoNum(time); + SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy"); + Date date = formatter.parse(time); + + String bytes = log.split(" +")[7]; + + String request = log.split(" +")[8].toLowerCase(); + + if (!request.contains("/misc/") && !request.contains("readme")) { + FtpLog ftplog = new FtpLog(); + ftplog.LogType = "ftp"; + ftplog.IP = ip; + ftplog.Request = request; + ftplog.Bytes = Double.parseDouble(bytes); + + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); + ftplog.Time = df.format(date); + + return new Gson().toJson(ftplog); + } + } catch (Exception e) { + LOG.warn("Error parsing ftp log line [{}]. Skipping this line.", log, e); + } + return "{}"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java new file mode 100644 index 0000000..d3e94dc --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.structure; + +import org.apache.sdap.mudrod.utils.HttpRequest; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +/** + * ClassName: GeoIp Function: convert IP to geo location + */ +public class GeoIp { + + /** + * toLocation: convert ip to location + * + * @param ip ip address + * @return coordinates + */ + public Coordinates toLocation(String ip) { + String url = "http://getcitydetails.geobytes.com/GetCityDetails?fqcn=" + ip; + HttpRequest http = new HttpRequest(); + String response = http.getRequest(url); + JsonParser parser = new JsonParser(); + JsonElement jobSon = parser.parse(response); + JsonObject responseObject = jobSon.getAsJsonObject(); + + Coordinates co = new Coordinates(); + String lon = responseObject.get("geobyteslongitude").toString().replace("\"", ""); + String lat = responseObject.get("geobyteslatitude").toString().replace("\"", ""); + co.latlon = lat + "," + lon; + return co; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java new file mode 100644 index 0000000..cf4ec23 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java 
/**
 * ClassName: RankingTrainData Function: training data extracted from web logs
 * for learning ranking weights.
 */
public class RankingTrainData implements Serializable {

  private static final long serialVersionUID = 1L;
  // sessionID: session ID
  private String sessionID;
  // index: Elasticsearch index name this record belongs to
  private String index;
  // query: query words related to the click
  private String query;
  // highRankDataset: dataset the user ranked higher (dataset A)
  private String highRankDataset;
  // lowRankDataset: dataset the user ranked lower (dataset B)
  private String lowRankDataset;
  // filter: optional facet name/value pairs applied during the search
  private Map<String, String> filter;

  /**
   * Creates a new instance of RankingTrainData.
   *
   * @param query           the user query string
   * @param highRankDataset the dataset name for the highest ranked dataset
   * @param lowRankDataset  the dataset name for the lowest ranked dataset
   */
  public RankingTrainData(String query, String highRankDataset, String lowRankDataset) {
    this.query = query;
    this.highRankDataset = highRankDataset;
    this.lowRankDataset = lowRankDataset;
  }

  public RankingTrainData() {
    // default constructor
  }

  public String getSessionID() {
    return sessionID;
  }

  /**
   * setQuery: Set the query the user searched.
   *
   * @param query search words
   */
  public void setQuery(String query) {
    this.query = query;
  }

  /**
   * getQuery: Get the query the user searched.
   *
   * @return query string
   */
  public String getQuery() {
    return this.query;
  }

  /**
   * setHighRankDataset: Set the higher-ranked data set name.
   *
   * @param dataset short name of data set
   */
  public void setHighRankDataset(String dataset) {
    this.highRankDataset = dataset;
  }

  /**
   * getHighRankDataset: Get the higher-ranked data set name. Added for
   * symmetry with {@link #getLowRankDataset()}.
   *
   * @return data set name
   */
  public String getHighRankDataset() {
    return this.highRankDataset;
  }

  /**
   * setLowRankDataset: Set the lower-ranked data set name.
   *
   * @param dataset short name of data set
   */
  public void setLowRankDataset(String dataset) {
    this.lowRankDataset = dataset;
  }

  /**
   * getLowRankDataset: Get the lower-ranked data set name.
   *
   * @return data set name
   */
  public String getLowRankDataset() {
    return this.lowRankDataset;
  }

  /**
   * setSessionId: Set ID of session.
   *
   * @param sessionID session id
   */
  public void setSessionId(String sessionID) {
    this.sessionID = sessionID;
  }

  /**
   * setIndex: Set the Elasticsearch index name.
   *
   * @param index index name in elasticsearch
   */
  public void setIndex(String index) {
    this.index = index;
  }

  /**
   * setFilter: Set optional facet filters for this training example.
   *
   * @param filter facet name/value pairs
   */
  public void setFilter(Map<String, String> filter) {
    this.filter = filter;
  }

  /**
   * Output training data info in string format.
   *
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    return "query:" + query + "|| highRankDataset:" + highRankDataset + "|| lowRankDataset:" + lowRankDataset;
  }

  /**
   * toJson: Output training data info in Json format.
   *
   * @return training example in string format
   */
  public String toJson() {
    // StringBuilder avoids repeated String concatenation in the filter loop.
    StringBuilder json = new StringBuilder("{");
    json.append("\"query\":\"").append(this.query).append("\",");
    json.append("\"highRankDataset\":\"").append(this.highRankDataset).append("\",");
    json.append("\"lowRankDataset\":\"").append(this.lowRankDataset).append("\",");

    if (this.filter != null) {
      // Iterate entries directly instead of keySet() + get() lookups.
      for (Map.Entry<String, String> entry : this.filter.entrySet()) {
        json.append("\"").append(entry.getKey()).append("\":\"").append(entry.getValue()).append("\",");
      }
    }

    json.append("\"sessionId\":\"").append(this.sessionID).append("\",");
    json.append("\"index\":\"").append(this.index).append("\"");
    json.append("},");
    return json.toString();
  }
}
+ */ +package org.apache.sdap.mudrod.weblog.structure; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * ClassName: RequestUrl Function: request url relate operations + */ +public class RequestUrl { + + private static final Logger LOG = LoggerFactory.getLogger(RequestUrl.class); + + /** + * Default Constructor + */ + public RequestUrl() { + /* Default Constructor */ + } + + /** + * UrlPage: Get url page from url link + * + * @param strURL request url + * @return page name + */ + public static String urlPage(String strURL) { + String strPage = null; + String[] arrSplit = null; + + String newURL = strURL.trim().toLowerCase(); + + arrSplit = newURL.split("[?]"); + if (newURL.length() > 0 && arrSplit.length > 1 && arrSplit[0] != null) { + strPage = arrSplit[0]; + } + + return strPage; + } + + /** + * TruncateUrlPage: Get url params from url link + * + * @param strURL + * @return url params + */ + private static String truncateUrlPage(String strURL) { + String strAllParam = null; + String[] arrSplit = null; + + strURL = strURL.trim().toLowerCase(); // keep this in mind + + arrSplit = strURL.split("[?]"); + if (strURL.length() > 1) { + if (arrSplit.length > 1) { + if (arrSplit[1] != null) { + strAllParam = arrSplit[1]; + } + } + } + + return strAllParam; + } + + /** + * URLRequest: Get url params from url link in a map format + * + * @param URL request url + * @return url params key value map + */ + public static Map<String, String> uRLRequest(String URL) { + Map<String, String> mapRequest = new HashMap<String, String>(); + + String[] arrSplit = null; + + String strUrlParam = truncateUrlPage(URL); + if (strUrlParam == null) { + return mapRequest; + } + + arrSplit = strUrlParam.split("[&]"); + for (String strSplit : arrSplit) { + String[] arrSplitEqual = null; + 
arrSplitEqual = strSplit.split("[=]"); + + if (arrSplitEqual.length > 1) { + + mapRequest.put(arrSplitEqual[0], arrSplitEqual[1]); + + } else { + if (arrSplitEqual[0] != "") { + + mapRequest.put(arrSplitEqual[0], ""); + } + } + } + return mapRequest; + } + + /** + * GetSearchInfo: Get search information from url link + * + * @param URL request url + * @return search params + * @throws UnsupportedEncodingException UnsupportedEncodingException + */ + public String getSearchInfo(String URL) throws UnsupportedEncodingException { + List<String> info = new ArrayList<String>(); + String keyword = ""; + Map<String, String> mapRequest = RequestUrl.uRLRequest(URL); + if (mapRequest.get("search") != null) { + try { + keyword = mapRequest.get("search"); + + keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); + if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { + keyword = keyword.replace("%2b", " "); + keyword = keyword.replace("%20", " "); + keyword = keyword.replace("%25", " "); + } + + keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); + + } catch (UnsupportedEncodingException e) { + LOG.error(mapRequest.get("search")); + e.printStackTrace(); + } + if (!"".equals(keyword)) { + info.add(keyword); + } + + } + + if (mapRequest.get("ids") != null && mapRequest.get("values") != null) { + String id_raw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8"); + String value_raw = URLDecoder.decode(mapRequest.get("values"), "UTF-8"); + String[] ids = id_raw.split(":"); + String[] values = value_raw.split(":"); + + int a = ids.length; + int b = values.length; + int l = a < b ? 
a : b; + + for (int i = 0; i < l; i++) { + if (ids[i].equals("collections") || ids[i].equals("measurement") || ids[i].equals("sensor") || ids[i].equals("platform") || ids[i].equals("variable") || ids[i].equals("spatialcoverage")) { + try { + values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25"); + if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) { + String item = URLDecoder.decode(values[i], "UTF-8").trim(); + if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) { + item = item.replace("%2b", " "); + item = item.replace("%20", " "); + item = item.replace("%25", " "); + } + item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); + info.add(item); + } + } catch (Exception e) { + LOG.error(values[i]); + e.printStackTrace(); + } + } + + } + } + + return String.join(",", info); + } + + /** + * GetSearchWord: Get search words from url link + * + * @param url request url + * @return query + */ + public static String getSearchWord(String url) { + String keyword = ""; + + Map<String, String> mapRequest = RequestUrl.uRLRequest(url); + if (mapRequest.get("search") != null) { + try { + keyword = mapRequest.get("search"); + + keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); + if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { + keyword = keyword.replace("%2b", " "); + keyword = keyword.replace("%20", " "); + keyword = keyword.replace("%25", " "); + } + keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); + } catch (UnsupportedEncodingException e) { + LOG.error(mapRequest.get("search")); + e.printStackTrace(); + } + } + + return keyword; + } + + /** + * GetFilterInfo: Get filter params from url link + * + * @param url request url + * @return filter facet key pair map + * @throws UnsupportedEncodingException 
UnsupportedEncodingException + */ + public static Map<String, String> getFilterInfo(String url) throws UnsupportedEncodingException { + List<String> info = new ArrayList<>(); + Map<String, String> filterValues = new HashMap<>(); + + String keyword = ""; + Map<String, String> mapRequest = RequestUrl.uRLRequest(url); + if (mapRequest.get("search") != null) { + try { + keyword = mapRequest.get("search"); + + keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); + if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { + keyword = keyword.replace("%2b", " "); + keyword = keyword.replace("%20", " "); + keyword = keyword.replace("%25", " "); + } + keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); + + } catch (UnsupportedEncodingException e) { + LOG.error(mapRequest.get("search")); + e.printStackTrace(); + } + if (!"".equals(keyword)) { + info.add(keyword); + } + + } + + if (mapRequest.get("ids") != null && mapRequest.get("values") != null) { + String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8"); + String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8"); + String[] ids = idRaw.split(":"); + String[] values = valueRaw.split(":"); + + int a = ids.length; + int b = values.length; + int l = a < b ? 
a : b; + + for (int i = 0; i < l; i++) { + try { + values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25"); + if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) { + String item = URLDecoder.decode(values[i], "UTF-8").trim(); + if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) { + item = item.replace("%2b", " "); + item = item.replace("%20", " "); + item = item.replace("%25", " "); + } + item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); + filterValues.put(ids[i], item); + } + } catch (Exception e) { + LOG.error(values[i]); + e.printStackTrace(); + } + } + } + + if (mapRequest.get("temporalsearch") != null) { + String temporalsearch = mapRequest.get("temporalsearch"); + temporalsearch = URLDecoder.decode(temporalsearch.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); + + filterValues.put("temporalsearch", temporalsearch); + } + + return filterValues; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java new file mode 100644 index 0000000..15c3d81 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java @@ -0,0 +1,288 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.structure; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * ClassName: Session Function: Session operations. + */ +public class Session /*extends MudrodAbstract*/ implements Comparable<Session> { + private static final Logger LOG = LoggerFactory.getLogger(Session.class); + // start: start time of session + private String start; + // end: end time of session + private String end; + // id: original session ID + private String id; + // newid: new session ID + private String newid = null; + // fmt: time formatter + private DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + + private ESDriver es; + private Properties props; + + /** + * Creates a new instance of Session. + * + * @param props the Mudrod configuration + * @param es the Elasticsearch drive + * @param start start time of session + * @param end end time of session + * @param id session ID + */ + public Session(Properties props, ESDriver es, String start, String end, String id) { + this.start = start; + this.end = end; + this.id = id; + + this.props = props; + this.es = es; + } + + /** + * Creates a new instance of Session. 
+ * + * @param props the Mudrod configuration + * @param es the Elasticsearch drive + */ + public Session(Properties props, ESDriver es) { + this.props = props; + this.es = es; + } + + /** + * getID: Get original session ID + * + * @return session id + */ + public String getID() { + return id; + } + + /** + * getNewID: Get new session ID + * + * @return new session id + */ + public String getNewID() { + return newid; + } + + /** + * setNewID: Set new session ID + * + * @param str: session ID + * @return new session id + */ + public String setNewID(String str) { + return newid = str; + } + + /** + * getStartTime:Get start time of current session + * + * @return start time of session + */ + public String getStartTime() { + return start; + } + + /** + * getEndTime:Get end time of current session + * + * @return end time of session + */ + public String getEndTime() { + return end; + } + + /** + * Compare current session with another session + * + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + @Override + public int compareTo(Session o) { + fmt.parseDateTime(this.end); + fmt.parseDateTime(o.end); + // ascending order + return Seconds.secondsBetween(fmt.parseDateTime(o.end), fmt.parseDateTime(this.end)).getSeconds(); + + } + + /** + * getSessionDetail:Get detail of current session, which is used for session + * tree reconstruct + * + * @param indexName name of index from which you wish to obtain session detail. 
+ * @param type: Session type name in Elasticsearch + * @param sessionID: Session ID + * @return Session details in Json format + */ + public JsonObject getSessionDetail(String indexName, String type, String sessionID) { + JsonObject sessionResults = new JsonObject(); + // for session tree + SessionTree tree = null; + JsonElement jsonRequest = null; + try { + tree = this.getSessionTree(indexName, type, sessionID); + JsonObject jsonTree = tree.treeToJson(tree.root); + sessionResults.add("treeData", jsonTree); + + jsonRequest = this.getRequests(type, sessionID); + sessionResults.add("RequestList", jsonRequest); + } catch (UnsupportedEncodingException e) { + LOG.error("Encoding error detected.", e); + + } + + return sessionResults; + } + + /** + * getClickStreamList: Extracted click stream list from current session. + * + * @param indexName an index from which to query for a session list + * @param type: Session type name in Elasticsearch + * @param sessionID: Session ID + * @return Click stram data list + * {@link ClickStream} + */ + public List<ClickStream> getClickStreamList(String indexName, String type, String sessionID) { + SessionTree tree = null; + try { + tree = this.getSessionTree(indexName, type, sessionID); + } catch (UnsupportedEncodingException e) { + LOG.error("Erro whilst obtaining the Session Tree: {}", e); + } + + List<ClickStream> clickthroughs = tree.getClickStreamList(); + return clickthroughs; + } + + /** + * Method of converting a given session to a tree structure + * + * @param type session type name in Elasticsearch + * @param sessionID ID of session + * @return an instance of session tree structure + * @throws UnsupportedEncodingException UnsupportedEncodingException + */ + private SessionTree getSessionTree(String indexName, String type, String sessionID) throws UnsupportedEncodingException { + + SearchResponse response = es.getClient().prepareSearch(indexName).setTypes(type).setQuery(QueryBuilders.termQuery("SessionID", 
sessionID)).setSize(100).addSort("Time", SortOrder.ASC) + .execute().actionGet(); + + SessionTree tree = new SessionTree(this.props, this.es, sessionID, type); + int seq = 1; + for (SearchHit hit : response.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String request = (String) result.get("Request"); + String time = (String) result.get("Time"); + String logType = (String) result.get("LogType"); + String referer = (String) result.get("Referer"); + + SessionNode node = new SessionNode(request, logType, referer, time, seq); + tree.insert(node); + seq++; + } + + return tree; + } + + /** + * Method of getting all requests from a given current session + * + * @param cleanuptype Session type name in Elasticsearch + * @param sessionID Session ID + * @return all of these requests in JSON + * @throws UnsupportedEncodingException UnsupportedEncodingException + */ + private JsonElement getRequests(String cleanuptype, String sessionID) throws UnsupportedEncodingException { + SearchResponse response = es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100) + .addSort("Time", SortOrder.ASC).execute().actionGet(); + + Gson gson = new Gson(); + List<JsonObject> requestList = new ArrayList<>(); + int seq = 1; + for (SearchHit hit : response.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String request = (String) result.get("Request"); + String requestUrl = (String) result.get("RequestUrl"); + String time = (String) result.get("Time"); + String logType = (String) result.get("LogType"); + String referer = (String) result.get("Referer"); + + JsonObject req = new JsonObject(); + req.addProperty("Time", time); + req.addProperty("Request", request); + req.addProperty("RequestURL", requestUrl); + req.addProperty("LogType", logType); + req.addProperty("Referer", referer); + req.addProperty("Seq", seq); + requestList.add(req); + + seq++; + } + 
return gson.toJsonTree(requestList); + } + + /** + * getClickStreamList: Extracted ranking training data from current session. + * + * @param indexName an index from which to obtain ranked training data. + * @param cleanuptype: Session type name in Elasticsearch + * @param sessionID: Session ID + * @return Click stram data list + * {@link ClickStream} + */ + public List<RankingTrainData> getRankingTrainData(String indexName, String cleanuptype, String sessionID) { + SessionTree tree = null; + try { + tree = this.getSessionTree(indexName, cleanuptype, sessionID); + } catch (UnsupportedEncodingException e) { + LOG.error("Error whilst retreiving Session Tree: {}", e); + } + + List<RankingTrainData> trainData = new ArrayList<>(); + try { + trainData = tree.getRankingTrainData(indexName, sessionID); + } catch (UnsupportedEncodingException e) { + LOG.error("Error whilst retreiving ranking training data: {}", e); + } + + return trainData; + } +}
