/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Represents one Apache access-log line. See
 * http://httpd.apache.org/docs/2.2/logs.html for details of the format.
 * (Original package: gov.nasa.jpl.mudrod.weblog.structure)
 */
public class WebLog implements Serializable {
  // Package-visible fields; names kept exactly as-is — presumably bound by
  // name during JSON (de)serialization, TODO confirm against callers.
  String LogType;
  String IP;
  String Time;
  String Request;
  double Bytes;

  /** No-arg constructor (required by (de)serialization frameworks). */
  public WebLog() {
  }

  /** @return the type of log this entry came from */
  public String getLogType() {
    return LogType;
  }

  /** @return the client IP address */
  public String getIP() {
    return IP;
  }

  /** @return the raw timestamp string */
  public String getTime() {
    return Time;
  }

  /** @return the requested resource */
  public String getRequest() {
    return Request;
  }

  /** @return the response size in bytes */
  public double getBytes() {
    return Bytes;
  }

  /**
   * Replaces the first three-letter English month abbreviation found in
   * {@code time} with its 1-based month number; every occurrence of that
   * one abbreviation is replaced. Input lacking a month name is returned
   * unchanged. (Method name spelling is preserved for API compatibility.)
   *
   * @param time a timestamp string such as "12/Jan/2017"
   * @return the timestamp with the month name swapped for its number
   */
  public static String SwithtoNum(String time) {
    final String[] monthNames = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
    for (int month = 0; month < monthNames.length; month++) {
      if (time.contains(monthNames[month])) {
        // First match wins, mirroring the original if/else-if chain.
        return time.replace(monthNames[month], Integer.toString(month + 1));
      }
    }
    return time;
  }

  /**
   * Tests whether {@code s} carries data, i.e. is not the literal empty
   * JSON object "{}".
   *
   * @param s the string to test (must be non-null)
   * @return false when s equals "{}", true otherwise
   */
  public static boolean checknull(String s) {
    return !s.equals("{}");
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java deleted file mode 100644 index 7aa9898..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes data structure needed for web log analysis - */ -package gov.nasa.jpl.mudrod.weblog.structure; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryEngineAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryEngineAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryEngineAbstract.java new file mode 100644 index 0000000..a3a7b58 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryEngineAbstract.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.discoveryengine; + +import java.io.Serializable; +import java.util.Properties; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; + +public abstract class DiscoveryEngineAbstract extends MudrodAbstract implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + + public DiscoveryEngineAbstract(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Abstract method of preprocess + */ + public abstract void preprocess(); + + /** + * Abstract method of process + */ + public abstract void process(); + + /** + * Abstract method of output + */ + public abstract void output(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryStepAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryStepAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryStepAbstract.java new file mode 100644 index 0000000..66da114 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/DiscoveryStepAbstract.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.discoveryengine; + +import java.util.Properties; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; + +/* + * Generic class of discovery engine step + */ +public abstract class DiscoveryStepAbstract extends MudrodAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public DiscoveryStepAbstract(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Abstract class of step execution without parameter + * + * @return An instance of Object + */ + public abstract Object execute(); + + /** + * Abstract class of step execution with parameter + * + * @param o an instance of object + * @return An instance of object + */ + public abstract Object execute(Object o); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java new file mode 100644 index 0000000..a069f96 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.discoveryengine; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.metadata.pre.ApiHarvester; +import org.apache.sdap.mudrod.metadata.pre.MatrixGenerator; +import org.apache.sdap.mudrod.metadata.process.MetadataAnalyzer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Properties; + +/** + * Supports to preprocess and process metadata + */ +public class MetadataDiscoveryEngine extends DiscoveryEngineAbstract implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MetadataDiscoveryEngine.class); + + public MetadataDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of preprocessing metadata + */ + public void preprocess() { + LOG.info("Starting metadata preprocessing..."); + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract harvester = new ApiHarvester(this.props, this.es, this.spark); + harvester.execute(); + + endTime = System.currentTimeMillis(); + LOG.info("Finished metadata preprocessing. 
Time elapsed: {}s", (endTime - startTime) / 1000); + } + + /** + * Method of processing metadata + */ + public void process() { + LOG.info("Starting metadata processing..."); + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract matrix = new MatrixGenerator(this.props, this.es, this.spark); + matrix.execute(); + + DiscoveryStepAbstract svd = new MetadataAnalyzer(this.props, this.es, this.spark); + svd.execute(); + + endTime = System.currentTimeMillis(); + LOG.info("Finished metadata processing. Time elapsed: {}s", (endTime - startTime) / 1000); + } + + public void output() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java new file mode 100644 index 0000000..d62c627 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.discoveryengine; + +import org.apache.commons.io.IOUtils; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckForNull; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.Properties; + +/** + * This is the most generic class of Mudrod + */ +public abstract class MudrodAbstract implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(MudrodAbstract.class); + /** + * + */ + private static final long serialVersionUID = 1L; + protected Properties props = new Properties(); + protected ESDriver es = null; + protected SparkDriver spark = null; + protected long startTime; + protected long endTime; + + protected static final String ES_SETTINGS = "elastic_settings.json"; + protected static final String ES_MAPPINGS = "elastic_mappings.json"; + + public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) { + this.props = props; + this.es = es; + this.spark = spark; + + if (this.props != null) { + this.initMudrod(); + } + } + + /** + * Method of setting up essential configuration for MUDROD to start + */ + @CheckForNull + protected void initMudrod() { + InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS); + InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS); + JSONObject settingsJSON = null; + JSONObject mappingJSON = null; + + try { + settingsJSON = new JSONObject(IOUtils.toString(settingsStream)); + } catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch settings!", e1); + } + + try { + mappingJSON = new JSONObject(IOUtils.toString(mappingsStream)); + } 
catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch mappings!", e1); + } + + try { + if (settingsJSON != null && mappingJSON != null) { + this.es.putMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME), settingsJSON.toString(), mappingJSON.toString()); + } + } catch (IOException e) { + LOG.error("Error entering Elasticsearch Mappings!", e); + } + } + + /** + * Get driver of Elasticsearch + * + * @return driver of Elasticsearch + */ + public ESDriver getES() { + return this.es; + } + + /** + * Get configuration of MUDROD (read from configuration file) + * + * @return configuration of MUDROD + */ + public Properties getConfig() { + return this.props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java new file mode 100644 index 0000000..48df7de --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.discoveryengine; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.ontology.pre.AggregateTriples; +import org.apache.sdap.mudrod.ontology.process.OntologyLinkCal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Supports to preprocess and process ontology + */ +public class OntologyDiscoveryEngine extends DiscoveryEngineAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(OntologyDiscoveryEngine.class); + + public OntologyDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of preprocessing ontology + */ + public void preprocess() { + LOG.info("*****************Ontology preprocessing starts******************"); + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract at = new AggregateTriples(this.props, this.es, this.spark); + at.execute(); + + endTime = System.currentTimeMillis(); + LOG.info("*****************Ontology preprocessing ends******************Took {}s", (endTime - startTime) / 1000); + } + + /** + * Method of processing ontology + */ + public void process() { + LOG.info("*****************Ontology processing starts******************"); + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract ol = new OntologyLinkCal(this.props, this.es, this.spark); + ol.execute(); + + endTime = System.currentTimeMillis(); + LOG.info("*****************Ontology processing ends******************Took {}s", (endTime - startTime) / 1000); + } + + public void output() { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java new file mode 100644 index 0000000..2c829ff --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java @@ -0,0 +1,77 @@ +package org.apache.sdap.mudrod.discoveryengine; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.recommendation.pre.ImportMetadata; +import org.apache.sdap.mudrod.recommendation.pre.MetadataTFIDFGenerator; +import org.apache.sdap.mudrod.recommendation.pre.NormalizeVariables; +import org.apache.sdap.mudrod.recommendation.pre.SessionCooccurence; +import org.apache.sdap.mudrod.recommendation.process.AbstractBasedSimilarity; +import org.apache.sdap.mudrod.recommendation.process.VariableBasedSimilarity; +import org.apache.sdap.mudrod.recommendation.process.sessionBasedCF; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +public class RecommendEngine extends DiscoveryEngineAbstract { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RecommendEngine.class); + + public RecommendEngine(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + LOG.info("Started Mudrod Recommend Engine."); + } + + @Override + public void preprocess() { + LOG.info("*****************Recommendation preprocessing starts******************"); + + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract harvester = new ImportMetadata(this.props, this.es, this.spark); + harvester.execute(); + + DiscoveryStepAbstract tfidf = new MetadataTFIDFGenerator(this.props, this.es, this.spark); + tfidf.execute(); + + DiscoveryStepAbstract sessionMatrixGen = new SessionCooccurence(this.props, this.es, this.spark); + sessionMatrixGen.execute(); + + DiscoveryStepAbstract 
transformer = new NormalizeVariables(this.props, this.es, this.spark); + transformer.execute(); + + endTime = System.currentTimeMillis(); + + LOG.info("*****************Recommendation preprocessing ends******************Took {}s {}", (endTime - startTime) / 1000); + } + + @Override + public void process() { + // TODO Auto-generated method stub + LOG.info("*****************Recommendation processing starts******************"); + + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract tfCF = new AbstractBasedSimilarity(this.props, this.es, this.spark); + tfCF.execute(); + + DiscoveryStepAbstract cbCF = new VariableBasedSimilarity(this.props, this.es, this.spark); + cbCF.execute(); + + DiscoveryStepAbstract sbCF = new sessionBasedCF(this.props, this.es, this.spark); + sbCF.execute(); + + endTime = System.currentTimeMillis(); + + LOG.info("*****************Recommendation processing ends******************Took {}s {}", (endTime - startTime) / 1000); + } + + @Override + public void output() { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java new file mode 100644 index 0000000..b672a54 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.discoveryengine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.weblog.pre.*; +import org.apache.sdap.mudrod.weblog.process.ClickStreamAnalyzer; +import org.apache.sdap.mudrod.weblog.process.UserHistoryAnalyzer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Supports to preprocess and process web log + */ +public class WeblogDiscoveryEngine extends DiscoveryEngineAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(WeblogDiscoveryEngine.class); + public String timeSuffix = null; + + public WeblogDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + LOG.info("Started Mudrod Weblog Discovery Engine."); + } + + /** + * Get log file list from a directory + * + * @param logDir path to directory containing logs either local or in HDFS. 
+ * @return a list of log files + */ + public List<String> getFileList(String logDir) { + + ArrayList<String> inputList = new ArrayList<>(); + if (!logDir.startsWith("hdfs://")) { + File directory = new File(logDir); + File[] fList = directory.listFiles(); + for (File file : fList) { + if (file.isFile() && file.getName().matches(".*\\d+.*") && file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) { + inputList.add(file.getName().replace(props.getProperty(MudrodConstants.HTTP_PREFIX), "")); + } + } + } else { + Configuration conf = new Configuration(); + try (FileSystem fs = FileSystem.get(new URI(logDir), conf)) { + FileStatus[] fileStatus; + fileStatus = fs.listStatus(new Path(logDir)); + for (FileStatus status : fileStatus) { + String path1 = status.getPath().toString(); + if (path1.matches(".*\\d+.*") && path1.contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) { + + String time = path1.substring(path1.lastIndexOf('.') + 1); + inputList.add(time); + } + } + } catch (IllegalArgumentException | IOException | URISyntaxException e) { + LOG.error("An error occured whilst obtaining the log file list.", e); + } + } + + return inputList; + } + + /** + * Method of preprocessing web logs, generating vocab similarity based on web + * logs + */ + @Override + public void preprocess() { + LOG.info("Starting Web log preprocessing."); + + ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR)); + + for (int i = 0; i < inputList.size(); i++) { + timeSuffix = inputList.get(i); + props.put(MudrodConstants.TIME_SUFFIX, timeSuffix); + startTime = System.currentTimeMillis(); + LOG.info("Processing logs dated {}", inputList.get(i)); + + DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark); + im.execute(); + + DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark); + cd.execute(); + + DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, 
this.spark); + sg.execute(); + + DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark); + ss.execute(); + + DiscoveryStepAbstract rr = new RemoveRawLog(this.props, this.es, this.spark); + rr.execute(); + + endTime = System.currentTimeMillis(); + + LOG.info("Web log preprocessing for logs dated {} complete. Time elapsed {} seconds.", inputList.get(i), (endTime - startTime) / 1000); + } + + DiscoveryStepAbstract hg = new HistoryGenerator(this.props, this.es, this.spark); + hg.execute(); + + DiscoveryStepAbstract cg = new ClickStreamGenerator(this.props, this.es, this.spark); + cg.execute(); + + LOG.info("Web log preprocessing (user history and clickstream) complete."); + } + + /** + * Method of web log ingest + */ + public void logIngest() { + LOG.info("Starting Web log ingest."); + ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR)); + for (int i = 0; i < inputList.size(); i++) { + timeSuffix = inputList.get(i); + props.put("TimeSuffix", timeSuffix); + DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark); + im.execute(); + } + + LOG.info("Web log ingest complete."); + + } + + /** + * Method of reconstructing user sessions from raw web logs + */ + public void sessionRestruct() { + LOG.info("Starting Session reconstruction."); + ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR)); + for (int i = 0; i < inputList.size(); i++) { + timeSuffix = inputList.get(i); // change timeSuffix dynamically + props.put(MudrodConstants.TIME_SUFFIX, timeSuffix); + DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark); + cd.execute(); + + DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, this.spark); + sg.execute(); + + DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark); + ss.execute(); + + DiscoveryStepAbstract rr = new RemoveRawLog(this.props, 
this.es, this.spark); + rr.execute(); + + endTime = System.currentTimeMillis(); + } + LOG.info("Session reconstruction complete."); + } + + @Override + public void process() { + LOG.info("Starting Web log processing."); + startTime = System.currentTimeMillis(); + + DiscoveryStepAbstract svd = new ClickStreamAnalyzer(this.props, this.es, this.spark); + svd.execute(); + + DiscoveryStepAbstract ua = new UserHistoryAnalyzer(this.props, this.es, this.spark); + ua.execute(); + + endTime = System.currentTimeMillis(); + LOG.info("Web log processing complete. Time elaspsed {} seconds.", (endTime - startTime) / 1000); + } + + @Override + public void output() { + // not implemented yet! + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/package-info.java new file mode 100644 index 0000000..40e7ead --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes abstract classes of MUDROD, discovery step, and engine. 
+ * Workflow classes such as weblogDiscoveryEngine, OntologyDiscoveryEngine, and + * MetadataDiscoveryEngine are also included here. + */ +package org.apache.sdap.mudrod.discoveryengine; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java new file mode 100644 index 0000000..7fe0cb4 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java @@ -0,0 +1,573 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.driver; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.google.gson.GsonBuilder; + +import org.apache.commons.lang.StringUtils; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.main.MudrodEngine; +import org.apache.sdap.mudrod.utils.ESTransportClient; +import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse; +import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Fuzziness; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import 
org.elasticsearch.node.Node; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.search.suggest.SuggestBuilders; +import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Driver implementation for all Elasticsearch functionality. + */ +public class ESDriver implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class); + private static final long serialVersionUID = 1L; + private transient Client client = null; + private transient Node node = null; + private transient BulkProcessor bulkProcessor = null; + + /** + * Default constructor for this class. To load client configuration call + * substantiated constructor. + */ + public ESDriver() { + // Default constructor, to load configuration call ESDriver(props) + } + + /** + * Substantiated constructor which accepts a {@link java.util.Properties} + * + * @param props a populated properties object. 
+ */ + public ESDriver(Properties props) { + try { + setClient(makeClient(props)); + } catch (IOException e) { + LOG.error("Error whilst constructing Elastcisearch client.", e); + } + } + + public void createBulkProcessor() { + LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500); + setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!"); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!"); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error("Bulk request has failed!"); + throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure); + } + }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1) + .build()); + } + + public void destroyBulkProcessor() { + try { + getBulkProcessor().awaitClose(20, TimeUnit.MINUTES); + setBulkProcessor(null); + refreshIndex(); + } catch (InterruptedException e) { + LOG.error("Error destroying the Bulk Processor.", e); + } + } + + public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException { + + boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists(); + if (exists) { + return; + } + + getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet(); + 
getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet(); + } + + public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException { + return this.customAnalyzing(indexName, "cody", str); + } + + public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException { + String[] strList = str.toLowerCase().split(","); + for (int i = 0; i < strList.length; i++) { + String tmp = ""; + AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get(); + for (AnalyzeToken token : r.getTokens()) { + tmp += token.getTerm() + " "; + } + strList[i] = tmp.trim(); + } + return String.join(",", strList); + } + + public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException { + if (list == null) { + return list; + } + int size = list.size(); + List<String> customlist = new ArrayList<>(); + for (int i = 0; i < size; i++) { + customlist.add(this.customAnalyzing(indexName, list.get(i))); + } + + return customlist; + } + + public void deleteAllByQuery(String index, String type, QueryBuilder query) { + createBulkProcessor(); + SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute() + .actionGet(); + + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId()); + getBulkProcessor().add(deleteRequest); + } + + scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + + } + destroyBulkProcessor(); + } + + public void deleteType(String index, String 
type) { + this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery()); + } + + public List<String> getTypeListWithPrefix(Object object, Object object2) { + ArrayList<String> typeList = new ArrayList<>(); + GetMappingsResponse res; + try { + res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get(); + ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(object.toString()); + for (ObjectObjectCursor<String, MappingMetaData> c : mapping) { + if (c.key.startsWith(object2.toString())) { + typeList.add(c.key); + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e); + } + return typeList; + } + + public List<String> getIndexListWithPrefix(Object object) { + + LOG.info("Retrieving index list with prefix: {}", object.toString()); + String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices(); + + ArrayList<String> indexList = new ArrayList<>(); + int length = indices.length; + for (int i = 0; i < length; i++) { + String indexName = indices[i]; + if (indexName.startsWith(object.toString())) { + indexList.add(indexName); + } + } + + return indexList; + } + + public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException { + return searchByQuery(index, type, query, false); + } + + @SuppressWarnings("unchecked") + public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException { + boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); + if (!exists) { + return null; + } + + QueryBuilder qb = QueryBuilders.queryStringQuery(query); + SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet(); + + // Map of K,V pairs where 
key is the field name from search result and value is the that should be returned for that field. Not always the same. + Map<String, String> fieldsToReturn = new HashMap<>(); + + fieldsToReturn.put("Dataset-ShortName", "Short Name"); + fieldsToReturn.put("Dataset-LongName", "Long Name"); + fieldsToReturn.put("DatasetParameter-Topic", "Topic"); + fieldsToReturn.put("Dataset-Description", "Dataset-Description"); + fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date"); + + if (bDetail) { + fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat"); + fieldsToReturn.put("Dataset-Doi", "Dataset-Doi"); + fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level"); + fieldsToReturn.put("DatasetCitation-Version", "Version"); + fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName"); + fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName"); + fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category"); + fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath"); + fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full"); + fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full"); + fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail"); + + fieldsToReturn.put("DatasetRegion-Region", "Region"); + fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat"); + fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat"); + fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon"); + fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon"); + fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long"); + fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong"); + + fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution"); + 
fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat"); + fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution"); + fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution"); + fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution"); + fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution"); + } + + List<Map<String, Object>> searchResults = new ArrayList<>(); + + for (SearchHit hit : response.getHits().getHits()) { + Map<String, Object> source = hit.getSource(); + + Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey())) + .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue)); + + // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result + + // Some results require special handling/formatting: + // Release Date formatting + LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate(); + searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE)); + + if (bDetail) { + + // DataFormat value, translate RAW to BINARY + if ("RAW".equals(searchResult.get("DataFormat"))) { + searchResult.put("DataFormat", "BINARY"); + } + + // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs + List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList()); + searchResult.put("DatasetLocationPolicy-BasePath", urls); + + // Time Span Formatting + LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate(); + LocalDate endDate = 
"".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ? + null : + Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate(); + searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE))); + + // Temporal resolution can come from one of two fields + searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution")); + + // Special formatting for spatial resolution + String latResolution = (String) searchResult.get("Dataset-LatitudeResolution"); + String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution"); + if (!latResolution.isEmpty() && !lonResolution.isEmpty()) { + searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)"); + } else { + String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution"); + String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution"); + double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000; + double dAlonResolution = Double.parseDouble(alonResolution) / 1000; + searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)"); + } + + // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies. 
+ List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"), + (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail")); + + searchResult.put("Measurements", measurements); + + } + + searchResults.add(searchResult); + } + + Map<String, List<?>> pdResults = new HashMap<>(); + pdResults.put("PDResults", searchResults); + + return new GsonBuilder().create().toJson(pdResults); + } + + /** + * Builds a List of Measurement Hierarchies given the individual source lists. + * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail + * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete. + * + * For example, if the input is: + * <pre> + * topics = ["Oceans", "Oceans"] + * terms = ["Sea Surface Topography", "Ocean Waves"] + * variables = ["Sea Surface Height", "Significant Wave Height"] + * variableDetails = ["None", "None"] + * </pre> + * + * The output would be: + * <pre> + * [ + * ["Oceans", "Sea Surface Topography", "Sea Surface Height"], + * ["Oceans", "Ocean Waves", "Significant Wave Height"] + * ] + * </pre> + * Oceans > Sea Surface Topography > Sea Surface Height + * Oceans > Ocean Waves > Significant Wave Height + * + * @param topics List of topics, the first element of a measurement + * @param terms List of terms, the second element of a measurement + * @param variables List of variables, the third element of a measurement + * @param variableDetails List of variable details, the fourth element of a measurement + * + * @return A List where each element is a single hierarchy (as a List) built from the provided input lists. 
+ */ + private List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) { + + List<List<String>> measurements = new ArrayList<>(); + + for (int x = 0; x < topics.size(); x++) { + measurements.add(new ArrayList<>()); + measurements.get(x).add(topics.get(x)); + // Only add the next 'level' if we can + if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) { + measurements.get(x).add(terms.get(x)); + if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) { + measurements.get(x).add(variables.get(x)); + if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) { + measurements.get(x).add(variableDetails.get(x)); + } + } + } + } + + return measurements; + + } + + public List<String> autoComplete(String index, String term) { + boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); + if (!exists) { + return new ArrayList<>(); + } + + Set<String> suggestHS = new HashSet<String>(); + List<String> suggestList = new ArrayList<>(); + + // please make sure that the completion field is configured in the ES mapping + CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100); + SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder)); + SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet(); + + Iterator<? 
extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator(); + + while (iterator.hasNext()) { + Suggest.Suggestion.Entry.Option next = iterator.next(); + String suggest = next.getText().string().toLowerCase(); + suggestList.add(suggest); + } + + suggestHS.addAll(suggestList); + suggestList.clear(); + suggestList.addAll(suggestHS); + return suggestList; + } + + public void close() { + client.close(); + } + + public void refreshIndex() { + client.admin().indices().prepareRefresh().execute().actionGet(); + } + + /** + * Generates a TransportClient or NodeClient + * + * @param props a populated {@link java.util.Properties} object + * @return a constructed {@link org.elasticsearch.client.Client} + * @throws IOException if there is an error building the + * {@link org.elasticsearch.client.Client} + */ + protected Client makeClient(Properties props) throws IOException { + String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER); + String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS); + String[] hosts = hostsString.split(","); + String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT); + int port = Integer.parseInt(portStr); + + Settings.Builder settingsBuilder = Settings.builder(); + + // Set the cluster name and build the settings + if (!clusterName.isEmpty()) + settingsBuilder.put("cluster.name", clusterName); + + settingsBuilder.put("http.type", "netty3"); + settingsBuilder.put("transport.type", "netty3"); + + Settings settings = settingsBuilder.build(); + + Client client = null; + + // Prefer TransportClient + if (hosts != null && port > 1) { + TransportClient transportClient = new ESTransportClient(settings); + for (String host : hosts) + transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + client = transportClient; + } else if (clusterName != null) { + node = new Node(settings); + client = 
node.client(); + } + + return client; + } + + /** + * Main method used to invoke the ESDriver implementation. + * + * @param args no arguments are required to invoke the Driver. + */ + public static void main(String[] args) { + MudrodEngine mudrodEngine = new MudrodEngine(); + ESDriver es = new ESDriver(mudrodEngine.loadConfig()); + es.getTypeListWithPrefix("podaacsession", "sessionstats"); + } + + /** + * @return the client + */ + public Client getClient() { + return client; + } + + /** + * @param client the client to set + */ + public void setClient(Client client) { + this.client = client; + } + + /** + * @return the bulkProcessor + */ + public BulkProcessor getBulkProcessor() { + return bulkProcessor; + } + + /** + * @param bulkProcessor the bulkProcessor to set + */ + public void setBulkProcessor(BulkProcessor bulkProcessor) { + this.bulkProcessor = bulkProcessor; + } + + public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) { + + UpdateRequest ur = null; + try { + ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); + } catch (IOException e) { + LOG.error("Error whilst attempting to generate a new Update Request.", e); + } + + return ur; + } + + public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> filedValueMap) { + + UpdateRequest ur = null; + try { + XContentBuilder builder = jsonBuilder().startObject(); + for (Entry<String, Object> entry : filedValueMap.entrySet()) { + String key = entry.getKey(); + builder.field(key, filedValueMap.get(key)); + } + builder.endObject(); + ur = new UpdateRequest(index, type, id).doc(builder); + } catch (IOException e) { + LOG.error("Error whilst attempting to generate a new Update Request.", e); + } + + return ur; + } + + public int getDocCount(String index, String... 
type) { + MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); + String[] indexArr = new String[] { index }; + return this.getDocCount(indexArr, type, search); + } + + public int getDocCount(String[] index, String[] type) { + MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); + return this.getDocCount(index, type, search); + } + + public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) { + SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0); + SearchResponse countSr = countSrBuilder.execute().actionGet(); + int docCount = (int) countSr.getHits().getTotalHits(); + return docCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/driver/SparkDriver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/SparkDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/SparkDriver.java new file mode 100644 index 0000000..c6b0aef --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/driver/SparkDriver.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.driver; + +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.serializer.KryoSerializer; +import org.apache.spark.sql.SQLContext; + +import java.io.File; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.util.Properties; +//import org.apache.spark.sql.SparkSession; + +public class SparkDriver implements Serializable { + + //TODO the commented out code below is the API uprgade + //for Spark 2.0.0. It requires a large upgrade and simplification + //across the mudrod codebase so should be done in an individual ticket. + // /** + // * + // */ + // private static final long serialVersionUID = 1L; + // private SparkSession builder; + // + // public SparkDriver() { + // builder = SparkSession.builder() + // .master("local[2]") + // .config("spark.hadoop.validateOutputSpecs", "false") + // .config("spark.files.overwrite", "true") + // .getOrCreate(); + // } + // + // public SparkSession getBuilder() { + // return builder; + // } + // + // public void setBuilder(SparkSession builder) { + // this.builder = builder; + // } + // + // public void close() { + // builder.stop(); + // } + + /** + * + */ + private static final long serialVersionUID = 1L; + public transient JavaSparkContext sc; + public transient SQLContext sqlContext; + + public SparkDriver() { + // empty default constructor + } + + public SparkDriver(Properties props) { + SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp")).setIfMissing("spark.master", props.getProperty(MudrodConstants.SPARK_MASTER)) + .set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true"); + + String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS); + String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT); + + if (!"".equals(esHost)) { + conf.set("es.nodes", 
esHost); + } + + if (!"".equals(esPort)) { + conf.set("es.port", esPort); + } + + conf.set("spark.serializer", KryoSerializer.class.getName()); + conf.set("es.batch.size.entries", "1500"); + + sc = new JavaSparkContext(conf); + sqlContext = new SQLContext(sc); + } + + public void close() { + sc.sc().stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/driver/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/driver/package-info.java new file mode 100644 index 0000000..79ab611 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/driver/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
/**
 * This package includes commonly used Elasticsearch and Spark related
 * functions.
 */
package org.apache.sdap.mudrod.driver;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
new file mode 100644
index 0000000..edb97ca
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.sdap.mudrod.integration; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.DecimalFormat; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * Supports ability to integrate vocab similarity results from metadata, ontology, and web logs. + */ +public class LinkageIntegration extends DiscoveryStepAbstract { + + private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class); + private static final long serialVersionUID = 1L; + transient List<LinkedTerm> termList = new ArrayList<>(); + DecimalFormat df = new DecimalFormat("#.00"); + private static final String INDEX_NAME = "indexName"; + private static final String WEIGHT = "weight"; + + public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * The data structure to store semantic triple. 
+ */ + class LinkedTerm { + String term = null; + double weight = 0; + String model = null; + + public LinkedTerm(String str, double w, String m) { + term = str; + weight = w; + model = m; + } + } + + /** + * Method of executing integration step + */ + @Override + public Object execute() { + getIngeratedList("ocean wind", 11); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + /** + * Method of getting integrated results + * + * @param input query string + * @return a hash map where the string is a related term, and double is the + * similarity to the input query + */ + public Map<String, Double> appyMajorRule(String input) { + termList = new ArrayList<>(); + Map<String, Double> termsMap = new HashMap<>(); + Map<String, List<LinkedTerm>> map = new HashMap<>(); + try { + map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input)); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error applying majority rule", e); + } + + for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) { + List<LinkedTerm> list = entry.getValue(); + double sumModelWeight = 0; + double tmp = 0; + for (LinkedTerm element : list) { + sumModelWeight += getModelweight(element.model); + + if (element.weight > tmp) { + tmp = element.weight; + } + } + + double finalWeight = tmp + ((sumModelWeight - 2) * 0.05); + if (finalWeight < 0) { + finalWeight = 0; + } + + if (finalWeight > 1) { + finalWeight = 1; + } + termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight))); + } + + return sortMapByValue(termsMap); + } + + /** + * Method of getting integrated results + * + * @param input query string + * @param num the number of most related terms + * @return a string of related terms along with corresponding similarities + */ + public String getIngeratedList(String input, int num) { + String output = ""; + Map<String, Double> sortedMap = appyMajorRule(input); + int count = 0; + for 
(Entry<String, Double> entry : sortedMap.entrySet()) { + if (count < num) { + output += entry.getKey() + " = " + entry.getValue() + ", "; + } + count++; + } + LOG.info("\n************************Integrated results***************************"); + LOG.info(output); + return output; + } + + /** + * Method of getting integrated results + * + * @param input query string + * @return a JSON object of related terms along with corresponding similarities + */ + public JsonObject getIngeratedListInJson(String input) { + Map<String, Double> sortedMap = appyMajorRule(input); + int count = 0; + Map<String, Double> trimmedMap = new LinkedHashMap<>(); + for (Entry<String, Double> entry : sortedMap.entrySet()) { + if (!entry.getKey().contains("china")) { + if (count < 10) { + trimmedMap.put(entry.getKey(), entry.getValue()); + } + count++; + } + } + + return mapToJson(trimmedMap); + } + + /** + * Method of aggregating terms from web logs, metadata, and ontology + * + * @param input query string + * @return a hash map where the string is a related term, and the list is + * the similarities from different sources + */ + public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) { + aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType")); + aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType")); + aggregateRelatedTerms(input, props.getProperty("metadataLinkageType")); + aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType")); + + return termList.stream().collect(Collectors.groupingBy(w -> w.term)); + } + + public int getModelweight(String model) { + if (model.equals(props.getProperty("userHistoryLinkageType"))) { + return Integer.parseInt(props.getProperty("userHistory_w")); + } + + if (model.equals(props.getProperty("clickStreamLinkageType"))) { + return Integer.parseInt(props.getProperty("clickStream_w")); + } + + if (model.equals(props.getProperty("metadataLinkageType"))) { + return 
Integer.parseInt(props.getProperty("metadata_w")); + } + + if (model.equals(props.getProperty("ontologyLinkageType"))) { + return Integer.parseInt(props.getProperty("ontology_w")); + } + + return 999999; + } + + /** + * Method of extracting the related term from a comma string + * + * @param str input string + * @param input query string + * @return related term contained in the input string + */ + public String extractRelated(String str, String input) { + String[] strList = str.split(","); + if (input.equals(strList[0])) { + return strList[1]; + } else { + return strList[0]; + } + } + + public void aggregateRelatedTerms(String input, String model) { + //get the first 10 related terms + SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11) + .execute().actionGet(); + + LOG.info("\n************************ {} results***************************", model); + for (SearchHit hit : usrhis.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String keywords = (String) result.get("keywords"); + String relatedKey = extractRelated(keywords, input); + + if (!relatedKey.equals(input)) { + LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model); + LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT)); + termList.add(lTerm); + } + + } + } + + /** + * Method of querying related terms from ontology + * + * @param input input query + * @param model source name + */ + public void aggregateRelatedTermsSWEET(String input, String model) { + SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC) + .setSize(11).execute().actionGet(); + LOG.info("\n************************ {} results***************************", model); + for (SearchHit hit : 
usrhis.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String conceptB = (String) result.get("concept_B"); + if (!conceptB.equals(input)) { + LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model); + LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT)); + termList.add(lTerm); + } + } + } + + /** + * Method of sorting a map by value + * + * @param passedMap input map + * @return sorted map + */ + public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) { + List<String> mapKeys = new ArrayList<>(passedMap.keySet()); + List<Double> mapValues = new ArrayList<>(passedMap.values()); + Collections.sort(mapValues, Collections.reverseOrder()); + Collections.sort(mapKeys, Collections.reverseOrder()); + + LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>(); + + Iterator<Double> valueIt = mapValues.iterator(); + while (valueIt.hasNext()) { + Object val = valueIt.next(); + Iterator<String> keyIt = mapKeys.iterator(); + + while (keyIt.hasNext()) { + Object key = keyIt.next(); + String comp1 = passedMap.get(key).toString(); + String comp2 = val.toString(); + + if (comp1.equals(comp2)) { + passedMap.remove(key); + mapKeys.remove(key); + sortedMap.put((String) key, (Double) val); + break; + } + + } + + } + return sortedMap; + } + + /** + * Method of converting hashmap to JSON + * + * @param word input query + * @param wordweights a map from related terms to weights + * @return converted JSON object + */ + private JsonObject mapToJson(Map<String, Double> wordweights) { + Gson gson = new Gson(); + JsonObject json = new JsonObject(); + List<JsonObject> nodes = new ArrayList<>(); + + for (Entry<String, Double> entry : wordweights.entrySet()) { + JsonObject node = new JsonObject(); + String key = entry.getKey(); + Double value = entry.getValue(); + node.addProperty("word", key); + node.addProperty("weight", value); + nodes.add(node); + } + + JsonElement nodesElement = gson.toJsonTree(nodes); + 
json.add("ontology", nodesElement); + + return json; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/integration/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/integration/package-info.java new file mode 100644 index 0000000..4ed6a07 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/integration/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes integration method of web log, ontology, and metdata + * mining results. 
+ */ +package org.apache.sdap.mudrod.integration; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java new file mode 100644 index 0000000..d9435d9 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.main; + +import org.apache.sdap.mudrod.ontology.Ontology; + +/** + * Class contains static constant keys and values relating to Mudrod + * configuration properties. 
Property values are read from <a href= + * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a> + */ +public interface MudrodConstants { + + public static final String CLEANUP_TYPE_PREFIX = "Cleanup_type_prefix"; + + public static final String CLICK_STREAM_LINKAGE_TYPE = "clickStreamLinkageType"; + + public static final String CLICK_STREAM_MATRIX_TYPE = "clickStreamMatrixType"; + + public static final String CLICKSTREAM_SVD_DIM = "clickstreamSVDDimension"; + + public static final String CLICKSTREAM_W = "clickStream_w"; + + public static final String COMMENT_TYPE = "commentType"; + + /** Defined on CLI */ + public static final String DATA_DIR = "dataDir"; + + public static final String DOWNLOAD_F = "downloadf"; + + public static final String DOWNLOAD_WEIGHT = "downloadWeight"; + + public static final String ES_CLUSTER = "clusterName"; + + public static final String ES_TRANSPORT_TCP_PORT = "ES_Transport_TCP_Port"; + + public static final String ES_UNICAST_HOSTS = "ES_unicast_hosts"; + + public static final String ES_HTTP_PORT = "ES_HTTP_port"; + + public static final String ES_INDEX_NAME = "indexName"; + + public static final String FTP_PREFIX = "ftpPrefix"; + + public static final String FTP_TYPE_PREFIX = "FTP_type_prefix"; + + public static final String HTTP_PREFIX = "httpPrefix"; + + public static final String HTTP_TYPE_PREFIX = "HTTP_type_prefix"; + + public static final String LOG_INDEX = "logIndexName"; + + public static final String METADATA_LINKAGE_TYPE = "metadataLinkageType"; + + public static final String METADATA_SVD_DIM = "metadataSVDDimension"; + + public static final String METADATA_URL = "metadataurl"; + + public static final String METADATA_W = "metadata_w"; + + public static final String MINI_USER_HISTORY = "mini_userHistory"; + + public static final String MUDROD = "mudrod"; + + /** Defined on CLI */ + public static final String MUDROD_CONFIG = "MUDROD_CONFIG"; + /** + * An {@link Ontology} 
implementation. + */ + public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation"; + + public static final String ONTOLOGY_LINKAGE_TYPE = "ontologyLinkageType"; + + public static final String ONTOLOGY_W = "ontology_w"; + + public static final String PROCESS_TYPE = "processingType"; + + /** Defined on CLI */ + public static final String RAW_METADATA_PATH = "raw_metadataPath"; + + public static final String RAW_METADATA_TYPE = "raw_metadataType"; + + public static final String SEARCH_F = "searchf"; + + public static final String SENDING_RATE = "sendingrate"; + + public static final String SESSION_PORT = "SessionPort"; + + public static final String SESSION_STATS_PREFIX = "SessionStats_prefix"; + + public static final String SESSION_URL = "SessionUrl"; + + public static final String SPARK_APP_NAME = "spark.app.name"; + + public static final String SPARK_MASTER = "spark.master"; + /** + * Absolute local location of javaSVMWithSGDModel directory. This is typically + * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code> + */ + public static final String SVM_SGD_MODEL = "svmSgdModel"; + + public static final String TIMEGAP = "timegap"; + + public static final String TIME_SUFFIX = "TimeSuffix"; + + public static final String USE_HISTORY_LINKAGE_TYPE = "userHistoryLinkageType"; + + public static final String USER_HISTORY_W = "userHistory_w"; + + public static final String VIEW_F = "viewf"; + +}
