http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/MetadataOpt.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/MetadataOpt.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/MetadataOpt.java new file mode 100644 index 0000000..69dc878 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/MetadataOpt.java @@ -0,0 +1,150 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.recommendation.structure;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodConstants;
import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
import gov.nasa.jpl.mudrod.utils.MatrixUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * Loads dataset metadata from Elasticsearch and turns it into the tokenized
 * and TF-IDF weighted representations used by the recommendation module.
 */
public class MetadataOpt implements Serializable {

  private static final long serialVersionUID = 1L;

  /** Elasticsearch index that stores the metadata documents. */
  private String indexName;
  /** Elasticsearch document type of the metadata. */
  private String metadataType;
  /** Default metadata fields harvested for every dataset. */
  private List<String> variables;

  public static final String SPLIT_BLANK = " ";
  public static final String SPLIT_COMMA = ",";

  /**
   * Creates a new instance of MetadataOpt.
   *
   * @param props the Mudrod configuration
   */
  public MetadataOpt(Properties props) {
    indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
    metadataType = props.getProperty("recom_metadataType");

    variables = new ArrayList<>();
    variables.add("DatasetParameter-Term");
    variables.add("DatasetParameter-Variable");
    variables.add("Dataset-Description");
    variables.add("Dataset-LongName");
  }

  /**
   * Loads the default metadata fields of all datasets.
   *
   * @param es the Elasticsearch driver
   * @param spark the Spark driver
   * @return pair RDD of (dataset short name, concatenated field text)
   * @throws Exception if the metadata cannot be read or analyzed
   */
  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark) throws Exception {
    // Delegate to the explicit-field overload so both code paths stay in sync.
    return loadAll(es, spark, variables);
  }

  /**
   * Loads the given metadata fields of all datasets.
   *
   * @param es the Elasticsearch driver
   * @param spark the Spark driver
   * @param variables metadata field names to harvest
   * @return pair RDD of (dataset short name, concatenated field text)
   * @throws Exception if the metadata cannot be read or analyzed
   */
  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark, List<String> variables) throws Exception {
    List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables);
    return this.parallizeData(spark, datasetsTokens);
  }

  /**
   * Parallelizes the (dataset, text) tuples into a pair RDD.
   */
  private JavaPairRDD<String, String> parallizeData(SparkDriver spark, List<Tuple2<String, String>> datasetContent) {
    JavaRDD<Tuple2<String, String>> datasetContentRDD = spark.sc.parallelize(datasetContent);
    // An RDD of tuples converts directly; the former identity mapToPair
    // round trip added nothing.
    return JavaPairRDD.fromJavaRDD(datasetContentRDD);
  }

  /**
   * Splits every dataset's text into a token list.
   *
   * @param datasetsContentRDD pair RDD of (dataset, raw text)
   * @param splitter separator, e.g. {@link #SPLIT_BLANK} or {@link #SPLIT_COMMA}
   * @return pair RDD of (dataset, token list)
   * @throws Exception kept for interface compatibility
   */
  public JavaPairRDD<String, List<String>> tokenizeData(JavaPairRDD<String, String> datasetsContentRDD, String splitter) throws Exception {
    return datasetsContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, List<String>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, List<String>> call(Tuple2<String, String> arg) throws Exception {
        return new Tuple2<>(arg._1, getTokens(arg._2, splitter));
      }
    });
  }

  /**
   * Splits a string on the given separator, treated as a literal.
   * Generalized from the former blank/comma special cases (which threw a
   * NullPointerException for any other separator).
   *
   * @param str string to split
   * @param splitter literal separator
   * @return list of tokens
   * @throws Exception kept for interface compatibility
   */
  public List<String> getTokens(String str, String splitter) throws Exception {
    return Arrays.asList(str.split(Pattern.quote(splitter)));
  }

  /**
   * Scrolls over all metadata documents in Elasticsearch and concatenates
   * the requested fields for each dataset.
   *
   * @param es the Elasticsearch driver
   * @param variables metadata field names to harvest
   * @return list of (dataset short name, concatenated analyzed field text)
   * @throws Exception if a field value cannot be analyzed
   */
  public List<Tuple2<String, String>> loadMetadataFromES(ESDriver es, List<String> variables) throws Exception {

    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setQuery(QueryBuilders.matchAllQuery()).setScroll(new TimeValue(60000)).setSize(100).execute()
        .actionGet();

    List<Tuple2<String, String>> datasetsTokens = new ArrayList<>();
    while (true) {

      for (SearchHit hit : scrollResp.getHits().getHits()) {
        Map<String, Object> result = hit.getSource();
        String shortName = (String) result.get("Dataset-ShortName");

        // Concatenate every requested field. (The previous version
        // overwrote the buffer on each iteration, so only the last
        // non-null field survived.)
        StringBuilder fieldText = new StringBuilder();
        for (String field : variables) {
          Object fieldValue = result.get(field);
          if (fieldValue != null) {
            if (fieldText.length() > 0) {
              fieldText.append(' ');
            }
            fieldText.append(es.customAnalyzing(indexName, fieldValue.toString()));
          }
        }

        datasetsTokens.add(new Tuple2<>(shortName, fieldText.toString()));
      }

      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
      if (scrollResp.getHits().getHits().length == 0) {
        break;
      }
    }

    return datasetsTokens;
  }

  /**
   * Builds a TF-IDF weighted doc-word matrix from tokenized metadata.
   *
   * @param datasetTokensRDD pair RDD of (dataset, token list)
   * @param spark the Spark driver
   * @return labeled matrix whose rowMatrix holds TF-IDF weights
   */
  public LabeledRowMatrix tFIDFTokens(JavaPairRDD<String, List<String>> datasetTokensRDD, SparkDriver spark) {
    LabeledRowMatrix labelMatrix = MatrixUtil.createDocWordMatrix(datasetTokensRDD, spark.sc);
    labelMatrix.rowMatrix = MatrixUtil.createTFIDFMatrix(labelMatrix.rowMatrix);
    return labelMatrix;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/RecomData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/RecomData.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/RecomData.java new file mode 100644 index 0000000..9025156 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/RecomData.java @@ -0,0 +1,196 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.recommendation.structure;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodEngine;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * This class is used to test recommendation result similarity and
 * session-level similarity.
 */
public class RecomData extends DiscoveryStepAbstract {

  private static final long serialVersionUID = 1L;
  /** Linkage terms collected by the most recent Elasticsearch query. */
  protected transient List<LinkedTerm> termList = new ArrayList<>();
  DecimalFormat df = new DecimalFormat("#.00");
  protected static final String INDEX_NAME = "indexName";
  private static final String WEIGHT = "weight";

  /** Holder for a related term, its weight and the model it came from. */
  class LinkedTerm {
    public String term = null;
    public double weight = 0;
    public String model = null;

    public LinkedTerm(String str, double w, String m) {
      term = str;
      weight = w;
      model = m;
    }
  }

  public RecomData(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public Object execute() {
    return null;
  }

  @Override
  public Object execute(Object o) {
    return null;
  }

  /**
   * Fetches the top related datasets for the given one from both the
   * TF-IDF model and the topic/code model.
   *
   * @param input dataset short name
   * @param num maximum number of related datasets per model
   * @return JSON object with "TFIDFSim" and "TopicSim" arrays
   */
  public JsonObject getRecomDataInJson(String input, int num) {
    // Query a few extra candidates so the list stays full after the
    // input dataset itself is filtered out.
    String type = props.getProperty("metadataTermTFIDFSimType");
    Map<String, Double> sortedOBSimMap = getRelatedData(type, input, num + 5);
    JsonElement linkedJson = mapToJson(sortedOBSimMap, num);

    type = props.getProperty("metadataCodeSimType");
    Map<String, Double> sortedMBSimMap = getRelatedData(type, input, num + 5);
    JsonElement relatedJson = mapToJson(sortedMBSimMap, num);

    JsonObject json = new JsonObject();
    json.add("TFIDFSim", linkedJson);
    json.add("TopicSim", relatedJson);
    return json;
  }

  /**
   * Converts at most {@code num} (term, weight) pairs to a JSON array of
   * {name, weight} nodes, preserving the map's iteration order.
   */
  protected JsonElement mapToJson(Map<String, Double> wordweights, int num) {
    Gson gson = new Gson();

    List<JsonObject> nodes = new ArrayList<>();
    int count = 0;
    for (Map.Entry<String, Double> entry : wordweights.entrySet()) {
      if (count >= num) {
        break;
      }
      JsonObject node = new JsonObject();
      node.addProperty("name", entry.getKey());
      node.addProperty("weight", entry.getValue());
      nodes.add(node);
      count++;
    }

    return gson.fromJson(gson.toJson(nodes), JsonElement.class);
  }

  /**
   * Queries related datasets of the given type and returns them sorted by
   * weight, descending.
   *
   * @param type Elasticsearch similarity type to query
   * @param input dataset short name
   * @param num maximum number of hits requested
   * @return map from related dataset to weight, highest weight first
   */
  public Map<String, Double> getRelatedData(String type, String input, int num) {
    termList = new ArrayList<>();
    Map<String, Double> termsMap = new HashMap<>();
    try {
      for (LinkedTerm link : getRelatedDataFromES(type, input, num)) {
        termsMap.put(link.term, link.weight);
      }
      return sortMapByValue(termsMap);
    } catch (Exception e) {
      e.printStackTrace();
      return new HashMap<>();
    }
  }

  /**
   * Queries Elasticsearch for terms linked to {@code input}, skipping the
   * input itself and hits without a concept_B field.
   */
  public List<LinkedTerm> getRelatedDataFromES(String type, String input, int num) {
    SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
        .setSize(num);

    SearchResponse usrhis = builder.execute().actionGet();

    for (SearchHit hit : usrhis.getHits().getHits()) {
      Map<String, Object> result = hit.getSource();
      String conceptB = (String) result.get("concept_B");

      // Guard against a missing concept_B and avoid echoing the query term.
      if (conceptB != null && !conceptB.equals(input)) {
        // Elasticsearch may deserialize the weight as any numeric type.
        double weight = ((Number) result.get(WEIGHT)).doubleValue();
        termList.add(new LinkedTerm(conceptB, weight, type));
      }
    }

    return termList;
  }

  /**
   * Returns a copy of the map ordered by value, descending. Unlike the
   * previous implementation this does not mutate (empty) the input map and
   * handles duplicate values deterministically.
   *
   * @param passedMap map to sort; left unchanged
   * @return new LinkedHashMap in descending value order
   */
  public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
    List<Map.Entry<String, Double>> entries = new ArrayList<>(passedMap.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<String, Double>>() {
      @Override
      public int compare(Map.Entry<String, Double> e1, Map.Entry<String, Double> e2) {
        return e2.getValue().compareTo(e1.getValue());
      }
    });

    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
    for (Map.Entry<String, Double> entry : entries) {
      sortedMap.put(entry.getKey(), entry.getValue());
    }
    return sortedMap;
  }

  public static void main(String[] args) throws IOException {

    MudrodEngine me = new MudrodEngine();
    Properties props = me.loadConfig();
    ESDriver es = new ESDriver(me.getConfig());
    RecomData test = new RecomData(props, es, null);

    String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4";
    JsonObject json = test.getRecomDataInJson(input, 10);

    System.out.println(json.toString());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/package-info.java new file mode 100644 index 0000000..99199ca --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/package-info.java @@ -0,0
+1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes the data structure required by recommendation module. + */ +package gov.nasa.jpl.mudrod.recommendation.structure; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SVDAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SVDAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SVDAnalyzer.java new file mode 100644 index 0000000..3e63b04 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SVDAnalyzer.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package gov.nasa.jpl.mudrod.semantics; + +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.utils.MatrixUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * ClassName: SVDAnalyzer Function: Analyze semantic relationship through SVD + * method + */ +public class SVDAnalyzer extends SemanticAnalyzer { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of SVDAnalyzer. + * + * @param props the Mudrod configuration + * @param es the Elasticsearch drive + * @param spark the spark drive + */ + public SVDAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * GetSVDMatrix: Create SVD matrix csv file from original csv file. + * + * @param csvFileName each row is a term, and each column is a document. 
+ * @param svdDimention Dimension of SVD matrix + * @param svdMatrixFileName CSV file name of SVD matrix + */ + public void getSVDMatrix(String csvFileName, int svdDimention, String svdMatrixFileName) { + + JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, 1); + JavaRDD<Vector> vectorRDD = importRDD.values(); + RowMatrix wordDocMatrix = new RowMatrix(vectorRDD.rdd()); + RowMatrix tfidfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix); + RowMatrix svdMatrix = MatrixUtil.buildSVDMatrix(tfidfMatrix, svdDimention); + + List<String> rowKeys = importRDD.keys().collect(); + List<String> colKeys = new ArrayList<>(); + for (int i = 0; i < svdDimention; i++) { + colKeys.add("dimension" + i); + } + MatrixUtil.exportToCSV(svdMatrix, rowKeys, colKeys, svdMatrixFileName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SemanticAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SemanticAnalyzer.java new file mode 100644 index 0000000..be8b2b3 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/SemanticAnalyzer.java @@ -0,0 +1,148 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package gov.nasa.jpl.mudrod.semantics; + +import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.utils.LinkageTriple; +import gov.nasa.jpl.mudrod.utils.MatrixUtil; +import gov.nasa.jpl.mudrod.utils.SimilarityUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * ClassName: SemanticAnalyzer Function: Semantic analyzer + */ +public class SemanticAnalyzer extends MudrodAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of SemanticAnalyzer. + * + * @param props + * the Mudrod configuration + * @param es + * the Elasticsearch drive + * @param spark + * the spark drive + */ + public SemanticAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Calculate term similarity from CSV matrix. + * + * @param csvFileName + * csv file of matrix, each row is a term, and each column is a + * dimension in feature space + * @return Linkage triple list + */ + public List<LinkageTriple> calTermSimfromMatrix(String csvFileName) { + File f = new File(csvFileName); + if (!f.exists()) { + return null; + } + return this.calTermSimfromMatrix(csvFileName, 1); + } + + /** + * Calculate term similarity from CSV matrix. + * + * @param csvFileName csv file of matrix, each row is a term, and each column is a + * dimension in feature space + * @param skipRow number of rows to skip in input CSV file e.g. 
header + * @return Linkage triple list + */ + public List<LinkageTriple> calTermSimfromMatrix(String csvFileName, int skipRow) { + + JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, skipRow); + if (importRDD == null || importRDD.values().first().size() == 0) { + return null; + } + + CoordinateMatrix simMatrix = SimilarityUtil.calculateSimilarityFromVector(importRDD.values()); + JavaRDD<String> rowKeyRDD = importRDD.keys(); + return SimilarityUtil.matrixToTriples(rowKeyRDD, simMatrix); + } + + /** + * Calculate term similarity from CSV matrix. + * + * @param csvFileName csv file of matrix, each row is a term, and each column is a + * dimension in feature space + * @param simType the type of similary calculation to execute e.g. + * <ul> + * <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_COSINE} - 3,</li> + * <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_HELLINGER} - 2,</li> + * <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_PEARSON} - 1</li> + * </ul> + * @param skipRow number of rows to skip in input CSV file e.g. header + * @return Linkage triple list + */ + public List<LinkageTriple> calTermSimfromMatrix(String csvFileName, int simType, int skipRow) { + + JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, skipRow); + if (importRDD.values().first().size() == 0) { + return null; + } + + JavaRDD<LinkageTriple> triples = SimilarityUtil.calculateSimilarityFromVector(importRDD, simType); + + return triples.collect(); + } + + public void saveToES(List<LinkageTriple> tripleList, String index, String type) { + try { + LinkageTriple.insertTriples(es, tripleList, index, type); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Method of saving linkage triples to Elasticsearch. 
+ * + * @param tripleList + * linkage triple list + * @param index + * index name + * @param type + * type name + * @param bTriple + * bTriple + * @param bSymmetry + * bSymmetry + */ + public void saveToES(List<LinkageTriple> tripleList, String index, String type, boolean bTriple, boolean bSymmetry) { + try { + LinkageTriple.insertTriples(es, tripleList, index, type, bTriple, bSymmetry); + } catch (IOException e) { + e.printStackTrace(); + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/semantics/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/semantics/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/package-info.java new file mode 100644 index 0000000..9c2e8ac --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/semantics/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * This package includes SVD transformation function, methods of calculating + * similarity from CSV, and saving triples into Elasticsearch + */ +package gov.nasa.jpl.mudrod.semantics; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ClickstreamImporter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ClickstreamImporter.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ClickstreamImporter.java new file mode 100644 index 0000000..5cb130c --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ClickstreamImporter.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package gov.nasa.jpl.mudrod.ssearch; + +import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports ability to import click stream data into Elasticsearch + * through .csv file + */ +public class ClickstreamImporter extends MudrodAbstract { + /** + * + */ + private static final long serialVersionUID = 1L; + + public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + addClickStreamMapping(); + } + + /** + * Method to add Elasticsearch mapping for click stream data + */ + public void addClickStreamMapping() { + XContentBuilder Mapping; + try { + Mapping = jsonBuilder().startObject().startObject( + props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).startObject( + "properties").startObject("query").field("type", "string").field( + "index", "not_analyzed").endObject().startObject("dataID").field( + "type", "string").field("index", "not_analyzed").endObject() + + .endObject().endObject().endObject(); + + es.getClient().admin().indices().preparePutMapping( + props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType( + props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).setSource( + Mapping).execute().actionGet(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Method to import click stream CSV into Elasticsearch + */ + public void importfromCSVtoES() { + es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), + 
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)); + es.createBulkProcessor(); + + BufferedReader br = null; + String cvsSplitBy = ","; + + try { + br = new BufferedReader(new FileReader(props.getProperty("clickstreamMatrix"))); + String line = br.readLine(); + // first item needs to be skipped + String[] dataList = line.split(cvsSplitBy); + while ((line = br.readLine()) != null) { + String[] clicks = line.split(cvsSplitBy); + for (int i = 1; i < clicks.length; i++) { + if (!"0.0".equals(clicks[i])) { + IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), + props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)) + .source(jsonBuilder().startObject().field("query", clicks[0]).field( + "dataID", dataList[i]).field("clicks", clicks[i]).endObject()); + es.getBulkProcessor().add(ir); + } + } + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (br != null) { + try { + br.close(); + es.destroyBulkProcessor(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Dispatcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Dispatcher.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Dispatcher.java new file mode 100644 index 0000000..a0f3a2c --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Dispatcher.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.ssearch; + +import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.integration.LinkageIntegration; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.MultiMatchQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * Supports ability to transform regular user query into a semantic query + */ +public class Dispatcher extends MudrodAbstract { + private static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class); + + public Dispatcher(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * Method of getting semantically most related terms by number + * + * @param input regular input query + * @param num the number of most related terms + * @return a map from term to similarity + */ + public Map<String, Double> getRelatedTerms(String input, int num) { + LinkageIntegration li = new LinkageIntegration(props, this.es, null); + Map<String, Double> sortedMap = li.appyMajorRule(input); + Map<String, Double> selected_Map = new HashMap<>(); + int count = 0; + for (Entry<String, Double> entry : sortedMap.entrySet()) { + if (count < num) { + selected_Map.put(entry.getKey(), 
entry.getValue()); + } + count++; + } + return selected_Map; + } + + /** + * Method of getting semantically most related terms by similarity threshold + * + * @param input regular input query + * @param T value of threshold, raning from 0 to 1 + * @return a map from term to similarity + */ + public Map<String, Double> getRelatedTermsByT(String input, double T) { + LinkageIntegration li = new LinkageIntegration(this.props, this.es, null); + Map<String, Double> sortedMap = li.appyMajorRule(input); + Map<String, Double> selected_Map = new HashMap<>(); + + for (Entry<String, Double> entry : sortedMap.entrySet()) { + if (entry.getValue() >= T) { + selected_Map.put(entry.getKey(), entry.getValue()); + } + } + return selected_Map; + } + + /** + * Method of creating semantic query based on Threshold + * + * @param input regular query + * @param T threshold raning from 0 to 1 + * @param query_operator query mode + * @return a multiMatch query builder + */ + public BoolQueryBuilder createSemQuery(String input, double T, String query_operator) { + Map<String, Double> selected_Map = getRelatedTermsByT(input, T); + selected_Map.put(input, (double) 1); + + String fieldsList[] = { "Dataset-Metadata", "Dataset-ShortName", "Dataset-LongName", + "DatasetParameter-Topic", "DatasetParameter-VariableDetail", "DatasetParameter-Category", + "DatasetParameter-Variable", "DatasetParameter-Term", + "DatasetSource-Source-LongName", "DatasetSource-Source-LongName-Full", + "DatasetSource-Source-ShortName", "DatasetSource-Source-ShortName-Full", + "DatasetSource-Sensor-LongName", "DatasetSource-Sensor-LongName-Full", "DatasetSource-Sensor-ShortName", + "DatasetSource-Sensor-ShortName-Full" }; + BoolQueryBuilder qb = new BoolQueryBuilder(); + for (Entry<String, Double> entry : selected_Map.entrySet()) { + if (query_operator.toLowerCase().trim().equals("phrase")) { + qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), 
fieldsList).boost(entry.getValue().floatValue()).type(MultiMatchQueryBuilder.Type.PHRASE).tieBreaker((float) 0.5)); // when + // set + // to + // 1.0, + // it + // would + // be + // equal + // to + // "most + // fields" + // query + } else if (query_operator.toLowerCase().trim().equals("and")) { + qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), fieldsList).boost(entry.getValue().floatValue()).operator(MatchQueryBuilder.DEFAULT_OPERATOR.AND).tieBreaker((float) 0.5)); + } else { + qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), fieldsList).boost(entry.getValue().floatValue()).operator(MatchQueryBuilder.DEFAULT_OPERATOR.OR).tieBreaker((float) 0.5)); + } + } + + // LOG.info(qb.toString()); + return qb; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Ranker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Ranker.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Ranker.java new file mode 100644 index 0000000..32830d5 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/Ranker.java @@ -0,0 +1,192 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package gov.nasa.jpl.mudrod.ssearch;

import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodConstants;
import gov.nasa.jpl.mudrod.ssearch.ranking.Learner;
import gov.nasa.jpl.mudrod.ssearch.structure.SResult;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;

/**
 * Supports the ability to calculate ranking scores for search results using a
 * trained pairwise classifier ({@link Learner}).
 */
public class Ranker extends MudrodAbstract implements Serializable {
  private static final long serialVersionUID = 1L;
  transient List<SResult> resultList = new ArrayList<>();

  String learnerType = null;
  Learner le = null;

  /**
   * Constructor.
   *
   * @param props Mudrod configuration properties
   * @param es Elasticsearch driver
   * @param spark Spark driver
   * @param learnerType classifier type, forwarded to {@link Learner}
   */
  public Ranker(Properties props, ESDriver es, SparkDriver spark, String learnerType) {
    super(props, es, spark);
    this.learnerType = learnerType;
    le = new Learner(learnerType, spark, props.getProperty(MudrodConstants.SVM_SGD_MODEL));
  }

  /**
   * Orders results by descending pairwise-win count ({@code below}).
   */
  public class ResultComparator implements Comparator<SResult> {
    @Override
    public int compare(SResult o1, SResult o2) {
      return o2.below.compareTo(o1.below);
    }
  }

  /**
   * Calculates the mean of one attribute over a result list.
   *
   * @param attribute the attribute name to aggregate
   * @param resultList the results to aggregate over
   * @return mean value, rounded to three decimals; 0 for an empty list
   */
  private double getMean(String attribute, List<SResult> resultList) {
    if (resultList.isEmpty()) {
      return 0.0; // avoid 0/0 -> NaN for a degenerate input
    }
    double sum = 0.0;
    for (SResult a : resultList) {
      sum += (double) SResult.get(a, attribute);
    }
    return getNDForm(sum / resultList.size());
  }

  /**
   * Calculates the (population) variance of one attribute over a result list.
   *
   * @param attribute the attribute name to aggregate
   * @param resultList the results to aggregate over
   * @return variance, rounded to three decimals
   */
  private double getVariance(String attribute, List<SResult> resultList) {
    double mean = getMean(attribute, resultList);
    double temp = 0.0;
    for (SResult a : resultList) {
      double val = (double) SResult.get(a, attribute);
      temp += (mean - val) * (mean - val);
    }
    return getNDForm(temp / resultList.size());
  }

  /**
   * Calculates the standard deviation of one attribute over a result list.
   *
   * @param attribute the attribute name to aggregate
   * @param resultList the results to aggregate over
   * @return standard deviation, rounded to three decimals
   */
  private double getStdDev(String attribute, List<SResult> resultList) {
    return getNDForm(Math.sqrt(getVariance(attribute, resultList)));
  }

  /**
   * Calculates the Z score of a value.
   *
   * @param val the attribute value
   * @param mean the attribute's mean
   * @param std the attribute's standard deviation
   * @return Z score, or 0 when the standard deviation is (nearly) zero
   */
  private double getZscore(double val, double mean, double std) {
    if (!equalComp(std, 0)) {
      return getNDForm((val - mean) / std);
    }
    return 0;
  }

  /** Tolerant floating-point equality (epsilon 1e-4). */
  private boolean equalComp(double a, double b) {
    return Math.abs(a - b) < 0.0001;
  }

  /**
   * Rounds a double to three decimal places.
   *
   * @param d value to round
   * @return rounded value
   */
  private double getNDForm(double d) {
    // DecimalFormat is not thread-safe, so it is created per call on purpose.
    DecimalFormat ndForm = new DecimalFormat("#.###");
    return Double.valueOf(ndForm.format(d));
  }

  /**
   * Ranks a list of results by normalizing each scoring attribute to a Z score
   * and then counting pairwise wins decided by the classifier.
   *
   * @param resultList result list to rank (mutated in place)
   * @return the same list, sorted best-first
   */
  public List<SResult> rank(List<SResult> resultList) {
    if (resultList == null || resultList.isEmpty()) {
      return resultList;
    }

    // Mean and standard deviation depend only on the attribute and the full
    // list, so compute them once per attribute. The original code recomputed
    // both inside the per-result loop, making this O(n^2) per attribute.
    for (int m = 0; m < SResult.rlist.length; m++) {
      String att = SResult.rlist[m].split("_")[0];
      double mean = getMean(att, resultList);
      double std = getStdDev(att, resultList);
      for (int i = 0; i < resultList.size(); i++) {
        double val = SResult.get(resultList.get(i), att);
        SResult.set(resultList.get(i), SResult.rlist[m], getZscore(val, mean, std));
      }
    }

    // Sorting directly with a classifier-backed comparator can violate
    // transitivity (the model is not an overfitting model), so count pairwise
    // wins first and sort on that count instead.
    for (int j = 0; j < resultList.size(); j++) {
      for (int k = 0; k < resultList.size(); k++) {
        if (k != j) {
          resultList.get(j).below += comp(resultList.get(j), resultList.get(k));
        }
      }
    }

    Collections.sort(resultList, new ResultComparator());
    return resultList;
  }

  /**
   * Compares two search results with the trained classifier.
   *
   * @param o1 search result 1
   * @param o2 search result 2
   * @return 1 if o1 should be ranked above o2, 0 otherwise
   */
  public int comp(SResult o1, SResult o2) {
    List<Double> instList = new ArrayList<>();
    for (int i = 0; i < SResult.rlist.length; i++) {
      double o2Score = SResult.get(o2, SResult.rlist[i]);
      double o1Score = SResult.get(o1, SResult.rlist[i]);
      instList.add(o2Score - o1Score);
    }

    double[] ins = instList.stream().mapToDouble(i -> i).toArray();
    // 99.0 is a dummy label; only the feature vector matters for prediction.
    LabeledPoint insPoint = new LabeledPoint(99.0, Vectors.dense(ins));
    double prediction = le.classify(insPoint);
    // Unlike weka, the model predicts 1/0 rather than 1/2.
    if (equalComp(prediction, 1)) {
      return 0;
    }
    return 1;
  }

}
package gov.nasa.jpl.mudrod.ssearch;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.ssearch.structure.SResult;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;

import java.io.Serializable;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * Supports the ability to perform semantic search with a given query.
 */
public class Searcher extends MudrodAbstract implements Serializable {
  private static final long serialVersionUID = 1L;
  // Compiled once: matches any string containing at least one letter.
  private static final Pattern HAS_LETTER = Pattern.compile(".*[a-zA-Z].*");
  DecimalFormat NDForm = new DecimalFormat("#.##");
  final Integer MAX_CHAR = 700;

  public Searcher(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Converts a processing-level string into a number.
   *
   * @param pro processing level string, e.g. "2", "2P", or descriptive text
   * @return the numeric level; 1.0 for null or purely descriptive strings
   */
  public Double getProLevelNum(String pro) {
    if (pro == null) {
      return 1.0;
    }
    if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) {
      // e.g. "2P" -> 2.0: the leading digit carries the level
      return Double.parseDouble(pro.substring(0, 1));
    }
    if (HAS_LETTER.matcher(pro).find()) {
      // contains letters but is not digit+letter: fall back to level 1
      return 1.0;
    }
    return Double.parseDouble(pro);
  }

  /**
   * Caps a popularity value at 1000.
   *
   * @param pop raw popularity
   * @return popularity clamped to at most 1000
   */
  public Double getPop(Double pop) {
    return Math.min(pop, 1000.0);
  }

  /**
   * Checks whether the query appears in an attribute's values.
   *
   * @param strList attribute value as a list of strings (may be null)
   * @param query query string (expected lower-case)
   * @return 1.0 if the query occurs in the joined values, 0.0 otherwise
   */
  public Double exists(ArrayList<String> strList, String query) {
    if (strList != null) {
      String str = String.join(", ", strList);
      // String.join never returns null, so only the emptiness check is needed.
      if (!str.isEmpty() && str.toLowerCase().trim().contains(query)) {
        return 1.0;
      }
    }
    return 0.0;
  }

  /**
   * Main method of semantic search.
   *
   * @param index index name in Elasticsearch
   * @param type type name in Elasticsearch
   * @param query regular query string
   * @param queryOperator query mode: phrase, or, and
   * @param rankOption field/ordering the results are sorted by
   * @return a list of search results (empty if the index does not exist)
   */
  @SuppressWarnings("unchecked")
  public List<SResult> searchByQuery(String index, String type, String query, String queryOperator, String rankOption) {
    boolean exists = es.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
    if (!exists) {
      return new ArrayList<>();
    }

    SortOrder order;
    String sortField;
    switch (rankOption) {
      case "Rank-AllTimePopularity":
        sortField = "Dataset-AllTimePopularity";
        order = SortOrder.DESC;
        break;
      case "Rank-MonthlyPopularity":
        sortField = "Dataset-MonthlyPopularity";
        order = SortOrder.DESC;
        break;
      case "Rank-UserPopularity":
        sortField = "Dataset-UserPopularity";
        order = SortOrder.DESC;
        break;
      case "Rank-LongName-Full":
        sortField = "Dataset-LongName.raw";
        order = SortOrder.ASC;
        break;
      case "Rank-ShortName-Full":
        sortField = "Dataset-ShortName.raw";
        order = SortOrder.ASC;
        break;
      case "Rank-GridSpatialResolution":
        sortField = "Dataset-GridSpatialResolution";
        order = SortOrder.DESC;
        break;
      case "Rank-SatelliteSpatialResolution":
        sortField = "Dataset-SatelliteSpatialResolution";
        order = SortOrder.DESC;
        break;
      case "Rank-StartTimeLong-Long":
        sortField = "DatasetCoverage-StartTimeLong-Long";
        order = SortOrder.ASC;
        break;
      case "Rank-StopTimeLong-Long":
        sortField = "DatasetCoverage-StopTimeLong-Long";
        order = SortOrder.DESC;
        break;
      default:
        sortField = "Dataset-ShortName.raw";
        order = SortOrder.ASC;
        break;
    }

    Dispatcher dp = new Dispatcher(this.getConfig(), this.getES(), null);
    BoolQueryBuilder qb = dp.createSemQuery(query, 1.0, queryOperator);
    List<SResult> resultList = new ArrayList<>();

    SearchRequestBuilder builder = es.getClient().prepareSearch(index).setTypes(type).setQuery(qb).addSort(sortField, order).setSize(500).setTrackScores(true);
    SearchResponse response = builder.execute().actionGet();

    // SimpleDateFormat is not thread-safe but is confined to this call, so it
    // is safe to reuse across the loop below.
    SimpleDateFormat df2 = new SimpleDateFormat("MM/dd/yyyy");

    for (SearchHit hit : response.getHits().getHits()) {
      Map<String, Object> result = hit.getSource();
      Double relevance = Double.valueOf(NDForm.format(hit.getScore()));
      String shortName = (String) result.get("Dataset-ShortName");
      String longName = (String) result.get("Dataset-LongName");

      ArrayList<String> topicList = (ArrayList<String>) result.get("DatasetParameter-Variable");
      String topic = "";
      if (null != topicList) {
        topic = String.join(", ", topicList);
      }

      // Truncate long descriptions to MAX_CHAR and append an ellipsis.
      // Fixes two bugs in the original: (1) a null description slipped past
      // the "".equals(content) check and threw an NPE; (2) the length was
      // measured on the untrimmed string but substring() was taken on the
      // trimmed one, which could run past the end.
      String content = (String) result.get("Dataset-Description");
      if (content != null && !content.isEmpty()) {
        String trimmed = content.trim();
        int maxLength = Math.min(trimmed.length(), MAX_CHAR);
        content = trimmed.substring(0, maxLength) + "...";
      }

      ArrayList<String> longdate = (ArrayList<String>) result.get("DatasetCitation-ReleaseDateLong");
      Date date = new Date(Long.parseLong(longdate.get(0)));
      String dateText = df2.format(date);

      // start date
      Long start = (Long) result.get("DatasetCoverage-StartTimeLong-Long");
      String startDateTxt = df2.format(new Date(start));

      // end date: an empty value means the dataset is still being produced
      String end = (String) result.get("Dataset-DatasetCoverage-StopTimeLong");
      String endDateTxt;
      if ("".equals(end)) {
        endDateTxt = "Present";
      } else {
        endDateTxt = df2.format(new Date(Long.parseLong(end)));
      }

      String processingLevel = (String) result.get("Dataset-ProcessingLevel");
      Double proNum = getProLevelNum(processingLevel);

      Double userPop = getPop(((Integer) result.get("Dataset-UserPopularity")).doubleValue());
      Double allPop = getPop(((Integer) result.get("Dataset-AllTimePopularity")).doubleValue());
      Double monthPop = getPop(((Integer) result.get("Dataset-MonthlyPopularity")).doubleValue());

      List<String> sensors = (List<String>) result.get("DatasetSource-Sensor-ShortName");

      SResult re = new SResult(shortName, longName, topic, content, dateText);
      SResult.set(re, "term", relevance);
      SResult.set(re, "releaseDate", Long.valueOf(longdate.get(0)).doubleValue());
      SResult.set(re, "processingLevel", processingLevel);
      SResult.set(re, "processingL", proNum);
      SResult.set(re, "userPop", userPop);
      SResult.set(re, "allPop", allPop);
      SResult.set(re, "monthPop", monthPop);
      SResult.set(re, "startDate", startDateTxt);
      SResult.set(re, "endDate", endDateTxt);
      SResult.set(re, "sensors", String.join(", ", sensors));

      // Attach a human relevance label when one exists in the training index.
      QueryBuilder queryLabelSearch = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("query", query)).must(QueryBuilders.termQuery("dataID", shortName));
      SearchResponse labelRes = es.getClient().prepareSearch(index).setTypes("trainingranking").setQuery(queryLabelSearch).setSize(5).execute().actionGet();
      String labelString = null;
      for (SearchHit label : labelRes.getHits().getHits()) {
        labelString = (String) label.getSource().get("label");
      }
      SResult.set(re, "label", labelString);
      resultList.add(re);
    }

    return resultList;
  }

  /**
   * Semantic search producing a JSON string.
   *
   * @param index index name in Elasticsearch
   * @param type type name in Elasticsearch
   * @param query regular query string
   * @param queryOperator query mode: phrase, or, and
   * @param rankOption selected ranking method ("Rank-SVM" triggers the ranker)
   * @param rr ranker used for "Rank-SVM"
   * @return JSON object string with a "PDResults" array
   */
  public String ssearch(String index, String type, String query, String queryOperator, String rankOption, Ranker rr) {
    List<SResult> li = searchByQuery(index, type, query, queryOperator, rankOption);
    if ("Rank-SVM".equals(rankOption)) {
      li = rr.rank(li);
    }
    Gson gson = new Gson();
    List<JsonObject> fileList = new ArrayList<>();

    for (SResult res : li) {
      JsonObject file = new JsonObject();
      file.addProperty("Short Name", (String) SResult.get(res, "shortName"));
      file.addProperty("Long Name", (String) SResult.get(res, "longName"));
      file.addProperty("Topic", (String) SResult.get(res, "topic"));
      file.addProperty("Description", (String) SResult.get(res, "description"));
      // NOTE(review): "relase_date" appears to match the (misspelled) field name
      // declared on SResult — confirm before renaming, since fixing the key here
      // alone would break the reflective lookup.
      file.addProperty("Release Date", (String) SResult.get(res, "relase_date"));
      file.addProperty("Start/End Date", (String) SResult.get(res, "startDate") + " - " + (String) SResult.get(res, "endDate"));
      file.addProperty("Processing Level", (String) SResult.get(res, "processingLevel"));
      file.addProperty("Sensor", (String) SResult.get(res, "sensors"));
      // All properties are set before the object is collected; the original
      // mutated the JsonObject after inserting it, which worked only by
      // reference aliasing and read as a bug.
      fileList.add(file);
    }

    JsonElement fileListElement = gson.toJsonTree(fileList);
    JsonObject pDResults = new JsonObject();
    pDResults.add("PDResults", fileListElement);
    return pDResults.toString();
  }
}
+ */ +/** + * This package includes classes for semantic search, such as click stream importer, + * query dispatcher, semantic searcher, and ranker (ranksvm, ordinal/linear regression) + */ +package gov.nasa.jpl.mudrod.ssearch; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/DataGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/DataGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/DataGenerator.java new file mode 100644 index 0000000..598ad61 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/DataGenerator.java @@ -0,0 +1,313 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.ssearch.ranking; + +import au.com.bytecode.opencsv.CSVReader; +import au.com.bytecode.opencsv.CSVWriter; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * SVMData is a program designed to create appropriate input data for the RankSVM + * algorithm that involves Pairwise Classification. Specifically, instead of working in + * the space of query-document vectors, e.g. x1, x2, x3, we transform them into a new space + * in which a pair of documents is represented as the difference between their feature vectors. 
package gov.nasa.jpl.mudrod.ssearch.ranking;

import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.CSVWriter;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * DataGenerator creates input data for the RankSVM algorithm, which performs
 * pairwise classification: instead of working in the space of query-document
 * vectors x1, x2, x3, it transforms them into a space in which each pair of
 * documents is represented by the difference of their feature vectors.
 */
public class DataGenerator {
  // NOTE(review): configuration lives in static fields but is assigned from the
  // constructor, so creating two instances overwrites each other's settings —
  // confirm single-instance usage before sharing this class.
  private static String mySourceDir;
  private static String myResultDir;
  private static boolean isMultFiles;

  private static String[] myHeader;
  private static List<List<String>> myMasterList = new ArrayList<>();

  /** Maps evaluation labels to their ordinal relevance (higher = better). */
  public static final HashMap<String, Integer> map1 = new HashMap<>();

  static {
    map1.put("Excellent", 7);
    map1.put("Very good", 6);
    map1.put("Good", 5);
    map1.put("OK", 4);
    map1.put("Bad", 3);
    map1.put("Very bad", 2);
    map1.put("Terrible", 1);
  }

  /**
   * Constructor taking the path containing one or multiple files to process.
   *
   * @param sourceDir directory containing the file(s) to be processed
   * @param resultDir output file path
   * @param multFiles true to process every file in the directory, false to
   *                  process {@code sourceDir} as a single file
   */
  public DataGenerator(String sourceDir, String resultDir, boolean multFiles) {
    mySourceDir = sourceDir;
    myResultDir = resultDir;
    isMultFiles = multFiles;
  }

  /**
   * Processes the input data file(s) and stores the result in the configured
   * output location.
   */
  public void process() {
    parseFile();
    writeCSVfile(myMasterList);
  }

  /**
   * Parses the user-specified CSV file(s), accumulating pairwise difference
   * vectors into the master list and capturing the header row.
   */
  public static void parseFile() {
    try {
      if (isMultFiles) {
        // Case where multiple files have to be processed.
        File[] directoryListing = new File(mySourceDir).listFiles();
        if (directoryListing != null) {
          String[][] dataArr = null;
          for (File child : directoryListing) {
            dataArr = readCsv(child);
            calculateVec(dataArr);
          }
          if (dataArr != null) {
            // Header is taken from the last file processed; the original code
            // would have thrown an NPE here for an empty directory.
            storeHead(dataArr);
          }
        }
      } else {
        // Process only one file.
        String[][] dataArr = readCsv(new File(mySourceDir));
        storeHead(dataArr);
        calculateVec(dataArr);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Reads an entire CSV file into a 2D string array, closing the reader even
   * when parsing fails (the original leaked the reader on exception).
   */
  private static String[][] readCsv(File file) throws IOException {
    try (CSVReader csvReader = new CSVReader(new FileReader(file))) {
      List<String[]> rows = csvReader.readAll();
      return rows.toArray(new String[rows.size()][]);
    }
  }

  /**
   * Performs the difference-vector calculation on each combination of rows,
   * labelling each pair with 1 or -1 according to the evaluation comparison.
   *
   * @param arr the parsed contents of the original CSV file (row 0 is the header)
   */
  public static void calculateVec(String[][] arr) {
    List<List<String>> pairRows = new ArrayList<>();
    int labelCol = arr[0].length - 1; // last column holds the evaluation label

    for (int row = 1; row < arr.length; row++) { // row 0 is the heading
      for (int i = 1; i < arr.length - row; i++) {
        List<String> diffRow = new ArrayList<>();
        for (int col = 0; col < labelCol; col++) {
          double x1 = Double.parseDouble(arr[row][col]);
          double x2 = Double.parseDouble(arr[row + i][col]);
          diffRow.add(Double.toString(x1 - x2));
        }

        // Append 1 or -1 from the evaluation comparison, or drop the pair
        // entirely when the evaluations are equal.
        int addEvalNum = compareEvaluation(arr[row][labelCol], arr[row + i][labelCol]);
        if (addEvalNum == 1) {
          diffRow.add("1");
          pairRows.add(diffRow);
        } else if (addEvalNum == -1) {
          diffRow.add("-1");
          pairRows.add(diffRow);
        }
      }
    }

    // Rebalance so the master list holds a (near) equal number of 1s and -1s.
    myMasterList.addAll(equalizeList(pairRows));
  }

  /**
   * Compares two evaluation labels.
   *
   * @param eval1 evaluation from the first vector
   * @param eval2 evaluation from the second vector
   * @return 1 if the first is more relevant, -1 if less, and 10 when equal
   *         (meaning the pair must not be used)
   */
  public static int compareEvaluation(String eval1, String eval2) {
    int evalNum1 = map1.get(eval1);
    int evalNum2 = map1.get(eval2);

    if (evalNum1 > evalNum2) {
      return 1;
    } else if (evalNum1 < evalNum2) {
      return -1;
    } else {
      return 10; // equal — signifies the row cannot be used
    }
  }

  /**
   * Rebalances the labelled rows so that the counts of "1" and "-1" labels are
   * equal or within one of each other, by negating surplus rows (which flips
   * their label).
   *
   * @param rawList originally calculated data from the input CSV file
   * @return the same list with an equalized label distribution
   */
  public static List<List<String>> equalizeList(List<List<String>> rawList) {
    List<Integer> posRows = new ArrayList<>();
    List<Integer> negRows = new ArrayList<>();
    int labelCol = rawList.isEmpty() ? 0 : rawList.get(0).size() - 1;

    for (int i = 0; i < rawList.size(); i++) {
      int evalNum = Integer.parseInt(rawList.get(i).get(labelCol));
      if (evalNum == 1) {
        posRows.add(i);
      } else if (evalNum == -1) {
        negRows.add(i);
      }
    }

    int totPosCount = posRows.size();
    int totNegCount = negRows.size();

    if (totPosCount - totNegCount >= 1) { // more 1s than -1s
      int idx = 0;
      while (totPosCount - totNegCount >= 1) {
        negateRow(rawList.get(posRows.get(idx)));
        totPosCount--;
        totNegCount++;
        idx++;
      }
    } else if (totNegCount - totPosCount > 1) { // more -1s than 1s
      // NOTE: the asymmetric bounds (>= 1 vs > 1) are preserved from the
      // original; the result may differ by one row depending on direction.
      int idx = 0;
      while (totNegCount - totPosCount > 1) {
        negateRow(rawList.get(negRows.get(idx)));
        totNegCount--;
        totPosCount++;
        idx++;
      }
    }

    return rawList;
  }

  /**
   * Negates every value in a row, including the trailing label — which flips
   * the pair's class from 1 to -1 or vice versa.
   */
  private static void negateRow(List<String> row) {
    for (int col = 0; col < row.size(); col++) {
      double d = -Double.parseDouble(row.get(col));
      row.set(col, Double.toString(d));
    }
  }

  /**
   * Captures the header row of a parsed file so it can be written to the
   * output file later.
   *
   * @param arr 2D array containing the parsed input file
   */
  public static void storeHead(String[][] arr) {
    myHeader = arr[0].clone();
  }

  /**
   * Writes the equalized vector data to the configured CSV output file.
   *
   * @param list finalized vector data to write
   */
  public static void writeCSVfile(List<List<String>> list) {
    String outputFile = myResultDir;
    boolean alreadyExists = new File(outputFile).exists();

    // NOTE(review): FileWriter truncates an existing file, yet header AND rows
    // are only written when the file did NOT already exist — so an existing
    // file ends up empty. Behavior preserved as-is; confirm whether append
    // mode was intended before changing it.
    try (CSVWriter csvOutput = new CSVWriter(new FileWriter(outputFile), ',')) {
      if (!alreadyExists) {
        csvOutput.writeNext(myHeader); // headers first, then data
        for (List<String> row : list) {
          csvOutput.writeNext(row.toArray(new String[row.size()]));
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

}
/**
 * Supports the ability to evaluate ranking results (precision at K, DCG, NDCG).
 */
public class Evaluator {

  /**
   * Calculates the NDCG score of a ranked list.
   *
   * @param list relevance grade at each rank position
   * @param K number of leading elements included in the calculation
   * @return NDCG score in [0, 1]; 0 when the ideal DCG is 0
   */
  public double getNDCG(int[] list, int K) {
    double dcg = getDCG(list, K);
    double idcg = getIDCG(list, K);
    return idcg > 0.0 ? dcg / idcg : 0.0;
  }

  /**
   * Precision at K: the fraction of the first K positions that are relevant.
   *
   * @param list relevance grade at each rank position
   * @param K cut-off rank (clamped to the list length)
   * @return precision at K, or 0 for an empty list or K == 0
   */
  public double getPrecision(int[] list, int K) {
    if (list.length == 0 || K == 0) {
      return 0;
    }
    int k = Math.min(K, list.length); // clamp instead of reassigning the parameter
    return (double) getRelevantDocNum(list, k) / (double) k;
  }

  /**
   * Counts the relevant elements among the first K positions. A grade strictly
   * greater than 3 ("OK") counts as relevant.
   *
   * @param list relevance grade at each rank position
   * @param K number of leading elements considered (clamped to the list length)
   * @return number of relevant elements
   */
  private int getRelevantDocNum(int[] list, int K) {
    if (list.length == 0 || K == 0) {
      return 0;
    }
    int k = Math.min(K, list.length);
    int relNum = 0;
    for (int i = 0; i < k; i++) {
      if (list[i] > 3) { // 3 refers to "OK"
        relNum++;
      }
    }
    return relNum;
  }

  /**
   * DCG of the first K positions: rel_1 + sum over i >= 2 of rel_i / log2(i).
   *
   * @param list relevance grade at each rank position
   * @param K number of leading elements included (clamped to the list length)
   * @return DCG score
   */
  private double getDCG(int[] list, int K) {
    if (list.length == 0 || K == 0) {
      return 0.0;
    }
    int k = Math.min(K, list.length);
    double dcg = list[0]; // position 1 is undiscounted
    for (int i = 1; i < k; i++) {
      int pos = i + 1;
      double relLog = Math.log(pos) / Math.log(2);
      dcg += list[i] / relLog;
    }
    return dcg;
  }

  /**
   * Ideal DCG: the DCG of the same grades sorted in descending order.
   *
   * @param list relevance grade at each rank position
   * @param K number of leading elements included in the calculation
   * @return IDCG score
   */
  private double getIDCG(int[] list, int K) {
    // Sort a copy descending via a stream instead of the original
    // boxed-list/Collections.sort round-trip (which also had a stray ';').
    int[] sortedDesc = IntStream.of(list)
        .boxed()
        .sorted(Comparator.reverseOrder())
        .mapToInt(Integer::intValue)
        .toArray();
    return getDCG(sortedDesc, K);
  }

}
---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/Learner.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/Learner.java new file mode 100644 index 0000000..d1c5199 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/Learner.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.ssearch.ranking; + +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.classification.SVMModel; +import org.apache.spark.mllib.regression.LabeledPoint; + +import java.io.Serializable; + +/** + * Supports the ability to importing classifier into memory + */ +public class Learner implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + private static final String SPARKSVM = "SparkSVM"; + SVMModel model = null; + transient SparkContext sc = null; + + /** + * Constructor to load in spark SVM classifier + * + * @param classifierName classifier type + * @param skd an instance of spark driver + * @param svmSgdModel path to a trained model + */ + public Learner(String classifierName, SparkDriver skd, String svmSgdModel) { + if (classifierName.equals(SPARKSVM)) { + sc = skd.sc.sc(); + sc.addFile(svmSgdModel, true); + model = SVMModel.load(sc, svmSgdModel); + } + } + + /** + * Method of classifying instance + * + * @param 
p the instance that needs to be classified + * @return the class id + */ + public double classify(LabeledPoint p) { + return model.predict(p.features()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/SparkFormatter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/SparkFormatter.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/SparkFormatter.java new file mode 100644 index 0000000..ba46d41 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/SparkFormatter.java @@ -0,0 +1,55 @@ +package gov.nasa.jpl.mudrod.ssearch.ranking; + +import java.io.*; +import java.text.DecimalFormat; + +public class SparkFormatter { + DecimalFormat NDForm = new DecimalFormat("#.###"); + + public SparkFormatter() { + } + + public void toSparkSVMformat(String inputCSVFileName, String outputTXTFileName) { + File file = new File(outputTXTFileName); + if (file.exists()) { + file.delete(); + } + try { + file.createNewFile(); + FileWriter fw = new FileWriter(outputTXTFileName); + BufferedWriter bw = new BufferedWriter(fw); + + BufferedReader br = new BufferedReader(new FileReader(inputCSVFileName)); + br.readLine(); + String line = br.readLine(); + while (line != null) { + String[] list = line.split(","); + String output = ""; + Double label = Double.parseDouble(list[list.length - 1].replace("\"", "")); + if (label == -1.0) { + output = "0 "; + } else if (label == 1.0) { + output = "1 "; + } + + for (int i = 0; i < list.length - 1; i++) { + int index = i + 1; + output += index + ":" + NDForm.format(Double.parseDouble(list[i].replace("\"", ""))) + " "; + } + bw.write(output + "\n"); + + line = br.readLine(); + } + br.close(); + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + SparkFormatter sf = new 
/**
 * Converts a labelled CSV training file into the LibSVM text format that
 * Spark's {@code MLUtils.loadLibSVMFile} expects
 * ({@code label index1:value1 index2:value2 ...}).
 */
public class SparkFormatter {
  // Kept package-visible with its original name for compatibility;
  // trims feature values to at most three decimal places.
  DecimalFormat NDForm = new DecimalFormat("#.###");

  public SparkFormatter() {
  }

  /**
   * Converts a CSV file whose last column is a -1.0/1.0 label into Spark
   * SVM input format, mapping -1.0 to class 0 and 1.0 to class 1.
   *
   * @param inputCSVFileName  path to the input CSV (first row is a header)
   * @param outputTXTFileName path to the output text file; overwritten if
   *                          it already exists
   */
  public void toSparkSVMformat(String inputCSVFileName, String outputTXTFileName) {
    // try-with-resources guarantees both streams are closed even when a
    // read/write fails part-way through (the original leaked them).
    try (BufferedReader br = new BufferedReader(new FileReader(inputCSVFileName));
        BufferedWriter bw = new BufferedWriter(new FileWriter(outputTXTFileName))) {
      br.readLine(); // skip the CSV header row
      for (String line = br.readLine(); line != null; line = br.readLine()) {
        String[] list = line.split(",");
        StringBuilder output = new StringBuilder();
        // Values may be quoted in the CSV; strip the quotes before parsing.
        double label = Double.parseDouble(list[list.length - 1].replace("\"", ""));
        if (label == -1.0) {
          output.append("0 ");
        } else if (label == 1.0) {
          output.append("1 ");
        }
        // Feature indices in LibSVM format are 1-based.
        for (int i = 0; i < list.length - 1; i++) {
          int index = i + 1;
          output.append(index).append(':')
              .append(NDForm.format(Double.parseDouble(list[i].replace("\"", ""))))
              .append(' ');
        }
        bw.write(output.toString() + "\n");
      }
    } catch (IOException e) {
      // Best-effort conversion tool; keep the original report-and-continue
      // behaviour rather than propagating.
      e.printStackTrace();
    }
  }

  public static void main(String[] args) {
    SparkFormatter sf = new SparkFormatter();
    sf.toSparkSVMformat("C:/mudrodCoreTestData/rankingResults/inputDataForSVM.csv", "C:/mudrodCoreTestData/rankingResults/inputDataForSVM_spark.txt");
  }
}
+ */ +package gov.nasa.jpl.mudrod.ssearch.ranking; + +import gov.nasa.jpl.mudrod.main.MudrodEngine; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.classification.SVMModel; +import org.apache.spark.mllib.classification.SVMWithSGD; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; + +public class SparkSVM { + + private SparkSVM() { + //public constructor + } + + public static void main(String[] args) { + MudrodEngine me = new MudrodEngine(); + + JavaSparkContext jsc = me.startSparkDriver().sc; + + String path = SparkSVM.class.getClassLoader().getResource("inputDataForSVM_spark.txt").toString(); + JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), path).toJavaRDD(); + + // Run training algorithm to build the model. + int numIterations = 100; + final SVMModel model = SVMWithSGD.train(data.rdd(), numIterations); + + // Save and load model + model.save(jsc.sc(), SparkSVM.class.getClassLoader().getResource("javaSVMWithSGDModel").toString()); + + jsc.sc().stop(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/TrainingImporter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/TrainingImporter.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/TrainingImporter.java new file mode 100644 index 0000000..ae48b55 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/TrainingImporter.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gov.nasa.jpl.mudrod.ssearch.ranking; + +import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract; +import gov.nasa.jpl.mudrod.driver.ESDriver; +import gov.nasa.jpl.mudrod.driver.SparkDriver; +import gov.nasa.jpl.mudrod.main.MudrodConstants; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports the ability to importing training set into Elasticsearch + */ +public class TrainingImporter extends MudrodAbstract { + /** + * + */ + private static final long serialVersionUID = 1L; + + public TrainingImporter(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + es.deleteAllByQuery(props.getProperty(MudrodConstants.ES_INDEX_NAME), "trainingranking", QueryBuilders.matchAllQuery()); + addMapping(); + } + + /** + * Method of adding mapping to traning set type + */ + public void addMapping() { + XContentBuilder Mapping; + try { + Mapping = jsonBuilder().startObject().startObject("trainingranking").startObject("properties").startObject("query").field("type", "string").field("index", "not_analyzed").endObject() + .startObject("dataID").field("type", "string").field("index", "not_analyzed").endObject().startObject("label").field("type", "string").field("index", 
"not_analyzed").endObject().endObject() + .endObject().endObject(); + + es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType("trainingranking").setSource(Mapping).execute().actionGet(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Method of importing training set in to Elasticsearch + * + * @param dataFolder the path to the traing set + * @throws IOException IOException + */ + public void importTrainingSet(String dataFolder) throws IOException { + es.createBulkProcessor(); + + File[] files = new File(dataFolder).listFiles(); + for (File file : files) { + BufferedReader br = new BufferedReader(new FileReader(file.getAbsolutePath())); + br.readLine(); + String line = br.readLine(); + while (line != null) { + String[] list = line.split(","); + String query = file.getName().replace(".csv", ""); + if (list.length > 0) { + IndexRequest ir = new IndexRequest(props.getProperty("indexName"), "trainingranking") + .source(jsonBuilder().startObject().field("query", query).field("dataID", list[0]).field("label", list[list.length - 1]).endObject()); + es.getBulkProcessor().add(ir); + } + line = br.readLine(); + } + br.close(); + } + es.destroyBulkProcessor(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/package-info.java new file mode 100644 index 0000000..205e7a7 --- /dev/null +++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/ranking/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes classes for importing training data, ML models, + * generating input data for RankSVM, and evaluating ranking results + */ +package gov.nasa.jpl.mudrod.ssearch.ranking; \ No newline at end of file
