http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java deleted file mode 100644 index 248756b..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/UserHistoryAnalyzer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.weblog.process; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.semantics.SemanticAnalyzer; -import gov.nasa.jpl.mudrod.utils.LinkageTriple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Properties; - -/** - * Supports ability to calculate term similarity based on user history - */ -public class UserHistoryAnalyzer extends DiscoveryStepAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(UserHistoryAnalyzer.class); - - public UserHistoryAnalyzer(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - /** - * Method of executing user history analyzer - */ - @Override - public Object execute() { - LOG.info("Starting UserHistoryAnalyzer..."); - startTime = System.currentTimeMillis(); - - SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark); - List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty("userHistoryMatrix")); - sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)); - - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("UserHistoryAnalyzer complete. Time elapsed: {}s", (endTime - startTime) / 1000); - return null; - } - - @Override - public Object execute(Object o) { - return null; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java deleted file mode 100644 index e96fd3c..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/package-info.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes web log processing classes. - */ -package gov.nasa.jpl.mudrod.weblog.process; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java deleted file mode 100644 index 1051384..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ApacheAccessLog.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import com.google.gson.Gson; -import gov.nasa.jpl.mudrod.weblog.pre.CrawlerDetection; - -import java.io.IOException; -import java.io.Serializable; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * This class represents an Apache access log line. See - * http://httpd.apache.org/docs/2.2/logs.html for more details. - */ -public class ApacheAccessLog extends WebLog implements Serializable { - - // double Bytes; - String Response; - String Referer; - String Browser; - - @Override - public double getBytes() { - return this.Bytes; - } - - public String getBrowser() { - return this.Browser; - } - - public String getResponse() { - return this.Response; - } - - public String getReferer() { - return this.Referer; - } - - public ApacheAccessLog() { - - } - - public static String parseFromLogLine(String log) throws IOException, ParseException { - - String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\""; - final int NUM_FIELDS = 9; - Pattern p = Pattern.compile(logEntryPattern); - Matcher matcher; - - String lineJson = "{}"; - matcher = p.matcher(log); - if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { - return lineJson; - } - - String time = matcher.group(4); - time = SwithtoNum(time); - SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss"); - Date date = 
formatter.parse(time); - - String bytes = matcher.group(7); - - if (bytes.equals("-")) { - bytes = "0"; - } - - String request = matcher.group(5).toLowerCase(); - String agent = matcher.group(9); - CrawlerDetection crawlerDe = new CrawlerDetection(); - if (crawlerDe.checkKnownCrawler(agent)) { - return lineJson; - } else { - - boolean tag = false; - String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" }; - for (int i = 0; i < mimeTypes.length; i++) { - if (request.contains(mimeTypes[i])) { - tag = true; - return lineJson; - } - } - - if (tag == false) { - ApacheAccessLog accesslog = new ApacheAccessLog(); - accesslog.LogType = "PO.DAAC"; - accesslog.IP = matcher.group(1); - accesslog.Request = matcher.group(5); - accesslog.Response = matcher.group(6); - accesslog.Bytes = Double.parseDouble(bytes); - accesslog.Referer = matcher.group(8); - accesslog.Browser = matcher.group(9); - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); - accesslog.Time = df.format(date); - - Gson gson = new Gson(); - lineJson = gson.toJson(accesslog); - - return lineJson; - } - } - - lineJson = "{}"; - return lineJson; - } - - public static boolean checknull(WebLog s) { - if (s == null) { - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java deleted file mode 100644 index 76e8d7a..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/ClickStream.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file 
except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; - -import java.io.Serializable; - -/** - * ClassName: ClickStream Function: user click stream data related operations. - */ -public class ClickStream implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1L; - // keywords: query words related to the click behaviour - private String keywords; - // viewDataset: the dataset name user viewed - private String viewDataset; - // downloadDataset: the dataset name user downloaded - private String downloadDataset; - // sessionID: session ID - private String sessionID; - // type: session type name - private String type; - - /** - * Creates a new instance of ClickStream. - * - * @param keywords the query user searched - * @param viewDataset the dataset name user viewed - * @param download: if user download the data set after viewing it, this parameter is - * true, otherwise, it is false. - */ - public ClickStream(String keywords, String viewDataset, boolean download) { - this.keywords = keywords; - this.viewDataset = viewDataset; - this.downloadDataset = ""; - if (download) { - this.downloadDataset = viewDataset; - } - } - - public ClickStream() { - //default constructor - } - - public String getSessionID() { - return sessionID; - } - - /** - * setKeyWords: Set the query user searched. 
- * - * @param query search words - */ - public void setKeyWords(String query) { - this.keywords = query; - } - - /** - * setViewDataset: Set the data set name user viewed - * - * @param dataset short name of data set - */ - public void setViewDataset(String dataset) { - this.viewDataset = dataset; - } - - /** - * setDownloadDataset: Set the data set name user downloaded - * - * @param dataset short name of data set - */ - public void setDownloadDataset(String dataset) { - this.downloadDataset = dataset; - } - - /** - * getKeyWords: Get the query user searched - * - * @return query words - */ - public String getKeyWords() { - return this.keywords; - } - - /** - * getViewDataset: Get the data set user viewed - * - * @return data set name - */ - public String getViewDataset() { - return this.viewDataset; - } - - /** - * isDownload: Show whether the data is downloaded in the session. - * - * @return false if the download data set name is empty, true otherwise - */ - public Boolean isDownload() { - if ("".equals(this.downloadDataset)) { - return false; - } - return true; - } - - /** - * setSessionId: Set ID of session - * - * @param sessionID session id - */ - public void setSessionId(String sessionID) { - this.sessionID = sessionID; - } - - /** - * setType: Set session type name - * - * @param type session type name in elasticsearch - */ - public void setType(String type) { - this.type = type; - } - - /** - * Output click stream info in string format - * - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "Query: " + keywords + " || View Dataset: " + viewDataset + " || Download Dataset: " + downloadDataset; - } - - /** - * toJson: Output click stream info in Json format - * - * @return click stream in Json string format - */ - public String toJson() { - String jsonQuery = "{"; - jsonQuery += "\"query\":\"" + this.keywords + "\","; - jsonQuery += "\"viewdataset\":\"" + this.viewDataset + "\","; - jsonQuery += "\"downloaddataset\":\"" + this.downloadDataset + "\","; - jsonQuery += 
"\"sessionId\":\"" + this.sessionID + "\","; - jsonQuery += "\"type\":\"" + this.type + "\""; - jsonQuery += "},"; - return jsonQuery; - } - - /** - * parseFromTextLine: Convert string to click stream data - * - * @param logline http log line - * @return {@link ClickStream} - */ - public static ClickStream parseFromTextLine(String logline) { - JSONObject jsonData = null; - ClickStream data = null; - try { - jsonData = new JSONObject(logline); - data = new ClickStream(); - data.setKeyWords(jsonData.getString("query")); - data.setViewDataset(jsonData.getString("viewdataset")); - data.setDownloadDataset(jsonData.getString("downloaddataset")); - - } catch (JSONException e) { - e.printStackTrace(); - } - - return data; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java deleted file mode 100644 index f416eb4..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Coordinates.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.weblog.structure; - -public class Coordinates { - /* - * public String lat; public String lon; - */ - public String latlon; -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java deleted file mode 100644 index 5ddc717..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/FtpLog.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import com.google.gson.Gson; -import gov.nasa.jpl.mudrod.weblog.pre.ImportLogFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * This class represents an FTP access log line. 
- */ -public class FtpLog extends WebLog implements Serializable { - - private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class); - - public static String parseFromLogLine(String log) { - - try { - String ip = log.split(" +")[6]; - - String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + log.split(" +")[3] + ":" + log.split(" +")[4]; - - time = SwithtoNum(time); - SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy"); - Date date = formatter.parse(time); - - String bytes = log.split(" +")[7]; - - String request = log.split(" +")[8].toLowerCase(); - - if (!request.contains("/misc/") && !request.contains("readme")) { - FtpLog ftplog = new FtpLog(); - ftplog.LogType = "ftp"; - ftplog.IP = ip; - ftplog.Request = request; - ftplog.Bytes = Double.parseDouble(bytes); - - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'"); - ftplog.Time = df.format(date); - - return new Gson().toJson(ftplog); - } - } catch (Exception e) { - LOG.warn("Error parsing ftp log line [{}]. Skipping this line.", log, e); - } - return "{}"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java deleted file mode 100644 index 778224b..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/GeoIp.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import gov.nasa.jpl.mudrod.utils.HttpRequest; - -/** - * ClassName: GeoIp Function: convert IP to geo location - */ -public class GeoIp { - - /** - * toLocation: convert ip to location - * - * @param ip ip address - * @return coordinates - */ - public Coordinates toLocation(String ip) { - String url = "http://getcitydetails.geobytes.com/GetCityDetails?fqcn=" + ip; - HttpRequest http = new HttpRequest(); - String response = http.getRequest(url); - JsonParser parser = new JsonParser(); - JsonElement jobSon = parser.parse(response); - JsonObject responseObject = jobSon.getAsJsonObject(); - - Coordinates co = new Coordinates(); - String lon = responseObject.get("geobyteslongitude").toString().replace("\"", ""); - String lat = responseObject.get("geobyteslatitude").toString().replace("\"", ""); - co.latlon = lat + "," + lon; - return co; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java deleted file mode 100644 index 7ea17c0..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RankingTrainData.java +++ /dev/null @@ -1,147 +0,0 @@ 
-package gov.nasa.jpl.mudrod.weblog.structure; - -import java.io.Serializable; -import java.util.Map; - -/** - * ClassName: RankingTrainData Function: train data extracted from web logs for training ranking weights. - */ -public class RankingTrainData implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1L; - // sessionID: session ID - private String sessionID; - // index: session type name - private String index; - // query: query words related to the click - private String query; - // datasetA - private String highRankDataset; - // datasetB - private String lowRankDataset; - - private Map<String, String> filter; - - /** - * Creates a new instance of RankingTrainData. - * - * @param query the user query string - * @param highRankDataset the dataset name for the highest ranked dataset - * @param lowRankDataset the dataset name for the lowest ranked dataset - */ - public RankingTrainData(String query, String highRankDataset, String lowRankDataset) { - this.query = query; - this.highRankDataset = highRankDataset; - this.lowRankDataset = lowRankDataset; - } - - public RankingTrainData() { - //default constructor - } - - public String getSessionID() { - return sessionID; - } - - /** - * setQuery: Set the query user searched. 
- * - * @param query search words - */ - public void setQuery(String query) { - this.query = query; - } - - /** - * getQuery: Get the query user searched - * - * @return query words - */ - public String getQuery() { - return this.query; - } - - /** - * setHighRankDataset: Set the name of the high rank data set - * - * @param dataset short name of data set - */ - public void setHighRankDataset(String dataset) { - this.highRankDataset = dataset; - } - - /** - * setLowRankDataset: Set the name of the low rank data set - * - * @param dataset short name of data set - */ - public void setLowRankDataset(String dataset) { - this.lowRankDataset = dataset; - } - - /** - * getLowRankDataset: Get the name of the low rank data set - * - * @return data set name - */ - public String getLowRankDataset() { - return this.lowRankDataset; - } - - /** - * setSessionId: Set ID of session - * - * @param sessionID session id - */ - public void setSessionId(String sessionID) { - this.sessionID = sessionID; - } - - /** - * setIndex: Set session type name - * - * @param index session type name in elasticsearch - */ - public void setIndex(String index) { - this.index = index; - } - - public void setFilter(Map<String, String> filter) { - this.filter = filter; - } - - /** - * Output ranking train data in string format - * - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - return "query:" + query + "|| highRankDataset:" + highRankDataset + "|| lowRankDataset:" + lowRankDataset; - } - - /** - * toJson: Output ranking train data in Json format - * - * @return ranking train data in Json string format - */ - public String toJson() { - String jsonQuery = "{"; - jsonQuery += "\"query\":\"" + this.query + "\","; - jsonQuery += "\"highRankDataset\":\"" + this.highRankDataset + "\","; - jsonQuery += "\"lowRankDataset\":\"" + this.lowRankDataset + "\","; - - if (this.filter != null) { - for (String key : filter.keySet()) { - jsonQuery += "\"" + key + "\":\"" + filter.get(key) + "\","; - } - } - - jsonQuery 
+= "\"sessionId\":\"" + this.sessionID + "\","; - jsonQuery += "\"index\":\"" + this.index + "\""; - jsonQuery += "},"; - return jsonQuery; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java deleted file mode 100644 index bbfb79c..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * ClassName: RequestUrl Function: request url relate operations - */ -public class RequestUrl { - - private static final Logger LOG = LoggerFactory.getLogger(RequestUrl.class); - - /** - * Default Constructor - */ - public RequestUrl() { - /* Default Constructor */ - } - - /** - * UrlPage: Get url page from url link - * - * @param strURL request url - * @return page name - */ - public static String urlPage(String strURL) { - String strPage = null; - String[] arrSplit = null; - - String newURL = strURL.trim().toLowerCase(); - - arrSplit = newURL.split("[?]"); - if (newURL.length() > 0 && arrSplit.length > 1 && arrSplit[0] != null) { - strPage = arrSplit[0]; - } - - return strPage; - } - - /** - * TruncateUrlPage: Get url params from url link - * - * @param strURL - * @return url params - */ - private static String truncateUrlPage(String strURL) { - String strAllParam = null; - String[] arrSplit = null; - - strURL = strURL.trim().toLowerCase(); // keep this in mind - - arrSplit = strURL.split("[?]"); - if (strURL.length() > 1) { - if (arrSplit.length > 1) { - if (arrSplit[1] != null) { - strAllParam = arrSplit[1]; - } - } - } - - return strAllParam; - } - - /** - * URLRequest: Get url params from url link in a map format - * - * @param URL request url - * @return url params key value map - */ - public static Map<String, String> uRLRequest(String URL) { - Map<String, String> mapRequest = new HashMap<String, String>(); - - String[] arrSplit = null; - - String strUrlParam = truncateUrlPage(URL); - if (strUrlParam == null) { - return mapRequest; - } - - arrSplit = strUrlParam.split("[&]"); - for (String strSplit : arrSplit) { - String[] arrSplitEqual = null; - arrSplitEqual 
= strSplit.split("[=]"); - - if (arrSplitEqual.length > 1) { - - mapRequest.put(arrSplitEqual[0], arrSplitEqual[1]); - - } else { - if (arrSplitEqual[0] != "") { - - mapRequest.put(arrSplitEqual[0], ""); - } - } - } - return mapRequest; - } - - /** - * GetSearchInfo: Get search information from url link - * - * @param URL request url - * @return search params - * @throws UnsupportedEncodingException UnsupportedEncodingException - */ - public String getSearchInfo(String URL) throws UnsupportedEncodingException { - List<String> info = new ArrayList<String>(); - String keyword = ""; - Map<String, String> mapRequest = RequestUrl.uRLRequest(URL); - if (mapRequest.get("search") != null) { - try { - keyword = mapRequest.get("search"); - - keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); - if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { - keyword = keyword.replace("%2b", " "); - keyword = keyword.replace("%20", " "); - keyword = keyword.replace("%25", " "); - } - - keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); - - } catch (UnsupportedEncodingException e) { - LOG.error(mapRequest.get("search")); - e.printStackTrace(); - } - if (!"".equals(keyword)) { - info.add(keyword); - } - - } - - if (mapRequest.get("ids") != null && mapRequest.get("values") != null) { - String id_raw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8"); - String value_raw = URLDecoder.decode(mapRequest.get("values"), "UTF-8"); - String[] ids = id_raw.split(":"); - String[] values = value_raw.split(":"); - - int a = ids.length; - int b = values.length; - int l = a < b ? 
a : b; - - for (int i = 0; i < l; i++) { - if (ids[i].equals("collections") || ids[i].equals("measurement") || ids[i].equals("sensor") || ids[i].equals("platform") || ids[i].equals("variable") || ids[i].equals("spatialcoverage")) { - try { - values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25"); - if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) { - String item = URLDecoder.decode(values[i], "UTF-8").trim(); - if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) { - item = item.replace("%2b", " "); - item = item.replace("%20", " "); - item = item.replace("%25", " "); - } - item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); - info.add(item); - } - } catch (Exception e) { - LOG.error(values[i]); - e.printStackTrace(); - } - } - - } - } - - return String.join(",", info); - } - - /** - * GetSearchWord: Get search words from url link - * - * @param url request url - * @return query - */ - public static String getSearchWord(String url) { - String keyword = ""; - - Map<String, String> mapRequest = RequestUrl.uRLRequest(url); - if (mapRequest.get("search") != null) { - try { - keyword = mapRequest.get("search"); - - keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); - if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { - keyword = keyword.replace("%2b", " "); - keyword = keyword.replace("%20", " "); - keyword = keyword.replace("%25", " "); - } - keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); - } catch (UnsupportedEncodingException e) { - LOG.error(mapRequest.get("search")); - e.printStackTrace(); - } - } - - return keyword; - } - - /** - * GetFilterInfo: Get filter params from url link - * - * @param url request url - * @return filter facet key pair map - * @throws UnsupportedEncodingException 
UnsupportedEncodingException - */ - public static Map<String, String> getFilterInfo(String url) throws UnsupportedEncodingException { - List<String> info = new ArrayList<>(); - Map<String, String> filterValues = new HashMap<>(); - - String keyword = ""; - Map<String, String> mapRequest = RequestUrl.uRLRequest(url); - if (mapRequest.get("search") != null) { - try { - keyword = mapRequest.get("search"); - - keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); - if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) { - keyword = keyword.replace("%2b", " "); - keyword = keyword.replace("%20", " "); - keyword = keyword.replace("%25", " "); - } - keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); - - } catch (UnsupportedEncodingException e) { - LOG.error(mapRequest.get("search")); - e.printStackTrace(); - } - if (!"".equals(keyword)) { - info.add(keyword); - } - - } - - if (mapRequest.get("ids") != null && mapRequest.get("values") != null) { - String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8"); - String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8"); - String[] ids = idRaw.split(":"); - String[] values = valueRaw.split(":"); - - int a = ids.length; - int b = values.length; - int l = a < b ? 
a : b; - - for (int i = 0; i < l; i++) { - try { - values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25"); - if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) { - String item = URLDecoder.decode(values[i], "UTF-8").trim(); - if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) { - item = item.replace("%2b", " "); - item = item.replace("%20", " "); - item = item.replace("%25", " "); - } - item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim(); - filterValues.put(ids[i], item); - } - } catch (Exception e) { - LOG.error(values[i]); - e.printStackTrace(); - } - } - } - - if (mapRequest.get("temporalsearch") != null) { - String temporalsearch = mapRequest.get("temporalsearch"); - temporalsearch = URLDecoder.decode(temporalsearch.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8"); - - filterValues.put("temporalsearch", temporalsearch); - } - - return filterValues; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java deleted file mode 100644 index 93f4288..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortOrder; -import org.joda.time.Seconds; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * ClassName: Session Function: Session operations. - */ -public class Session /*extends MudrodAbstract*/ implements Comparable<Session> { - private static final Logger LOG = LoggerFactory.getLogger(Session.class); - // start: start time of session - private String start; - // end: end time of session - private String end; - // id: original session ID - private String id; - // newid: new session ID - private String newid = null; - // fmt: time formatter - private DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); - - private ESDriver es; - private Properties props; - - /** - * Creates a new instance of Session. - * - * @param props the Mudrod configuration - * @param es the Elasticsearch drive - * @param start start time of session - * @param end end time of session - * @param id session ID - */ - public Session(Properties props, ESDriver es, String start, String end, String id) { - this.start = start; - this.end = end; - this.id = id; - - this.props = props; - this.es = es; - } - - /** - * Creates a new instance of Session. 
- * - * @param props the Mudrod configuration - * @param es the Elasticsearch drive - */ - public Session(Properties props, ESDriver es) { - this.props = props; - this.es = es; - } - - /** - * getID: Get original session ID - * - * @return session id - */ - public String getID() { - return id; - } - - /** - * getNewID: Get new session ID - * - * @return new session id - */ - public String getNewID() { - return newid; - } - - /** - * setNewID: Set new session ID - * - * @param str: session ID - * @return new session id - */ - public String setNewID(String str) { - return newid = str; - } - - /** - * getStartTime:Get start time of current session - * - * @return start time of session - */ - public String getStartTime() { - return start; - } - - /** - * getEndTime:Get end time of current session - * - * @return end time of session - */ - public String getEndTime() { - return end; - } - - /** - * Compare current session with another session - * - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - @Override - public int compareTo(Session o) { - fmt.parseDateTime(this.end); - fmt.parseDateTime(o.end); - // ascending order - return Seconds.secondsBetween(fmt.parseDateTime(o.end), fmt.parseDateTime(this.end)).getSeconds(); - - } - - /** - * getSessionDetail:Get detail of current session, which is used for session - * tree reconstruct - * - * @param indexName name of index from which you wish to obtain session detail. 
- * @param type: Session type name in Elasticsearch - * @param sessionID: Session ID - * @return Session details in Json format - */ - public JsonObject getSessionDetail(String indexName, String type, String sessionID) { - JsonObject sessionResults = new JsonObject(); - // for session tree - SessionTree tree = null; - JsonElement jsonRequest = null; - try { - tree = this.getSessionTree(indexName, type, sessionID); - JsonObject jsonTree = tree.treeToJson(tree.root); - sessionResults.add("treeData", jsonTree); - - jsonRequest = this.getRequests(type, sessionID); - sessionResults.add("RequestList", jsonRequest); - } catch (UnsupportedEncodingException e) { - LOG.error("Encoding error detected.", e); - - } - - return sessionResults; - } - - /** - * getClickStreamList: Extracted click stream list from current session. - * - * @param indexName an index from which to query for a session list - * @param type: Session type name in Elasticsearch - * @param sessionID: Session ID - * @return Click stram data list - * {@link ClickStream} - */ - public List<ClickStream> getClickStreamList(String indexName, String type, String sessionID) { - SessionTree tree = null; - try { - tree = this.getSessionTree(indexName, type, sessionID); - } catch (UnsupportedEncodingException e) { - LOG.error("Erro whilst obtaining the Session Tree: {}", e); - } - - List<ClickStream> clickthroughs = tree.getClickStreamList(); - return clickthroughs; - } - - /** - * Method of converting a given session to a tree structure - * - * @param type session type name in Elasticsearch - * @param sessionID ID of session - * @return an instance of session tree structure - * @throws UnsupportedEncodingException UnsupportedEncodingException - */ - private SessionTree getSessionTree(String indexName, String type, String sessionID) throws UnsupportedEncodingException { - - SearchResponse response = es.getClient().prepareSearch(indexName).setTypes(type).setQuery(QueryBuilders.termQuery("SessionID", 
sessionID)).setSize(100).addSort("Time", SortOrder.ASC) - .execute().actionGet(); - - SessionTree tree = new SessionTree(this.props, this.es, sessionID, type); - int seq = 1; - for (SearchHit hit : response.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - String request = (String) result.get("Request"); - String time = (String) result.get("Time"); - String logType = (String) result.get("LogType"); - String referer = (String) result.get("Referer"); - - SessionNode node = new SessionNode(request, logType, referer, time, seq); - tree.insert(node); - seq++; - } - - return tree; - } - - /** - * Method of getting all requests from a given current session - * - * @param cleanuptype Session type name in Elasticsearch - * @param sessionID Session ID - * @return all of these requests in JSON - * @throws UnsupportedEncodingException UnsupportedEncodingException - */ - private JsonElement getRequests(String cleanuptype, String sessionID) throws UnsupportedEncodingException { - SearchResponse response = es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100) - .addSort("Time", SortOrder.ASC).execute().actionGet(); - - Gson gson = new Gson(); - List<JsonObject> requestList = new ArrayList<>(); - int seq = 1; - for (SearchHit hit : response.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - String request = (String) result.get("Request"); - String requestUrl = (String) result.get("RequestUrl"); - String time = (String) result.get("Time"); - String logType = (String) result.get("LogType"); - String referer = (String) result.get("Referer"); - - JsonObject req = new JsonObject(); - req.addProperty("Time", time); - req.addProperty("Request", request); - req.addProperty("RequestURL", requestUrl); - req.addProperty("LogType", logType); - req.addProperty("Referer", referer); - req.addProperty("Seq", seq); - requestList.add(req); - - seq++; - } - 
return gson.toJsonTree(requestList); - } - - /** - * getClickStreamList: Extracted ranking training data from current session. - * - * @param indexName an index from which to obtain ranked training data. - * @param cleanuptype: Session type name in Elasticsearch - * @param sessionID: Session ID - * @return Click stram data list - * {@link ClickStream} - */ - public List<RankingTrainData> getRankingTrainData(String indexName, String cleanuptype, String sessionID) { - SessionTree tree = null; - try { - tree = this.getSessionTree(indexName, cleanuptype, sessionID); - } catch (UnsupportedEncodingException e) { - LOG.error("Error whilst retreiving Session Tree: {}", e); - } - - List<RankingTrainData> trainData = new ArrayList<>(); - try { - trainData = tree.getRankingTrainData(indexName, sessionID); - } catch (UnsupportedEncodingException e) { - LOG.error("Error whilst retreiving ranking training data: {}", e); - } - - return trainData; - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java deleted file mode 100644 index edba32e..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; - -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.Tuple2; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * ClassName: SessionExtractor Function: Extract sessions details from - * reconstructed sessions. 
- */ -public class SessionExtractor implements Serializable { - - private static final Logger LOG = LoggerFactory.getLogger(SessionExtractor.class); - - /** - * - */ - private static final long serialVersionUID = 1L; - - public SessionExtractor() { - // default constructor - } - - /** - * extractClickStreamFromES:Extract click streams from logs stored in - * Elasticsearch - * - * @param props - * the Mudrod configuration - * @param es - * the Elasticsearch drive - * @param spark - * the spark driver - * @return clickstream list in JavaRDD format {@link ClickStream} - */ - public JavaRDD<ClickStream> extractClickStreamFromES(Properties props, ESDriver es, SparkDriver spark) { - switch (props.getProperty(MudrodConstants.PROCESS_TYPE)) { - case "sequential": - List<ClickStream> queryList = this.getClickStreamList(props, es); - return spark.sc.parallelize(queryList); - case "parallel": - return getClickStreamListInParallel(props, spark, es); - default: - LOG.error("Error finding processing type for '{}'. Please check your config.xml.", props.getProperty(MudrodConstants.PROCESS_TYPE)); - } - return null; - } - - /** - * getClickStreamList:Extract click streams from logs stored in Elasticsearch. 
- * - * @param props - * the Mudrod configuration - * @param es - * the Elasticsearch driver - * @return clickstream list {@link ClickStream} - */ - protected List<ClickStream> getClickStreamList(Properties props, ESDriver es) { - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - - List<ClickStream> result = new ArrayList<>(); - for (int n = 0; n < logIndexList.size(); n++) { - String logIndex = logIndexList.get(n); - List<String> sessionIdList; - try { - sessionIdList = this.getSessions(props, es, logIndex); - Session session = new Session(props, es); - int sessionNum = sessionIdList.size(); - for (int i = 0; i < sessionNum; i++) { - String[] sArr = sessionIdList.get(i).split(","); - List<ClickStream> datas = session.getClickStreamList(sArr[1], sArr[2], sArr[0]); - result.addAll(datas); - } - } catch (Exception e) { - LOG.error("Error during extraction of Clickstreams from log index. {}", e); - } - } - - return result; - } - - protected JavaRDD<ClickStream> getClickStreamListInParallel(Properties props, SparkDriver spark, ESDriver es) { - - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - - LOG.info("Retrieved {}", logIndexList.toString()); - - List<String> sessionIdList = new ArrayList<>(); - for (int n = 0; n < logIndexList.size(); n++) { - String logIndex = logIndexList.get(n); - List<String> tmpsessionList = this.getSessions(props, es, logIndex); - sessionIdList.addAll(tmpsessionList); - } - - JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16); - - JavaRDD<ClickStream> clickStreamRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, ClickStream>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Iterator<ClickStream> call(Iterator<String> arg0) throws Exception { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); - - Session session = new Session(props, tmpES); 
- List<ClickStream> clickstreams = new ArrayList<>(); - while (arg0.hasNext()) { - String s = arg0.next(); - String[] sArr = s.split(","); - List<ClickStream> clicks = session.getClickStreamList(sArr[1], sArr[2], sArr[0]); - clickstreams.addAll(clicks); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - return clickstreams.iterator(); - } - }); - - LOG.info("Clickstream number: {}", clickStreamRDD.count()); - - return clickStreamRDD; - } - - // This function is reserved and not being used for now - - /** - * loadClickStremFromTxt:Load click stream form txt file - * - * @param clickthroughFile - * txt file - * @param sc - * the spark context - * @return clickstream list in JavaRDD format {@link ClickStream} - */ - public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) { - return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @SuppressWarnings("unchecked") - @Override - public Iterator<ClickStream> call(String line) throws Exception { - List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line); - return (Iterator<ClickStream>) clickthroughs; - } - }); - } - - /** - * bulidDataQueryRDD: convert click stream list to data set queries pairs. - * - * @param clickstreamRDD: - * click stream data - * @param downloadWeight: - * weight of download behavior - * @return JavaPairRDD, key is short name of data set, and values are queries - */ - public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) { - return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, List<String>> call(ClickStream click) throws Exception { - List<String> query = new ArrayList<>(); - // important! 
download behavior is given higher weights - // than viewing - // behavior - boolean download = click.isDownload(); - int weight = 1; - if (download) { - weight = downloadWeight; - } - for (int i = 0; i < weight; i++) { - query.add(click.getKeyWords()); - } - - return new Tuple2<>(click.getViewDataset(), query); - } - }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public List<String> call(List<String> v1, List<String> v2) throws Exception { - List<String> list = new ArrayList<>(); - list.addAll(v1); - list.addAll(v2); - return list; - } - }); - } - - /** - * getSessions: Get sessions from logs - * - * @param props - * the Mudrod configuration - * @param es - * the Elasticsearch driver - * @param logIndex - * a log index name - * @return list of session names - */ - protected List<String> getSessions(Properties props, ESDriver es, String logIndex) { - - String cleanupPrefix = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX); - String sessionStatPrefix = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX); - - List<String> sessions = new ArrayList<>(); - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(sessionStatPrefix).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute() - .actionGet(); - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - Map<String, Object> session = hit.getSource(); - String sessionID = (String) session.get("SessionID"); - sessions.add(sessionID + "," + logIndex + "," + cleanupPrefix); - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - - return sessions; - } - - public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) { - return 
clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Double> call(ClickStream click) throws Exception { - double rate = 1; - boolean download = click.isDownload(); - if (download) { - rate = 2; - } - - String sessionID = click.getSessionID(); - String user = sessionID.split("@")[0]; - - return new Tuple2<>(user + "," + click.getViewDataset(), rate); - } - }).reduceByKey(new Function2<Double, Double, Double>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Double call(Double v1, Double v2) throws Exception { - return v1 >= v2 ? v1 : v2; - - } - }); - } - - public JavaPairRDD<String, Double> bulidSessionItermRDD(JavaRDD<ClickStream> clickstreamRDD) { - JavaPairRDD<String, String> sessionItemRDD = clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, String>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, String> call(ClickStream click) throws Exception { - - String sessionID = click.getSessionID(); - return new Tuple2<>(sessionID, click.getViewDataset()); - } - }).distinct(); - - // remove some sessions - JavaPairRDD<String, Double> sessionItemNumRDD = sessionItemRDD.keys().mapToPair(new PairFunction<String, String, Double>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Double> call(String item) throws Exception { - return new Tuple2<>(item, 1.0); - } - }).reduceByKey(new Function2<Double, Double, Double>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Double call(Double v1, Double v2) throws Exception { - return v1 + v2; - } - }).filter(new Function<Tuple2<String, Double>, Boolean>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Boolean call(Tuple2<String, Double> arg0) throws 
Exception { - Boolean b = true; - if (arg0._2 < 2) { - b = false; - } - return b; - } - }); - - return sessionItemNumRDD.leftOuterJoin(sessionItemRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Double> call(Tuple2<String, Tuple2<Double, Optional<String>>> arg0) throws Exception { - - Tuple2<Double, Optional<String>> test = arg0._2; - Optional<String> optStr = test._2; - String item = ""; - if (optStr.isPresent()) { - item = optStr.get(); - } - return new Tuple2<>(arg0._1 + "," + item, 1.0); - } - - }); - } - - public JavaPairRDD<String, List<String>> bulidSessionDatasetRDD(Properties props, ESDriver es, SparkDriver spark) { - - List<String> result = new ArrayList<>(); - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - for (int n = 0; n < logIndexList.size(); n++) { - String logIndex = logIndexList.get(n); - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(props.getProperty(MudrodConstants.SESSION_STATS_PREFIX)).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()) - .setSize(100).execute().actionGet(); - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - Map<String, Object> session = hit.getSource(); - String sessionID = (String) session.get("SessionID"); - String views = (String) session.get("views"); - if (views != null && !"".equals(views)) { - String sessionItems = sessionID + ":" + views; - result.add(sessionItems); - } - } - - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if (scrollResp.getHits().getHits().length == 0) { - break; - } - } - } - - JavaRDD<String> sessionRDD = spark.sc.parallelize(result); - - return sessionRDD.mapToPair(new PairFunction<String, String, List<String>>() { - private 
static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, List<String>> call(String sessionitem) throws Exception { - String[] splits = sessionitem.split(":"); - String sessionId = splits[0]; - List<String> itemList = new ArrayList<>(); - - String items = splits[1]; - String[] itemArr = items.split(","); - int size = itemArr.length; - for (int i = 0; i < size; i++) { - String item = itemArr[i]; - if (!itemList.contains(item)) - itemList.add(itemArr[i]); - } - - return new Tuple2<>(sessionId, itemList); - } - }); - } - - /** - * extractClickStreamFromES:Extract click streams from logs stored in - * Elasticsearch - * - * @param props - * the Mudrod configuration - * @param es - * the Elasticsearch drive - * @param spark - * the spark driver - * @return clickstream list in JavaRDD format {@link ClickStream} - */ - public JavaRDD<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es, SparkDriver spark) { - - List<RankingTrainData> queryList = this.extractRankingTrainData(props, es); - return spark.sc.parallelize(queryList); - - } - - /** - * getClickStreamList:Extract click streams from logs stored in Elasticsearch. 
- * - * @param props - * the Mudrod configuration - * @param es - * the Elasticsearch driver - * @return clickstream list {@link ClickStream} - */ - protected List<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es) { - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - - LOG.info(logIndexList.toString()); - - List<RankingTrainData> result = new ArrayList<>(); - for (int n = 0; n < logIndexList.size(); n++) { - String logIndex = logIndexList.get(n); - List<String> sessionIdList; - try { - sessionIdList = this.getSessions(props, es, logIndex); - Session session = new Session(props, es); - int sessionNum = sessionIdList.size(); - for (int i = 0; i < sessionNum; i++) { - String[] sArr = sessionIdList.get(i).split(","); - List<RankingTrainData> datas = session.getRankingTrainData(sArr[1], sArr[2], sArr[0]); - result.addAll(datas); - } - } catch (Exception e) { - LOG.error("Error which extracting ranking train data: {}", e); - } - } - - return result; - } - - protected JavaRDD<RankingTrainData> extractRankingTrainDataInParallel(Properties props, SparkDriver spark, ESDriver es) { - - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); - - LOG.info(logIndexList.toString()); - - List<String> sessionIdList = new ArrayList<>(); - for (int n = 0; n < logIndexList.size(); n++) { - String logIndex = logIndexList.get(n); - List<String> tmpsessionList = this.getSessions(props, es, logIndex); - sessionIdList.addAll(tmpsessionList); - } - - JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16); - - JavaRDD<RankingTrainData> clickStreamRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, RankingTrainData>() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Iterator<RankingTrainData> call(Iterator<String> arg0) throws Exception { - ESDriver tmpES = new ESDriver(props); - 
tmpES.createBulkProcessor(); - - Session session = new Session(props, tmpES); - List<RankingTrainData> clickstreams = new ArrayList<>(); - while (arg0.hasNext()) { - String s = arg0.next(); - String[] sArr = s.split(","); - List<RankingTrainData> clicks = session.getRankingTrainData(sArr[1], sArr[2], sArr[0]); - clickstreams.addAll(clicks); - } - tmpES.destroyBulkProcessor(); - tmpES.close(); - return clickstreams.iterator(); - } - }); - - LOG.info("Clickstream number: {}", clickStreamRDD.count()); - - return clickStreamRDD; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java deleted file mode 100644 index 958e184..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.weblog.structure; - -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * ClassName: SessionNode Function: Functions related to a node in a session - * tree sturcture. 
/**
 * ClassName: SessionNode Function: Functions related to a node in a session
 * tree structure.
 */
public class SessionNode {
  // Captures the URL out of a lower-cased request line ("get <url> http/...").
  private static final Pattern REQUEST_PATTERN = Pattern.compile("get (.*?) http/*");

  // id: Node ID
  protected String id;
  // value: Node value
  protected String value;
  // parent: Parent node of this node
  protected SessionNode parent;
  // children: Child nodes of this node
  protected List<SessionNode> children = new ArrayList<>();
  // time: request time of node
  protected String time;
  // request: request url of this node
  protected String request;
  // referer: previous request url of this node
  protected String referer;
  // seq: sequence of this node
  protected int seq;
  // key: type of this node extracted from url, including three types -
  // dataset,datasetlist,ftp
  protected String key;
  // logType: log types of this node, including two types - po.dacc, ftp
  protected String logType;
  // search: query extracted from this node
  protected String search;
  // filter: filter facets extracted from this node
  protected Map<String, String> filter;
  // datasetId: viewed/downloaded data set ID
  protected String datasetId;

  public SessionNode() {

  }

  /**
   * Creates a new instance of SessionNode.
   *
   * @param request request url
   * @param logType including two types - po.dacc, ftp
   * @param referer previous request url
   * @param time    request time of node
   * @param seq     sequence of this node
   */
  public SessionNode(String request, String logType, String referer, String time, int seq) {
    this.logType = logType;
    this.time = time;
    this.seq = seq;
    this.setRequest(request);
    this.setReferer(referer);
    this.setKey(request, logType);
  }

  /**
   * setReferer: Set previous request url of this node. The value is
   * lower-cased and the "http://podaac.jpl.nasa.gov" prefix is removed; a null
   * referer is stored as the empty string.
   *
   * @param referer previous request url
   */
  public void setReferer(String referer) {
    if (referer == null) {
      this.referer = "";
      return;
    }
    this.referer = referer.toLowerCase(Locale.ROOT).replace("http://podaac.jpl.nasa.gov", "");
  }

  /**
   * setRequest: Set request url of this node. For "PO.DAAC" log entries the
   * raw request line is additionally parsed (see {@link #parseRequest}).
   *
   * @param req request url
   */
  public void setRequest(String req) {
    this.request = req;
    // constant-first comparison: logType is null after the default constructor
    if ("PO.DAAC".equals(this.logType)) {
      this.parseRequest(req);
    }
  }

  /**
   * getChildren: Get child nodes of this node
   *
   * @return child nodes
   */
  public List<SessionNode> getChildren() {
    return this.children;
  }

  /**
   * setChildren: Set child nodes of this node
   *
   * @param children child nodes of this node
   */
  public void setChildren(List<SessionNode> children) {
    this.children = children;
  }

  /**
   * addChildren: Add a child node
   *
   * @param node session node
   */
  public void addChildren(SessionNode node) {
    this.children.add(node);
  }

  /**
   * getId: Get node ID
   *
   * @return node ID of this node
   */
  public String getId() {
    return this.id;
  }

  /**
   * bSame: Compare this node with another node by request url
   *
   * @param node {@link SessionNode}
   * @return boolean value, true means the two nodes have the same request
   */
  public Boolean bSame(SessionNode node) {
    return this.request.equals(node.request);
  }

  /**
   * setKey: Set request type. A log type of "ftp" or "root" is used as the key
   * directly; otherwise the key is "datasetlist" or "dataset" depending on the
   * request url, or the empty string if neither matches.
   *
   * @param request request url
   * @param logType url type
   */
  public void setKey(String request, String logType) {
    if ("ftp".equals(logType)) {
      this.key = "ftp";
    } else if ("root".equals(logType)) {
      this.key = "root";
    } else if (request.contains("/datasetlist?")) {
      this.key = "datasetlist";
    } else if (request.contains("/dataset/") /* || request.contains(granule) */) {
      this.key = "dataset";
    } else {
      this.key = "";
    }
  }

  /**
   * getKey: Get request type which contains three categories -
   * dataset,datasetlist,ftp
   *
   * @return request url type of this node
   */
  public String getKey() {
    return this.key;
  }

  /**
   * getRequest: Get node request
   *
   * @return request url of this node
   */
  public String getRequest() {
    return this.request;
  }

  /**
   * getReferer: Get previous request url of this node
   *
   * @return previous request url of this node
   */
  public String getReferer() {
    return this.referer;
  }

  /**
   * getParent: Get parent node of this node
   *
   * @return parent node of this node
   */
  public SessionNode getParent() {
    return this.parent;
  }

  /**
   * setParent: Set parent node of this node
   *
   * @param parent the previous request node of this node
   */
  public void setParent(SessionNode parent) {
    this.parent = parent;
  }

  /**
   * getSearch: Get query of this node
   *
   * @return search query of this node
   */
  public String getSearch() {
    return this.search;
  }

  /**
   * getFilter: Get filter facets of this node
   *
   * @return filter values of this node, or null if none were parsed
   */
  public Map<String, String> getFilter() {
    return this.filter;
  }

  /**
   * getDatasetId: Get data set ID of this node
   *
   * @return viewing/downloading data set of this node
   */
  public String getDatasetId() {
    return this.datasetId;
  }

  /**
   * getSeq: Get sequence of this node
   *
   * @return request sequence of this node
   */
  public int getSeq() {
    return this.seq;
  }

  /**
   * getFilterStr: Get filter facets of this node as a comma-separated list of
   * {@code key=value} pairs.
   *
   * @return filter values of this node; empty string when there are none
   */
  public String getFilterStr() {
    // filter stays null until facets have been parsed
    if (this.filter == null || this.filter.isEmpty()) {
      return "";
    }
    StringBuilder joined = new StringBuilder();
    for (Map.Entry<String, String> facet : this.filter.entrySet()) {
      if (joined.length() > 0) {
        joined.append(',');
      }
      joined.append(facet.getKey()).append('=').append(facet.getValue());
    }
    return joined.toString();
  }

  /**
   * parseRequest: Parse a raw request line to extract the request url. When
   * the lower-cased line matches "get &lt;url&gt; http", the url of the last
   * match replaces the line. A url containing "/dataset/" also yields the
   * data set ID. The stored request is always lower-cased.
   *
   * @param request request url of this node
   */
  public void parseRequest(String request) {
    Matcher matcher = REQUEST_PATTERN.matcher(request.trim().toLowerCase(Locale.ROOT));
    while (matcher.find()) {
      request = matcher.group(1);
    }
    if (request.contains("/dataset/")) {
      this.parseDatasetId(request);
    }

    this.request = request.toLowerCase(Locale.ROOT);
  }

  /**
   * parseFilterParams: Parse filter facets information. The colon-separated
   * "ids" and "values" parameters are paired up positionally; surplus entries
   * on either side are ignored. A non-empty search query is added under the
   * "search" key.
   *
   * @param params filter key value pairs of this node
   */
  private void parseFilterParams(Map<String, String> params) {
    this.filter = new HashMap<String, String>();
    String idsStr = params.get("ids");
    String valueStr = params.get("values");
    if (idsStr != null && !idsStr.equals("") && valueStr != null) {
      String[] ids = decodeUtf8(idsStr).split(":");
      String[] values = decodeUtf8(valueStr).split(":");
      // pair only as many facets as both sides provide
      int size = Math.min(ids.length, values.length);
      for (int i = 0; i < size; i++) {
        this.filter.put(ids[i], values[i]);
      }
    }

    if (this.search != null && !this.search.equals("")) {
      this.filter.put("search", this.search);
    }
  }

  /**
   * parseDatasetId: Parse request to extract data set ID, which is the third
   * "/"-separated segment of the url-decoded path (the part before "?").
   * Nothing is stored when the path has fewer segments.
   *
   * @param request request url
   */
  public void parseDatasetId(String request) {
    String path = decodeUtf8(request).split("[?]")[0];
    String[] parts = path.split("/");
    if (parts.length <= 2) {
      return;
    }
    this.datasetId = parts[2];
  }

  /** URL-decodes text as UTF-8, returning it unchanged if it cannot be decoded. */
  private static String decodeUtf8(String text) {
    try {
      return URLDecoder.decode(text, "UTF-8");
    } catch (UnsupportedEncodingException | IllegalArgumentException ignored) {
      // Malformed %-escapes occur in real logs; keep the raw text rather than
      // failing the whole node.
      return text;
    }
  }
}
*/
package gov.nasa.jpl.mudrod.weblog.structure;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * ClassName: SessionTree Function: Convert request list in a session to a tree.
 *
 * Nodes are attached through {@link #insert(SessionNode)}: "datasetlist"
 * (search) nodes hang off the root or off the node they were referred from,
 * "dataset" (view) nodes hang off the node they were referred from, and "ftp"
 * (download) nodes hang off the most recently inserted "dataset" node.
 */
public class SessionTree extends MudrodAbstract {

  /**
   *
   */
  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(SessionTree.class);
  // Shared serializer for treeToJson, so one is not allocated per node.
  private static final Gson GSON = new Gson();
  // size: node numbers in the session tree
  public int size = 0;
  // root: root node of session tree
  protected SessionNode root = null;
  // binsert: indicates inserting a node or not
  public boolean binsert = false;
  // tmpnode: the most recently inserted node (the root until the first insert)
  public SessionNode tmpnode;
  // latestDatasetnode: the latest inserted node whose key is "dataset"
  public SessionNode latestDatasetnode;
  // sessionID: session ID
  private String sessionID;
  // cleanupType: session type in Elasticsearch
  private String cleanupType;

  /**
   * Creates a new instance of SessionTree.
   *
   * Behaves exactly like
   * {@link #SessionTree(Properties, ESDriver, String, String)}: a fresh root
   * node is always created and {@code rootData} is not used.
   *
   * @param props:       the Mudrod configuration
   * @param es:          the Elasticsearch drive
   * @param rootData:    ignored; kept for backward compatibility
   * @param sessionID:   session ID
   * @param cleanupType: session type
   */
  public SessionTree(Properties props, ESDriver es, SessionNode rootData, String sessionID, String cleanupType) {
    this(props, es, sessionID, cleanupType);
  }

  /**
   * Creates a new instance of SessionTree.
   *
   * @param props:       the Mudrod configuration
   * @param es:          the Elasticsearch drive
   * @param sessionID:   session ID
   * @param cleanupType: session type
   */
  public SessionTree(Properties props, ESDriver es, String sessionID, String cleanupType) {
    super(props, es, null);
    root = new SessionNode("root", "root", "", "", 0);
    // The root is its own parent.
    root.setParent(root);
    tmpnode = root;
    this.sessionID = sessionID;
    this.cleanupType = cleanupType;
  }

  /**
   * insert: insert a node into the session tree.
   *
   * A node is rejected (and {@code null} returned) when no "datasetlist" node
   * has been seen yet, when its key is not one of "datasetlist", "dataset" or
   * "ftp", when it repeats the request of the previously inserted node, or
   * when no parent can be found for it.
   *
   * @param node {@link SessionNode}
   * @return the inserted node, or {@code null} when the node was rejected
   */
  public SessionNode insert(SessionNode node) {
    String key = node.getKey();

    // A session tree only starts growing at the first search request.
    if ("datasetlist".equals(key)) {
      this.binsert = true;
    }
    if (!this.binsert) {
      return null;
    }
    // Drop unrelated requests.
    if (!"datasetlist".equals(key) && !"dataset".equals(key) && !"ftp".equals(key)) {
      return null;
    }
    // Drop duplicated clicks.
    if (node.getRequest().equals(tmpnode.getRequest())) {
      return null;
    }

    SessionNode parentnode = this.searchParentNode(node);
    if (parentnode == null) {
      return null;
    }
    node.setParent(parentnode);
    parentnode.addChildren(node);

    // Remember the insertion point for the next node.
    tmpnode = node;
    if ("dataset".equals(key)) {
      latestDatasetnode = node;
    }

    size++;
    return node;
  }

  /**
   * printTree: Log the request of a node and of all of its descendants,
   * parents before children.
   *
   * @param node root node of the (sub)tree to print
   */
  public void printTree(SessionNode node) {
    LOG.info("node: {} \n", node.getRequest());
    for (SessionNode child : node.children) {
      printTree(child);
    }
  }

  /**
   * TreeToJson: Convert the session tree to Json object.
   *
   * Every node contributes its "seq" plus a "name" and an "icon" chosen by
   * node key; nodes with children additionally carry a "children" array.
   *
   * @param node node of the session tree
   * @return tree content in Json format
   */
  public JsonObject treeToJson(SessionNode node) {
    JsonObject json = new JsonObject();
    String key = node.getKey();

    json.addProperty("seq", node.getSeq());
    if ("datasetlist".equals(key)) {
      json.addProperty("icon", "./resources/images/searching.png");
      json.addProperty("name", node.getRequest());
    } else if ("dataset".equals(key)) {
      json.addProperty("icon", "./resources/images/viewing.png");
      json.addProperty("name", node.getDatasetId());
    } else if ("ftp".equals(key)) {
      json.addProperty("icon", "./resources/images/downloading.png");
      json.addProperty("name", node.getRequest());
    } else if ("root".equals(key)) {
      json.addProperty("name", "");
      json.addProperty("icon", "./resources/images/users.png");
    }

    if (!node.children.isEmpty()) {
      List<JsonObject> jsonChildren = new ArrayList<>();
      for (SessionNode child : node.children) {
        jsonChildren.add(treeToJson(child));
      }
      JsonElement jsonElement = GSON.toJsonTree(jsonChildren);
      json.add("children", jsonElement);
    }

    return json;
  }

  /**
   * getClickStreamList: Get click stream list in the session.
   *
   * One {@link ClickStream} is produced per analyzed query term for every
   * "dataset" node whose parent is a "datasetlist" node. Nodes whose query
   * cannot be analyzed are logged and skipped.
   *
   * @return {@link ClickStream}
   */
  public List<ClickStream> getClickStreamList() {

    List<ClickStream> clickthroughs = new ArrayList<>();
    for (SessionNode viewnode : this.getViewNodes(this.root)) {
      SessionNode parent = viewnode.getParent();
      if (!"datasetlist".equals(parent.getKey())) {
        continue;
      }

      RequestUrl requestURL = new RequestUrl();
      String viewquery = "";
      try {
        String infoStr = requestURL.getSearchInfo(viewnode.getRequest());
        viewquery = es.customAnalyzing(props.getProperty("indexName"), infoStr);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller even though it is ignored here.
        Thread.currentThread().interrupt();
        LOG.warn("Exception getting search info. Ignoring...", e);
      } catch (UnsupportedEncodingException | ExecutionException e) {
        LOG.warn("Exception getting search info. Ignoring...", e);
      }

      if (viewquery == null || "".equals(viewquery)) {
        continue;
      }

      String dataset = viewnode.getDatasetId();
      boolean download = hasDownload(viewnode);
      for (String query : viewquery.trim().split(",")) {
        ClickStream data = new ClickStream(query, dataset, download);
        data.setSessionId(this.sessionID);
        data.setType(this.cleanupType);
        clickthroughs.add(data);
      }
    }

    return clickthroughs;
  }

  /**
   * Tells whether a node has at least one direct "ftp" (download) child.
   *
   * @param datasetNode the node whose children are inspected
   * @return true when a direct child has the key "ftp"
   */
  private static boolean hasDownload(SessionNode datasetNode) {
    for (SessionNode child : datasetNode.getChildren()) {
      if ("ftp".equals(child.getKey())) {
        return true;
      }
    }
    return false;
  }

  /**
   * searchParentNode: Find the node a new session node should be attached to.
   *
   * @param node {@link SessionNode}
   * @return the parent to attach to, or {@code null} when the node has no
   * suitable parent and must be dropped
   */
  private SessionNode searchParentNode(SessionNode node) {

    String nodeKey = node.getKey();

    if ("datasetlist".equals(nodeKey)) {
      // Searches without a resolvable referer start a new branch at the root.
      if ("-".equals(node.getReferer())) {
        return root;
      }
      SessionNode referrer = this.findLatestRefer(tmpnode, node.getReferer());
      return referrer == null ? root : referrer;
    } else if ("dataset".equals(nodeKey)) {
      // A view must be reachable from an earlier request; otherwise drop it.
      if ("-".equals(node.getReferer())) {
        return null;
      }
      return this.findLatestRefer(tmpnode, node.getReferer());
    } else if ("ftp".equals(nodeKey)) {
      return latestDatasetnode;
    }

    return tmpnode;
  }

  /**
   * findLatestRefer: Find parent node whose visiting url is equal to the refer
   * url of a session node.
   *
   * Walks from {@code node} towards the root. At each step the parent itself
   * is compared first, then the parent's subtree is searched.
   *
   * @param node:  {@link SessionNode} to start from
   * @param refer: request url
   * @return the matching node, or {@code null} when the root is reached
   * without a match
   */
  private SessionNode findLatestRefer(SessionNode node, String refer) {
    SessionNode current = node;
    while (!"root".equals(current.getKey())) {
      SessionNode parentNode = current.getParent();
      if (refer.equals(parentNode.getRequest())) {
        return parentNode;
      }

      SessionNode match = this.iterChild(parentNode, refer);
      if (match != null) {
        return match;
      }
      current = parentNode;
    }
    return null;
  }

  /**
   * iterChild: Search the subtree below a node, newest child first, for a
   * leaf node whose request equals the given url.
   *
   * NOTE(review): only leaf nodes are compared; a node that has children is
   * searched but its own request is never compared. Confirm whether interior
   * nodes should match too.
   *
   * @param start node whose descendants are searched
   * @param refer request url to look for
   * @return the matching node, or {@code null} when there is none
   */
  private SessionNode iterChild(SessionNode start, String refer) {
    List<SessionNode> children = start.getChildren();
    for (int i = children.size() - 1; i >= 0; i--) {
      SessionNode candidate = children.get(i);
      if (candidate.getChildren().isEmpty()) {
        if (refer.equals(candidate.getRequest())) {
          return candidate;
        }
      } else {
        // Propagate a match found deeper in the tree instead of dropping it.
        SessionNode match = iterChild(candidate, refer);
        if (match != null) {
          return match;
        }
      }
    }

    return null;
  }

  /**
   * check: Tell whether any node in a list has the given key.
   *
   * @param children nodes to inspect
   * @param str      node key to look for
   * @return true when at least one node has that key
   */
  private boolean check(List<SessionNode> children, String str) {
    for (SessionNode child : children) {
      if (str.equals(child.getKey())) {
        return true;
      }
    }
    return false;
  }

  /**
   * insertHelperChildren: Try to insert an entry below each of the given
   * nodes in turn, stopping at the first success.
   *
   * @param entry    node to insert
   * @param children candidate subtrees
   * @return true when the entry was inserted
   */
  private boolean insertHelperChildren(SessionNode entry, List<SessionNode> children) {
    for (SessionNode child : children) {
      if (insertHelper(entry, child)) {
        return true;
      }
    }
    return false;
  }

  /**
   * insertHelper: Try to insert an entry into the subtree rooted at a node.
   *
   * "datasetlist" and "dataset" entries are hosted by "datasetlist" nodes,
   * "ftp" entries by "dataset" nodes. The entry is appended to the first host
   * node none of whose children has the host's own key; otherwise the search
   * continues in the children. Entries with any other key are never inserted.
   *
   * NOTE(review): the entry's parent reference is not updated here.
   *
   * @param entry node to insert
   * @param node  root of the subtree to insert into
   * @return true when the entry was inserted
   */
  private boolean insertHelper(SessionNode entry, SessionNode node) {
    String entryKey = entry.getKey();
    String hostKey;
    if ("datasetlist".equals(entryKey) || "dataset".equals(entryKey)) {
      hostKey = "datasetlist";
    } else if ("ftp".equals(entryKey)) {
      hostKey = "dataset";
    } else {
      return false;
    }

    if (hostKey.equals(node.getKey()) && !check(node.children, hostKey)) {
      node.children.add(entry);
      return true;
    }
    // Report the outcome of the nested insert so callers stop after success.
    return insertHelperChildren(entry, node.children);
  }

  /**
   * getViewNodes: Get all nodes in a subtree whose key is "dataset".
   *
   * @param node root of the subtree
   * @return a list of session node, parents before children
   */
  private List<SessionNode> getViewNodes(SessionNode node) {
    return this.getNodes(node, "dataset");
  }

  /**
   * getQueryNodes: Get all nodes in a subtree whose key is "datasetlist".
   *
   * @param node root of the subtree
   * @return a list of session node, parents before children
   */
  private List<SessionNode> getQueryNodes(SessionNode node) {
    return this.getNodes(node, "datasetlist");
  }

  /**
   * getNodes: Get all nodes in a subtree that have the given key.
   *
   * @param node    root of the subtree (included in the search)
   * @param nodeKey node key to select
   * @return the matching nodes, parents before children
   */
  private List<SessionNode> getNodes(SessionNode node, String nodeKey) {
    List<SessionNode> nodes = new ArrayList<>();
    collectNodes(node, nodeKey, nodes);
    return nodes;
  }

  /**
   * Appends every node of a subtree that has the given key to a list,
   * parents before children.
   *
   * @param node    root of the subtree
   * @param nodeKey node key to select
   * @param sink    list receiving the matches
   */
  private void collectNodes(SessionNode node, String nodeKey, List<SessionNode> sink) {
    if (nodeKey.equals(node.getKey())) {
      sink.add(node);
    }
    for (SessionNode child : node.children) {
      collectNodes(child, nodeKey, sink);
    }
  }

  /**
   * Obtain the ranking training data.
   *
   * For every search node under which more than one data set was viewed and
   * at least one was downloaded, each (downloaded, not downloaded) data set
   * pair yields one {@link RankingTrainData} per analyzed query term. Search
   * nodes whose query analyzes to nothing are skipped.
   *
   * @param indexName the index from which to obtain the data
   * @param sessionID a valid session identifier; NOTE(review): not used, the
   *                  session ID given at construction is recorded instead
   * @return {@link RankingTrainData}
   * @throws UnsupportedEncodingException if there is an error whilst
   *                                      processing the ranking training data.
   * @throws RuntimeException             if the query analysis fails or is
   *                                      interrupted
   */
  public List<RankingTrainData> getRankingTrainData(String indexName, String sessionID) throws UnsupportedEncodingException {

    List<RankingTrainData> trainDatas = new ArrayList<>();

    for (SessionNode querynode : this.getQueryNodes(this.root)) {
      // Data set id -> whether it was downloaded, in click order.
      Map<String, Boolean> datasetOpt = new LinkedHashMap<>();
      int ndownload = 0;
      for (SessionNode child : querynode.getChildren()) {
        if (!"dataset".equals(child.getKey())) {
          continue;
        }
        boolean bDownload = hasDownload(child);
        if (bDownload) {
          ndownload += 1;
        }
        datasetOpt.put(child.getDatasetId(), bDownload);
      }

      // method 1: The priority of download data are higher
      if (datasetOpt.size() <= 1 || ndownload == 0) {
        continue;
      }

      RequestUrl requestURL = new RequestUrl();
      String queryUrl = querynode.getRequest();
      String infoStr = requestURL.getSearchInfo(queryUrl);
      String query;
      try {
        query = es.customAnalyzing(props.getProperty("indexName"), infoStr);
      } catch (InterruptedException e) {
        // Restore the interrupt flag before surfacing the failure.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Error performing custom analyzing", e);
      } catch (ExecutionException e) {
        throw new RuntimeException("Error performing custom analyzing", e);
      }
      // Same guard as getClickStreamList: nothing to learn from an empty query.
      if (query == null || query.isEmpty()) {
        continue;
      }

      Map<String, String> filter = RequestUrl.getFilterInfo(queryUrl);
      String[] queries = query.split(",");

      for (Map.Entry<String, Boolean> downloaded : datasetOpt.entrySet()) {
        if (!downloaded.getValue()) {
          continue;
        }
        for (Map.Entry<String, Boolean> skipped : datasetOpt.entrySet()) {
          if (skipped.getValue()) {
            continue;
          }
          for (String term : queries) {
            RankingTrainData trainData = new RankingTrainData(term, downloaded.getKey(), skipped.getKey());

            trainData.setSessionId(this.sessionID);
            trainData.setIndex(indexName);
            trainData.setFilter(filter);
            trainDatas.add(trainData);
          }
        }
      }
    }

    return trainDatas;
  }
}
