/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.semantics;

import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.utils.LinkageTriple;
import org.apache.sdap.mudrod.utils.MatrixUtil;
import org.apache.sdap.mudrod.utils.SimilarityUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;

/**
 * Semantic analyzer: computes term-to-term similarity from a CSV term/feature
 * matrix and persists the resulting linkage triples into Elasticsearch.
 */
public class SemanticAnalyzer extends MudrodAbstract {

  private static final long serialVersionUID = 1L;

  /**
   * Creates a new instance of SemanticAnalyzer.
   *
   * @param props the Mudrod configuration
   * @param es    the Elasticsearch driver
   * @param spark the Spark driver
   */
  public SemanticAnalyzer(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Calculates term similarity from a CSV matrix, skipping one header row.
   *
   * @param csvFileName CSV file of the matrix; each row is a term and each
   *                    column is a dimension in feature space
   * @return linkage triple list, or {@code null} if the file does not exist
   */
  public List<LinkageTriple> calTermSimfromMatrix(String csvFileName) {
    File f = new File(csvFileName);
    if (!f.exists()) {
      return null;
    }
    return this.calTermSimfromMatrix(csvFileName, 1);
  }

  /**
   * Calculates term similarity from a CSV matrix.
   *
   * @param csvFileName CSV file of the matrix; each row is a term and each
   *                    column is a dimension in feature space
   * @param skipRow     number of rows to skip in the input CSV file, e.g. a header
   * @return linkage triple list, or {@code null} if the matrix could not be
   *         loaded or has zero-width vectors
   */
  public List<LinkageTriple> calTermSimfromMatrix(String csvFileName, int skipRow) {
    JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, skipRow);
    if (importRDD == null || importRDD.values().first().size() == 0) {
      return null;
    }

    CoordinateMatrix simMatrix = SimilarityUtil.calculateSimilarityFromVector(importRDD.values());
    JavaRDD<String> rowKeyRDD = importRDD.keys();
    return SimilarityUtil.matrixToTriples(rowKeyRDD, simMatrix);
  }

  /**
   * Calculates term similarity from a CSV matrix using a selectable metric.
   *
   * @param csvFileName CSV file of the matrix; each row is a term and each
   *                    column is a dimension in feature space
   * @param simType     the similarity metric to use, e.g.
   *                    <ul>
   *                    <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_COSINE} - 3,</li>
   *                    <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_HELLINGER} - 2,</li>
   *                    <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_PEARSON} - 1</li>
   *                    </ul>
   * @param skipRow     number of rows to skip in the input CSV file, e.g. a header
   * @return linkage triple list, or {@code null} if the matrix could not be
   *         loaded or has zero-width vectors
   */
  public List<LinkageTriple> calTermSimfromMatrix(String csvFileName, int simType, int skipRow) {
    JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, skipRow);
    // Fix: mirror the null guard used by the two-argument overload; the
    // original dereferenced importRDD without checking for a failed load.
    if (importRDD == null || importRDD.values().first().size() == 0) {
      return null;
    }

    JavaRDD<LinkageTriple> triples = SimilarityUtil.calculateSimilarityFromVector(importRDD, simType);
    return triples.collect();
  }

  /**
   * Saves linkage triples to Elasticsearch.
   *
   * @param tripleList linkage triple list
   * @param index      index name
   * @param type       type name
   */
  public void saveToES(List<LinkageTriple> tripleList, String index, String type) {
    try {
      LinkageTriple.insertTriples(es, tripleList, index, type);
    } catch (IOException e) {
      // Best-effort persistence; kept consistent with the rest of the file.
      e.printStackTrace();
    }
  }

  /**
   * Saves linkage triples to Elasticsearch with explicit triple/symmetry flags.
   *
   * @param tripleList linkage triple list
   * @param index      index name
   * @param type       type name
   * @param bTriple    whether to store as triples
   * @param bSymmetry  whether the relation is symmetric
   */
  public void saveToES(List<LinkageTriple> tripleList, String index, String type, boolean bTriple, boolean bSymmetry) {
    try {
      LinkageTriple.insertTriples(es, tripleList, index, type, bTriple, bSymmetry);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * This package includes SVD transformation function, methods of calculating
 * similarity from CSV, and saving triples into Elasticsearch.
 */
package org.apache.sdap.mudrod.semantics;
/*
 * ==== core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java ====
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ssearch;

import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.main.MudrodConstants;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Supports the ability to import click stream data into Elasticsearch
 * from a CSV file.
 */
public class ClickstreamImporter extends MudrodAbstract {

  private static final long serialVersionUID = 1L;

  public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
    addClickStreamMapping();
  }

  /**
   * Adds the Elasticsearch mapping for click stream data: both "query" and
   * "dataID" are stored as non-analyzed strings so they can be matched exactly.
   */
  public void addClickStreamMapping() {
    try {
      // Renamed from "Mapping" to follow lowerCamelCase local naming.
      XContentBuilder mapping = jsonBuilder().startObject()
          .startObject(props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE))
          .startObject("properties")
          .startObject("query").field("type", "string").field("index", "not_analyzed").endObject()
          .startObject("dataID").field("type", "string").field("index", "not_analyzed").endObject()
          .endObject().endObject().endObject();

      es.getClient().admin().indices()
          .preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME))
          .setType(props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE))
          .setSource(mapping)
          .execute().actionGet();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Imports the click stream CSV (path from the "clickstreamMatrix" property)
   * into Elasticsearch. The first CSV row is a header of dataset IDs; each
   * subsequent row starts with a query string followed by per-dataset click
   * counts. Zero counts ("0.0") are not indexed.
   */
  public void importfromCSVtoES() {
    es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME),
        props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE));
    es.createBulkProcessor();

    String cvsSplitBy = ",";

    // try-with-resources replaces the manual close in a finally block, which
    // also nested destroyBulkProcessor() inside the close() try so that a
    // failing close silently skipped bulk-processor teardown.
    try (BufferedReader br = new BufferedReader(new FileReader(props.getProperty("clickstreamMatrix")))) {
      String line = br.readLine();
      // Header row: column i holds the dataset ID for clicks[i]; the first
      // cell is skipped because it labels the query column.
      String[] dataList = line.split(cvsSplitBy);
      while ((line = br.readLine()) != null) {
        String[] clicks = line.split(cvsSplitBy);
        for (int i = 1; i < clicks.length; i++) {
          if (!"0.0".equals(clicks[i])) {
            IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME),
                props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE))
                .source(jsonBuilder().startObject()
                    .field("query", clicks[0])
                    .field("dataID", dataList[i])
                    .field("clicks", clicks[i])
                    .endObject());
            es.getBulkProcessor().add(ir);
          }
        }
      }
    } catch (IOException e) {
      // FileNotFoundException is a subtype of IOException; a single catch
      // replaces the two redundant handlers in the original.
      e.printStackTrace();
    } finally {
      es.destroyBulkProcessor();
    }
  }

}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ssearch;

import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.integration.LinkageIntegration;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

/**
 * Supports the ability to transform a regular user query into a semantic
 * query by expanding it with semantically related terms.
 */
public class Dispatcher extends MudrodAbstract {
  private static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);

  public Dispatcher(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Gets the semantically most related terms, limited by count.
   *
   * @param input regular input query
   * @param num   the maximum number of most related terms to return
   * @return a map from term to similarity
   */
  public Map<String, Double> getRelatedTerms(String input, int num) {
    LinkageIntegration li = new LinkageIntegration(props, this.es, null);
    Map<String, Double> sortedMap = li.appyMajorRule(input);
    Map<String, Double> selectedMap = new HashMap<>();
    int count = 0;
    for (Entry<String, Double> entry : sortedMap.entrySet()) {
      // Fix: stop iterating once the requested number of terms is collected;
      // the original kept looping over the remaining entries doing nothing.
      if (count >= num) {
        break;
      }
      selectedMap.put(entry.getKey(), entry.getValue());
      count++;
    }
    return selectedMap;
  }

  /**
   * Gets the semantically most related terms, limited by a similarity threshold.
   *
   * @param input regular input query
   * @param T     threshold value, ranging from 0 to 1
   * @return a map from term to similarity
   */
  public Map<String, Double> getRelatedTermsByT(String input, double T) {
    LinkageIntegration li = new LinkageIntegration(this.props, this.es, null);
    Map<String, Double> sortedMap = li.appyMajorRule(input);
    Map<String, Double> selectedMap = new HashMap<>();

    for (Entry<String, Double> entry : sortedMap.entrySet()) {
      if (entry.getValue() >= T) {
        selectedMap.put(entry.getKey(), entry.getValue());
      }
    }
    return selectedMap;
  }

  /**
   * Creates a semantic query by expanding the input with related terms above
   * a similarity threshold; each related term is boosted by its similarity.
   *
   * @param input          regular query
   * @param T              threshold, ranging from 0 to 1
   * @param query_operator query mode: "phrase", "and", or anything else for "or"
   * @return a boolean query combining one multi-match clause per term
   */
  public BoolQueryBuilder createSemQuery(String input, double T, String query_operator) {
    Map<String, Double> selectedMap = getRelatedTermsByT(input, T);
    // The original query itself always participates with full weight.
    selectedMap.put(input, (double) 1);

    String[] fieldsList = { "Dataset-Metadata", "Dataset-ShortName", "Dataset-LongName",
        "DatasetParameter-Topic", "DatasetParameter-VariableDetail", "DatasetParameter-Category",
        "DatasetParameter-Variable", "DatasetParameter-Term",
        "DatasetSource-Source-LongName", "DatasetSource-Source-LongName-Full",
        "DatasetSource-Source-ShortName", "DatasetSource-Source-ShortName-Full",
        "DatasetSource-Sensor-LongName", "DatasetSource-Sensor-LongName-Full", "DatasetSource-Sensor-ShortName",
        "DatasetSource-Sensor-ShortName-Full" };

    // Normalize the operator once instead of per map entry.
    String op = query_operator.toLowerCase().trim();

    BoolQueryBuilder qb = new BoolQueryBuilder();
    for (Entry<String, Double> entry : selectedMap.entrySet()) {
      if ("phrase".equals(op)) {
        // tieBreaker 0.5: a value of 1.0 would make this equivalent to a
        // "most fields" query.
        qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), fieldsList)
            .boost(entry.getValue().floatValue())
            .type(MultiMatchQueryBuilder.Type.PHRASE)
            .tieBreaker((float) 0.5));
      } else if ("and".equals(op)) {
        qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), fieldsList)
            .boost(entry.getValue().floatValue())
            .operator(MatchQueryBuilder.DEFAULT_OPERATOR.AND)
            .tieBreaker((float) 0.5));
      } else {
        qb.should(QueryBuilders.multiMatchQuery(entry.getKey(), fieldsList)
            .boost(entry.getValue().floatValue())
            .operator(MatchQueryBuilder.DEFAULT_OPERATOR.OR)
            .tieBreaker((float) 0.5));
      }
    }

    return qb;
  }

}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ssearch;

import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.main.MudrodConstants;
import org.apache.sdap.mudrod.ssearch.ranking.Learner;
import org.apache.sdap.mudrod.ssearch.structure.SResult;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.*;

/**
 * Supports the ability to calculate ranking scores: search-result attributes
 * are normalized to Z-scores, and a trained model decides pairwise ordering.
 */
public class Ranker extends MudrodAbstract implements Serializable {

  private static final long serialVersionUID = 1L;
  transient List<SResult> resultList = new ArrayList<>();

  String learnerType = null;
  Learner le = null;

  public Ranker(Properties props, ESDriver es, SparkDriver spark, String learnerType) {
    super(props, es, spark);
    this.learnerType = learnerType;
    le = new Learner(learnerType, spark, props.getProperty(MudrodConstants.SVM_SGD_MODEL));
  }

  /**
   * Compares results based on the final pairwise "below" score (descending).
   */
  public class ResultComparator implements Comparator<SResult> {
    @Override
    public int compare(SResult o1, SResult o2) {
      return o2.below.compareTo(o1.below);
    }
  }

  /**
   * Calculates the mean of an attribute across the result list.
   *
   * @param attribute  the attribute name to average
   * @param resultList the result list
   * @return mean value, rounded to three decimals
   */
  private double getMean(String attribute, List<SResult> resultList) {
    double sum = 0.0;
    for (SResult a : resultList) {
      sum += (double) SResult.get(a, attribute);
    }
    return getNDForm(sum / resultList.size());
  }

  /**
   * Calculates the (population) variance of an attribute across the result list.
   *
   * @param attribute  the attribute name
   * @param resultList the result list
   * @return variance value, rounded to three decimals
   */
  private double getVariance(String attribute, List<SResult> resultList) {
    double mean = getMean(attribute, resultList);
    double temp = 0.0;
    double val;
    for (SResult a : resultList) {
      val = (Double) SResult.get(a, attribute);
      temp += (mean - val) * (mean - val);
    }

    return getNDForm(temp / resultList.size());
  }

  /**
   * Calculates the standard deviation of an attribute across the result list.
   *
   * @param attribute  the attribute name
   * @param resultList the result list
   * @return standard deviation, rounded to three decimals
   */
  private double getStdDev(String attribute, List<SResult> resultList) {
    return getNDForm(Math.sqrt(getVariance(attribute, resultList)));
  }

  /**
   * Calculates the Z-score of a value; returns 0 when the standard deviation
   * is (effectively) zero to avoid division by zero.
   *
   * @param val  the attribute value
   * @param mean the attribute mean
   * @param std  the attribute standard deviation
   * @return Z-score
   */
  private double getZscore(double val, double mean, double std) {
    if (!equalComp(std, 0)) {
      return getNDForm((val - mean) / std);
    } else {
      return 0;
    }
  }

  // Epsilon comparison for doubles; 1e-4 matches the three-decimal rounding.
  private boolean equalComp(double a, double b) {
    return Math.abs(a - b) < 0.0001;
  }

  /**
   * Rounds a double to at most three decimal places.
   *
   * @param d the value to round
   * @return rounded value
   */
  private double getNDForm(double d) {
    DecimalFormat ndForm = new DecimalFormat("#.###");
    return Double.valueOf(ndForm.format(d));
  }

  /**
   * Ranks a list of results: normalizes each ranking attribute to a Z-score,
   * accumulates pairwise "below" votes via the learned model, then sorts.
   *
   * @param resultList result list
   * @return ranked result list (same list instance, sorted in place)
   */
  public List<SResult> rank(List<SResult> resultList) {
    // Guard: avoid division by zero in getMean for an empty list.
    if (resultList.isEmpty()) {
      return resultList;
    }

    // Fix: mean/std are loop-invariant per attribute, but the original
    // recomputed them for every result (O(n^2 * m)); hoisting them makes
    // this O(n * m) with identical scores.
    for (int m = 0; m < SResult.rlist.length; m++) {
      String att = SResult.rlist[m].split("_")[0];
      String scoreId = SResult.rlist[m];
      double mean = getMean(att, resultList);
      double std = getStdDev(att, resultList);
      for (int i = 0; i < resultList.size(); i++) {
        double val = SResult.get(resultList.get(i), att);
        SResult.set(resultList.get(i), scoreId, getZscore(val, mean, std));
      }
    }

    // Using Collections.sort with comp directly would cause a "not
    // transitive" error because the trained model is not an overfitting
    // model; instead each result accumulates pairwise votes.
    for (int j = 0; j < resultList.size(); j++) {
      for (int k = 0; k < resultList.size(); k++) {
        if (k != j) {
          resultList.get(j).below += comp(resultList.get(j), resultList.get(k));
        }
      }
    }

    Collections.sort(resultList, new ResultComparator());
    return resultList;
  }

  /**
   * Compares two search results using the learned pairwise model.
   *
   * @param o1 search result 1
   * @param o2 search result 2
   * @return 1 if o1 ranks above o2, 0 otherwise
   */
  public int comp(SResult o1, SResult o2) {
    List<Double> instList = new ArrayList<>();
    for (int i = 0; i < SResult.rlist.length; i++) {
      double o2Score = SResult.get(o2, SResult.rlist[i]);
      double o1Score = SResult.get(o1, SResult.rlist[i]);
      instList.add(o2Score - o1Score);
    }

    double[] ins = instList.stream().mapToDouble(i -> i).toArray();
    // The label 99.0 is a placeholder; only the prediction matters here.
    LabeledPoint insPoint = new LabeledPoint(99.0, Vectors.dense(ins));
    double prediction = le.classify(insPoint);
    if (equalComp(prediction, 1)) { // different from weka, where the return value is 1 or 2
      return 0;
    } else {
      return 1;
    }
  }

}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ssearch;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.ssearch.structure.SResult;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;

import java.io.Serializable;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Supports the ability to perform a semantic search with a given query.
 */
public class Searcher extends MudrodAbstract implements Serializable {

  private static final long serialVersionUID = 1L;
  DecimalFormat NDForm = new DecimalFormat("#.##");
  // Maximum number of characters of a dataset description shown in results.
  final Integer MAX_CHAR = 700;

  public Searcher(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Converts a processing-level string into a number: "3A" -> 3.0, any other
   * string containing a letter -> 1.0, purely numeric -> its value, null -> 1.0.
   *
   * @param pro processing-level string
   * @return processing-level number
   */
  public Double getProLevelNum(String pro) {
    if (pro == null) {
      return 1.0;
    }
    Double proNum;
    Pattern p = Pattern.compile(".*[a-zA-Z].*");
    if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) {
      proNum = Double.parseDouble(pro.substring(0, 1));
    } else if (p.matcher(pro).find()) {
      proNum = 1.0;
    } else {
      proNum = Double.parseDouble(pro);
    }

    return proNum;
  }

  /**
   * Caps a popularity value at 1000.
   *
   * @param pop raw popularity
   * @return capped popularity
   */
  public Double getPop(Double pop) {
    if (pop > 1000) {
      pop = 1000.0;
    }
    return pop;
  }

  /**
   * Checks whether the query appears in a list-valued attribute.
   *
   * @param strList attribute value as an ArrayList
   * @param query   query string (expected lower-case)
   * @return 1.0 if the query appears, 0.0 otherwise
   */
  public Double exists(ArrayList<String> strList, String query) {
    Double val = 0.0;
    if (strList != null) {
      String str = String.join(", ", strList);
      if (str != null && str.length() != 0 && str.toLowerCase().trim().contains(query)) {
        val = 1.0;
      }
    }
    return val;
  }

  /**
   * Main entry point of semantic search.
   *
   * @param index         index name in Elasticsearch
   * @param type          type name in Elasticsearch
   * @param query         regular query string
   * @param queryOperator query mode: phrase, or, and
   * @param rankOption    selected sort/ranking option
   * @return a list of search results (empty if the index does not exist)
   */
  @SuppressWarnings("unchecked")
  public List<SResult> searchByQuery(String index, String type, String query, String queryOperator, String rankOption) {
    boolean exists = es.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
    if (!exists) {
      return new ArrayList<>();
    }

    SortOrder order;
    // Renamed from "sortFiled" (typo) — local only, no interface change.
    String sortField;
    switch (rankOption) {
    case "Rank-AllTimePopularity":
      sortField = "Dataset-AllTimePopularity";
      order = SortOrder.DESC;
      break;
    case "Rank-MonthlyPopularity":
      sortField = "Dataset-MonthlyPopularity";
      order = SortOrder.DESC;
      break;
    case "Rank-UserPopularity":
      sortField = "Dataset-UserPopularity";
      order = SortOrder.DESC;
      break;
    case "Rank-LongName-Full":
      sortField = "Dataset-LongName.raw";
      order = SortOrder.ASC;
      break;
    case "Rank-ShortName-Full":
      sortField = "Dataset-ShortName.raw";
      order = SortOrder.ASC;
      break;
    case "Rank-GridSpatialResolution":
      sortField = "Dataset-GridSpatialResolution";
      order = SortOrder.DESC;
      break;
    case "Rank-SatelliteSpatialResolution":
      sortField = "Dataset-SatelliteSpatialResolution";
      order = SortOrder.DESC;
      break;
    case "Rank-StartTimeLong-Long":
      sortField = "DatasetCoverage-StartTimeLong-Long";
      order = SortOrder.ASC;
      break;
    case "Rank-StopTimeLong-Long":
      sortField = "DatasetCoverage-StopTimeLong-Long";
      order = SortOrder.DESC;
      break;
    default:
      sortField = "Dataset-ShortName.raw";
      order = SortOrder.ASC;
      break;
    }

    Dispatcher dp = new Dispatcher(this.getConfig(), this.getES(), null);
    BoolQueryBuilder qb = dp.createSemQuery(query, 1.0, queryOperator);
    List<SResult> resultList = new ArrayList<>();

    SearchRequestBuilder builder = es.getClient().prepareSearch(index).setTypes(type).setQuery(qb).addSort(sortField, order).setSize(500).setTrackScores(true);
    SearchResponse response = builder.execute().actionGet();

    SimpleDateFormat df2 = new SimpleDateFormat("MM/dd/yyyy");

    for (SearchHit hit : response.getHits().getHits()) {
      Map<String, Object> result = hit.getSource();
      Double relevance = Double.valueOf(NDForm.format(hit.getScore()));
      String shortName = (String) result.get("Dataset-ShortName");
      String longName = (String) result.get("Dataset-LongName");

      ArrayList<String> topicList = (ArrayList<String>) result.get("DatasetParameter-Variable");
      String topic = "";
      if (null != topicList) {
        topic = String.join(", ", topicList);
      }

      String content = (String) result.get("Dataset-Description");
      if (content != null && !content.isEmpty()) {
        // Fix: the original computed the cut-off from the UNtrimmed string
        // but substringed the trimmed one, which can throw
        // StringIndexOutOfBoundsException, and the "maxLength - 1" dropped
        // a character even for short descriptions. Trim first, then clamp.
        String trimmed = content.trim();
        int maxLength = Math.min(trimmed.length(), MAX_CHAR);
        content = trimmed.substring(0, maxLength) + "...";
      }

      ArrayList<String> longdate = (ArrayList<String>) result.get("DatasetCitation-ReleaseDateLong");
      // NOTE(review): assumes the release-date list is non-empty — TODO confirm
      // against the metadata indexer.
      Date date = new Date(Long.parseLong(longdate.get(0)));
      String dateText = df2.format(date);

      // start date
      Long start = (Long) result.get("DatasetCoverage-StartTimeLong-Long");
      Date startDate = new Date(start);
      String startDateTxt = df2.format(startDate);

      // end date: an empty string means the dataset is still active.
      String end = (String) result.get("Dataset-DatasetCoverage-StopTimeLong");
      String endDateTxt = "";
      if ("".equals(end)) {
        endDateTxt = "Present";
      } else {
        Date endDate = new Date(Long.valueOf(end));
        endDateTxt = df2.format(endDate);
      }

      String processingLevel = (String) result.get("Dataset-ProcessingLevel");
      Double proNum = getProLevelNum(processingLevel);

      Double userPop = getPop(((Integer) result.get("Dataset-UserPopularity")).doubleValue());
      Double allPop = getPop(((Integer) result.get("Dataset-AllTimePopularity")).doubleValue());
      Double monthPop = getPop(((Integer) result.get("Dataset-MonthlyPopularity")).doubleValue());

      List<String> sensors = (List<String>) result.get("DatasetSource-Sensor-ShortName");
      // Guard added: the sensors list may be absent, matching topicList handling.
      String sensorTxt = (sensors == null) ? "" : String.join(", ", sensors);

      SResult re = new SResult(shortName, longName, topic, content, dateText);

      SResult.set(re, "term", relevance);
      SResult.set(re, "releaseDate", Long.valueOf(longdate.get(0)).doubleValue());
      SResult.set(re, "processingLevel", processingLevel);
      SResult.set(re, "processingL", proNum);
      SResult.set(re, "userPop", userPop);
      SResult.set(re, "allPop", allPop);
      SResult.set(re, "monthPop", monthPop);
      SResult.set(re, "startDate", startDateTxt);
      SResult.set(re, "endDate", endDateTxt);
      SResult.set(re, "sensors", sensorTxt);

      // Attach a human relevance label for this (query, dataset) pair when
      // one exists in the training-ranking type.
      QueryBuilder queryLabelSearch = QueryBuilders.boolQuery()
          .must(QueryBuilders.termQuery("query", query))
          .must(QueryBuilders.termQuery("dataID", shortName));
      SearchResponse labelRes = es.getClient().prepareSearch(index).setTypes("trainingranking").setQuery(queryLabelSearch).setSize(5).execute().actionGet();
      String labelString = null;
      for (SearchHit label : labelRes.getHits().getHits()) {
        Map<String, Object> labelItem = label.getSource();
        labelString = (String) labelItem.get("label");
      }
      SResult.set(re, "label", labelString);
      resultList.add(re);
    }

    return resultList;
  }

  /**
   * Performs a semantic search and renders the results as a JSON string.
   *
   * @param index         index name in Elasticsearch
   * @param type          type name in Elasticsearch
   * @param query         regular query string
   * @param queryOperator query mode: phrase, or, and
   * @param rankOption    selected ranking method
   * @param rr            ranker applied when rankOption is "Rank-SVM"
   * @return JSON string with a "PDResults" array
   */
  public String ssearch(String index, String type, String query, String queryOperator, String rankOption, Ranker rr) {
    List<SResult> li = searchByQuery(index, type, query, queryOperator, rankOption);
    if ("Rank-SVM".equals(rankOption)) {
      li = rr.rank(li);
    }
    Gson gson = new Gson();
    List<JsonObject> fileList = new ArrayList<>();

    for (int i = 0; i < li.size(); i++) {
      JsonObject file = new JsonObject();
      file.addProperty("Short Name", (String) SResult.get(li.get(i), "shortName"));
      file.addProperty("Long Name", (String) SResult.get(li.get(i), "longName"));
      file.addProperty("Topic", (String) SResult.get(li.get(i), "topic"));
      file.addProperty("Description", (String) SResult.get(li.get(i), "description"));
      // NOTE(review): "relase_date" is presumably the (misspelled) SResult
      // field name — left untouched; confirm against SResult before renaming.
      file.addProperty("Release Date", (String) SResult.get(li.get(i), "relase_date"));
      file.addProperty("Start/End Date", (String) SResult.get(li.get(i), "startDate") + " - " + (String) SResult.get(li.get(i), "endDate"));
      file.addProperty("Processing Level", (String) SResult.get(li.get(i), "processingLevel"));
      file.addProperty("Sensor", (String) SResult.get(li.get(i), "sensors"));
      // Added after all properties are set; the original added the (mutable)
      // object mid-loop, which is behaviorally identical but misleading.
      fileList.add(file);
    }
    JsonElement fileListElement = gson.toJsonTree(fileList);

    JsonObject pDResults = new JsonObject();
    pDResults.add("PDResults", fileListElement);
    return pDResults.toString();
  }
}
+ */ +/** + * This package includes classes for semantic search, such as click stream importer, + * query dispatcher, semantic searcher, and ranker (ranksvm, ordinal/linear regression) + */ +package org.apache.sdap.mudrod.ssearch; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/DataGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/DataGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/DataGenerator.java new file mode 100644 index 0000000..ba3c88e --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/DataGenerator.java @@ -0,0 +1,313 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.ssearch.ranking; + +import au.com.bytecode.opencsv.CSVReader; +import au.com.bytecode.opencsv.CSVWriter; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * SVMData is a program designed to create appropriate input data for the RankSVM + * algorithm that involves Pairwise Classification. Specifically, instead of working in + * the space of query-document vectors, e.g. x1, x2, x3, we transform them into a new space + * in which a pair of documents is represented as the difference between their feature vectors. 
+ */ +public class DataGenerator { + private static String mySourceDir; + private static String myResultDir; + private static boolean isMultFiles; + + private static String[] myHeader; + private static List<List<String>> myMasterList = new ArrayList<List<String>>(); + + // HashMap used for comparing evaluation classes + public static final HashMap<String, Integer> map1 = new HashMap<String, Integer>(); + + static { + map1.put("Excellent", 7); + map1.put("Very good", 6); + map1.put("Good", 5); + map1.put("OK", 4); + map1.put("Bad", 3); + map1.put("Very bad", 2); + map1.put("Terrible", 1); + } + + /** + * Constructor which takes in path containing one or multiple files to process. + * Also takes in argument specifying whether or not a single file needs to be processed, + * or multiple files need to be processed. + * + * @param sourceDir directory containing single file or multiple files to be processed + * @param resultDir output folder + * @param multFiles true if multiple files in directory need to be processed and false if + * only a single file needs to be processed + */ + public DataGenerator(String sourceDir, String resultDir, boolean multFiles) { + mySourceDir = sourceDir; + myResultDir = resultDir; + isMultFiles = multFiles; + } + + /** + * Responsible for invoking the processing of data file(s) and their subsequent storage + * into a user specified directory. + */ + public void process() { + parseFile(); + writeCSVfile(myMasterList); + } + + /** + * Parses the original user-specified CSV file, storing the contents for future calculations + * and formatting. 
+ */ + public static void parseFile() { + String[][] dataArr = null; + try { + String sourceDir = mySourceDir; + + if (isMultFiles == true) // Case where multiple files have to be processed + { + // Iterate over files in directory + File directory = new File(sourceDir); + File[] directoryListing = directory.listFiles(); + + if (directoryListing != null) { + for (File child : directoryListing) { + CSVReader csvReader = new CSVReader(new FileReader(child)); + List<String[]> list = csvReader.readAll(); + + // Store into 2D array by transforming array list to normal array + dataArr = new String[list.size()][]; + dataArr = list.toArray(dataArr); + + calculateVec(dataArr); + + csvReader.close(); + } + storeHead(dataArr); // Store the header + } + } else // Process only one file + { + File file = new File(sourceDir); + + if (file != null) { + CSVReader csvReader = new CSVReader(new FileReader(file)); + List<String[]> list = csvReader.readAll(); + + // Store into 2D array by transforming array list to normal array + dataArr = new String[list.size()][]; + dataArr = list.toArray(dataArr); + + storeHead(dataArr); // Store the header + calculateVec(dataArr); + + csvReader.close(); + } + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Performs the necessary vector calculations on each possible combination of vectors, + * also storing a value that indicates the evaluation. 
+ * + * @param arr the parsed contents of the original CSV file + */ + public static void calculateVec(String[][] arr) { + List<List<String>> listofLists = new ArrayList<List<String>>(); // Holds calculations + + int rowStart = 1; + for (int row = rowStart; row < arr.length; row++) // Start at row 1 because row 0 is heading lol + { + for (int i = 1; i < arr.length - row; i++) { + List<String> colList = new ArrayList<String>(); // create vector to store all values inside of a column, which is stored inside 2D vector + for (int col = 0; col < arr[0].length - 1; col++) // Columns go until the next to last column + { + //System.out.println(col + " " + arr[row][col]); + // Extract double value from each cell + double x1 = Double.parseDouble(arr[row][col]); + double x2 = Double.parseDouble(arr[row + i][col]); + + // Perform calculation for each cell + double result = x1 - x2; + + // Convert this double value into string, and store inside array list + String strResult = Double.toString(result); + colList.add(strResult); + } + + // Finally, add either 1, -1, or do not add row at all when encountering evaluation value + int addEvalNum = compareEvaluation(arr[row][arr[0].length - 1], arr[row + i][arr[0].length - 1]); + if (addEvalNum == 1) { + colList.add("1"); + listofLists.add(colList); // Add this list to 2D list - row is finished now, move on + } else if (addEvalNum == -1) { + colList.add("-1"); + listofLists.add(colList); // Add this list to 2D list - row is finished now, move on + } + // Else, they are equal, do not even add this row to 2D vector + } + } + + // After all processing takes place, send to method that recreates data with equal # of 1's and -1's + List<List<String>> equalizedList = equalizeList(listofLists); + myMasterList.addAll(equalizedList); + } + + /** + * Taking in two vector evaluation parameters, compares these two evaluations, returning a 1 + * if the first evaluation is greater than the second, a -1 in the case the first evaluation is + * less 
than the second, and a 10 in the case that the two are equal, meaning this vector will + * not be used. + * + * @param eval1 evaluation from first vector + * @param eval2 evaluation from second vector + * @return 1 if first evaluation is greater than the second, -1 if first evaluation is less than the second, and + * 10 in the case that the two are equal + */ + public static int compareEvaluation(String eval1, String eval2) { + int evalNum1 = map1.get(eval1); + int evalNum2 = map1.get(eval2); + + if (evalNum1 > evalNum2) // ">" means it is more relevant - assign a 1 + { + return 1; + } else if (evalNum1 < evalNum2) { + return -1; + } else { + return 10; // Return 10 if they are equal - signifies you cannot use the row + } + } + + /** + * After vector calculations and new evaluation values are set, produces refined output data such that + * there is an equal or close to equal number of rows containing both "1" and "-1" as the new evaluation value. + * + * @param rawList originally calculated data from the input CSV file + * @return data that has an equal distribution of evaluation values + */ + public static List<List<String>> equalizeList(List<List<String>> rawList) { + // Create two sets - one containing row index for +1 and the other for -1 + List<Integer> pos1List = new ArrayList<Integer>(); + List<Integer> neg1List = new ArrayList<Integer>(); + + for (int i = 0; i < rawList.size(); i++) // Iterate through all rows to get indexes + { + int evalNum = Integer.parseInt(rawList.get(i).get(rawList.get(0).size() - 1)); // Get 1 or -1 from original array list + if (evalNum == 1) { + pos1List.add(i); // Add row index that has 1 + } else if (evalNum == -1) { + neg1List.add(i); // Add row index that has -1 + } + } + + int totPosCount = pos1List.size(); // Total # of 1's + int totNegCount = neg1List.size(); // Total # of -1's + + if ((totPosCount - totNegCount) >= 1) // There are more 1's than -1's, equalize them + { + int indexOfPosList = 0; // Start getting indexes from 
the first index of positive index location list + while ((totPosCount - totNegCount) >= 1) // Keep going until we have acceptable amount of both +1 and -1 + { + int pos1IndexVal = pos1List.get(indexOfPosList); // Get index from previously made list of indexes + for (int col = 0; col < rawList.get(0).size(); col++) // Go through elements of indexed row, negating it to transform to -1 row + { + double d = Double.parseDouble(rawList.get(pos1IndexVal).get(col)); // Transform to double first + d = d * -1; // Negate it + String negatedValue = Double.toString(d); // Change back to String + rawList.get(pos1IndexVal).set(col, negatedValue);// Put this value back into dat row + } + + totPosCount--; // We changed a +1 row to a -1 row, decrement count of positives + totNegCount++; // Increment count of negatives + indexOfPosList++; // Get next +1 location in raw data + } + } else if ((totNegCount - totPosCount) > 1) // There are more -1's than 1's, equalize them + { + int indexOfNegList = 0; + while ((totNegCount - totPosCount) > 1) // Keep going until we have acceptable amount of both +1 and -1 + { + int neg1IndexVal = neg1List.get(indexOfNegList); // Get index from previously made list of indexes + for (int col = 0; col < rawList.get(0).size(); col++) // Go through elements of indexed row, negating it to transform to +1 row + { + double d = Double.parseDouble(rawList.get(neg1IndexVal).get(col)); // Transform to double first + d = d * -1; // Negate it + String negatedValue = Double.toString(d); // Change back to String + rawList.get(neg1IndexVal).set(col, negatedValue);// Put this value back into dat row + } + + totNegCount--; // We changed a -1 row to a +1 row, decrement count of negatives now + totPosCount++; // Increment count of positives + indexOfNegList++; // Get next -1 location in raw data + } + } else { + // Do nothing - rows are within acceptable equality bounds of plus or minus 1 + } + + return rawList; + } + + /** + * Retrieves the heading from a file to be 
processed so it can be written to the output file later. + * + * @param arr 2D array containing the parsed information from input file + */ + public static void storeHead(String[][] arr) { + myHeader = new String[arr[0].length]; // Reside private variable + + for (int col = 0; col < arr[0].length; col++) { + myHeader[col] = arr[0][col]; + } + } + + /** + * Writes newly calculated and equally distributed vector data to user specified CSV file. + * + * @param list finalized vector data to write to user specified output file + */ + public static void writeCSVfile(List<List<String>> list) { + String outputFile = myResultDir; + boolean alreadyExists = new File(outputFile).exists(); + + try { + CSVWriter csvOutput = new CSVWriter(new FileWriter(outputFile), ','); // Create new instance of CSVWriter to write to file output + + if (!alreadyExists) { + csvOutput.writeNext(myHeader); // Write the text headers first before data + + for (int i = 0; i < list.size(); i++) // Iterate through all rows in 2D array + { + String[] temp = new String[list.get(i).size()]; // Convert row array list in 2D array to regular string array + temp = list.get(i).toArray(temp); + csvOutput.writeNext(temp); // Write this array to the file + } + } + + csvOutput.close(); // Close csvWriter + } catch (IOException e) { + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Evaluator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Evaluator.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Evaluator.java new file mode 100644 index 0000000..ad7f159 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Evaluator.java @@ -0,0 +1,145 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance 
/**
 * Utilities for evaluating ranking results (precision at K, NDCG at K).
 */
public class Evaluator {

  /**
   * Normalized discounted cumulative gain at cutoff K.
   *
   * @param list relevance judgement per ranked position
   * @param K number of leading elements included in the calculation
   * @return NDCG score, or 0.0 when the ideal DCG is not positive
   */
  public double getNDCG(int[] list, int K) {
    double idcg = idcgAt(list, K);
    return idcg > 0.0 ? dcgAt(list, K) / idcg : 0.0;
  }

  /**
   * Precision at cutoff K: the fraction of the first K results judged
   * relevant (judgement strictly greater than "OK" = 3).
   *
   * @param list relevance judgement per ranked position
   * @param K number of leading elements included in the calculation
   * @return precision at K (0 for an empty list or K == 0)
   */
  public double getPrecision(int[] list, int K) {
    if (list.length == 0 || K == 0) {
      return 0;
    }
    int cutoff = Math.min(K, list.length);
    int relevant = 0;
    for (int i = 0; i < cutoff; i++) {
      if (list[i] > 3) { // 3 refers to "OK"
        relevant++;
      }
    }
    return (double) relevant / (double) cutoff;
  }

  /**
   * DCG at cutoff K: first element taken at face value, subsequent elements
   * discounted by log2 of their 1-based position.
   */
  private double dcgAt(int[] list, int K) {
    if (list.length == 0 || K == 0) {
      return 0.0;
    }
    int cutoff = Math.min(K, list.length);
    double dcg = list[0];
    for (int pos = 2; pos <= cutoff; pos++) {
      dcg += list[pos - 1] / (Math.log(pos) / Math.log(2));
    }
    return dcg;
  }

  /**
   * Ideal DCG at cutoff K: the DCG of the same judgements sorted into the
   * best possible (descending) order.
   */
  private double idcgAt(int[] list, int K) {
    int[] best = list.clone();
    Arrays.sort(best); // ascending
    for (int lo = 0, hi = best.length - 1; lo < hi; lo++, hi--) {
      int tmp = best[lo]; // reverse in place -> descending
      best[lo] = best[hi];
      best[hi] = tmp;
    }
    return dcgAt(best, K);
  }

}
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java new file mode 100644 index 0000000..cb4da7e --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.ssearch.ranking; + +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.classification.SVMModel; +import org.apache.spark.mllib.regression.LabeledPoint; + +import java.io.Serializable; + +/** + * Supports the ability to importing classifier into memory + */ +public class Learner implements Serializable { + /** + * + */ + private static final long serialVersionUID = 1L; + private static final String SPARKSVM = "SparkSVM"; + SVMModel model = null; + transient SparkContext sc = null; + + /** + * Constructor to load in spark SVM classifier + * + * @param classifierName classifier type + * @param skd an instance of spark driver + * @param svmSgdModel path to a trained model + */ + public Learner(String classifierName, SparkDriver skd, String svmSgdModel) { + if (classifierName.equals(SPARKSVM)) { + sc = skd.sc.sc(); + sc.addFile(svmSgdModel, true); + model = SVMModel.load(sc, svmSgdModel); + } + } + + /** + * Method of classifying instance 
/**
 * Converts the RankSVM training CSV into the LibSVM text format consumed by
 * Spark MLlib: one line per row, {@code <label> 1:<v1> 2:<v2> ...} with the
 * +1/-1 CSV label mapped to 1/0.
 */
public class SparkFormatter {
  // Pinned to Locale.ROOT so the output always uses '.' as the decimal
  // separator regardless of the JVM default locale (the previous
  // locale-sensitive DecimalFormat could emit ',' and corrupt the file).
  DecimalFormat NDForm = new DecimalFormat("#.###", DecimalFormatSymbols.getInstance(Locale.ROOT));

  public SparkFormatter() {
  }

  /**
   * Converts a quoted CSV of feature vectors into Spark's LibSVM text format.
   *
   * @param inputCSVFileName  source CSV file; the first line is a header and
   *                          the last column of each row is a +1/-1 label
   * @param outputTXTFileName destination text file, replaced if present
   */
  public void toSparkSVMformat(String inputCSVFileName, String outputTXTFileName) {
    File outFile = new File(outputTXTFileName);
    if (outFile.exists() && !outFile.delete()) {
      // Stale output could not be removed; FileWriter below truncates anyway.
    }
    // try-with-resources: both streams were previously leaked on exception.
    try (BufferedReader br = new BufferedReader(new FileReader(inputCSVFileName));
        BufferedWriter bw = new BufferedWriter(new FileWriter(outputTXTFileName))) {
      br.readLine(); // skip the CSV header
      for (String line = br.readLine(); line != null; line = br.readLine()) {
        bw.write(formatLine(line));
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /** Formats one CSV row as a LibSVM line ("label idx:val ... \n"). */
  private String formatLine(String line) {
    String[] cells = line.split(",");
    StringBuilder out = new StringBuilder();

    double label = Double.parseDouble(stripQuotes(cells[cells.length - 1]));
    if (label == -1.0) {
      out.append("0 ");
    } else if (label == 1.0) {
      out.append("1 ");
    }
    // Any other label value gets no prefix - preserved from the original.

    for (int i = 0; i < cells.length - 1; i++) {
      out.append(i + 1).append(':').append(NDForm.format(Double.parseDouble(stripQuotes(cells[i])))).append(' ');
    }
    return out.append('\n').toString();
  }

  /** Removes the double quotes opencsv-style CSV cells are wrapped in. */
  private static String stripQuotes(String s) {
    return s.replace("\"", "");
  }

  public static void main(String[] args) {
    SparkFormatter sf = new SparkFormatter();
    sf.toSparkSVMformat("C:/mudrodCoreTestData/rankingResults/inputDataForSVM.csv", "C:/mudrodCoreTestData/rankingResults/inputDataForSVM_spark.txt");
  }

}
+ */ +package org.apache.sdap.mudrod.ssearch.ranking; + +import org.apache.sdap.mudrod.main.MudrodEngine; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.classification.SVMModel; +import org.apache.spark.mllib.classification.SVMWithSGD; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; + +public class SparkSVM { + + private SparkSVM() { + //public constructor + } + + public static void main(String[] args) { + MudrodEngine me = new MudrodEngine(); + + JavaSparkContext jsc = me.startSparkDriver().sc; + + String path = SparkSVM.class.getClassLoader().getResource("inputDataForSVM_spark.txt").toString(); + JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), path).toJavaRDD(); + + // Run training algorithm to build the model. + int numIterations = 100; + final SVMModel model = SVMWithSGD.train(data.rdd(), numIterations); + + // Save and load model + model.save(jsc.sc(), SparkSVM.class.getClassLoader().getResource("javaSVMWithSGDModel").toString()); + + jsc.sc().stop(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java new file mode 100644 index 0000000..2f27aa0 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.ssearch.ranking; + +import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Supports the ability to importing training set into Elasticsearch + */ +public class TrainingImporter extends MudrodAbstract { + /** + * + */ + private static final long serialVersionUID = 1L; + + public TrainingImporter(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + es.deleteAllByQuery(props.getProperty(MudrodConstants.ES_INDEX_NAME), "trainingranking", QueryBuilders.matchAllQuery()); + addMapping(); + } + + /** + * Method of adding mapping to traning set type + */ + public void addMapping() { + XContentBuilder Mapping; + try { + Mapping = jsonBuilder().startObject().startObject("trainingranking").startObject("properties").startObject("query").field("type", "string").field("index", "not_analyzed").endObject() + .startObject("dataID").field("type", "string").field("index", "not_analyzed").endObject().startObject("label").field("type", 
"string").field("index", "not_analyzed").endObject().endObject() + .endObject().endObject(); + + es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType("trainingranking").setSource(Mapping).execute().actionGet(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Method of importing training set in to Elasticsearch + * + * @param dataFolder the path to the traing set + * @throws IOException IOException + */ + public void importTrainingSet(String dataFolder) throws IOException { + es.createBulkProcessor(); + + File[] files = new File(dataFolder).listFiles(); + for (File file : files) { + BufferedReader br = new BufferedReader(new FileReader(file.getAbsolutePath())); + br.readLine(); + String line = br.readLine(); + while (line != null) { + String[] list = line.split(","); + String query = file.getName().replace(".csv", ""); + if (list.length > 0) { + IndexRequest ir = new IndexRequest(props.getProperty("indexName"), "trainingranking") + .source(jsonBuilder().startObject().field("query", query).field("dataID", list[0]).field("label", list[list.length - 1]).endObject()); + es.getBulkProcessor().add(ir); + } + line = br.readLine(); + } + br.close(); + } + es.destroyBulkProcessor(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/package-info.java new file mode 100644 index 0000000..ebcc77d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes classes for importing training data, ML models, + * generating input data for RankSVM, and evaluating ranking results + */ +package org.apache.sdap.mudrod.ssearch.ranking; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/ssearch/structure/SResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/structure/SResult.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/structure/SResult.java new file mode 100644 index 0000000..cf94ddb --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/structure/SResult.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Data structure class for search result.
 *
 * <p>Holds the descriptive metadata of one dataset hit together with the
 * individual ranking-feature scores used by the learning-to-rank pipeline.
 * Feature fields are accessed reflectively via {@link #set} / {@link #get}
 * so that the exported columns are driven by the {@link #rlist} table.
 */
public class SResult {
  /**
   * Ranking-feature field names exported by {@link #getHeader(String)} and
   * {@link #toString(String)}; the array order defines the column order.
   */
  public static final String rlist[] = { "term_score", "releaseDate_score", /*"versionNum_score",*/
      "processingL_score", "allPop_score", "monthPop_score", "userPop_score"/*, "termAndv_score"*/ };

  // Descriptive metadata of one dataset hit.
  String shortName = null;
  String longName = null;
  String topic = null;
  String description = null;
  // NOTE(review): keeps the historical misspelling ("relase") because the
  // field may be accessed reflectively by name — renaming would break that.
  String relase_date = null;

  // Individual ranking feature scores; default 0.0 keeps the reflective
  // export in toString() null-safe.
  public Double final_score = 0.0;
  public Double term_score = 0.0;
  public Double releaseDate_score = 0.0;
  public Double versionNum_score = 0.0;
  public Double processingL_score = 0.0;
  public Double click_score = 0.0;
  public Double allPop_score = 0.0;
  public Double monthPop_score = 0.0;
  public Double userPop_score = 0.0;
  public Double termAndv_score = 0.0;
  public Integer below = 0;

  // Per-field query match scores (null until computed).
  public Double Dataset_LongName_score = null;
  public Double Dataset_Metadata_score = null;
  public Double DatasetParameter_Term_score = null;
  public Double DatasetSource_Source_LongName_score = null;
  public Double DatasetSource_Sensor_LongName_score = null;

  // Raw dataset attributes.
  public String version = null;
  public String processingLevel = null;
  public String latency = null;
  public String stopDateLong = null;
  public String stopDateFormat = null;
  public Double spatialR_Sat = null;
  public Double spatialR_Grid = null;
  public String temporalR = null;

  // Raw (unnormalized) feature values (null until computed).
  public Double releaseDate = null;
  public Double click = null;
  public Double term = null;
  public Double versionNum = null;
  public Double processingL = null;
  public Double allPop = null;
  public Double monthPop = null;
  public Double userPop = null;
  public Double termAndv = null;

  public Double Dataset_LongName = null;
  public Double Dataset_Metadata = null;
  public Double DatasetParameter_Term = null;
  public Double DatasetSource_Source_LongName = null;
  public Double DatasetSource_Sensor_LongName = null;

  // Model prediction and training label.
  public Double prediction = 0.0;
  public String label = null;

  // add by quintinali
  public String startDate;
  public String endDate;
  public String sensors;

  /**
   * Creates a search result from dataset metadata.
   *
   * @param shortName short name of dataset
   * @param longName long name of dataset
   * @param topic topic of dataset
   * @param description description of dataset
   * @param date release date of dataset
   */
  public SResult(String shortName, String longName, String topic, String description, String date) {
    this.shortName = shortName;
    this.longName = longName;
    this.topic = topic;
    this.description = description;
    this.relase_date = date;
  }

  /**
   * Copy constructor.
   *
   * <p>NOTE: only the ranking feature scores listed in {@link #rlist} are
   * copied; descriptive fields (shortName, longName, ...) are not.
   *
   * @param sr the result whose feature scores are copied
   */
  public SResult(SResult sr) {
    for (String field : rlist) {
      set(this, field, get(sr, field));
    }
  }

  /**
   * Method of getting export header.
   *
   * @param delimiter the delimiter used to separate strings
   * @return header: "ShortName", "below", each feature in {@link #rlist},
   *         then "label" followed by a newline
   */
  public static String getHeader(String delimiter) {
    // StringBuilder instead of repeated String concatenation in a loop.
    StringBuilder sb = new StringBuilder();
    sb.append("ShortName").append(delimiter).append("below").append(delimiter);
    for (String field : rlist) {
      sb.append(field).append(delimiter);
    }
    sb.append("label").append("\n");
    return sb.toString();
  }

  /**
   * Method of getting a search result as string.
   *
   * @param delimiter the delimiter used to separate strings
   * @return search result as string, in the same column order as
   *         {@link #getHeader(String)}
   */
  public String toString(String delimiter) {
    StringBuilder sb = new StringBuilder();
    // append(Object) renders null as "null", matching the behavior of the
    // original string concatenation.
    sb.append(shortName).append(delimiter).append(below).append(delimiter);
    for (String field : rlist) {
      double score = get(this, field);
      sb.append(score).append(delimiter);
    }
    sb.append(label).append("\n");
    return sb.toString();
  }

  /**
   * Generic setter method; resolves the field reflectively, walking up the
   * class hierarchy.
   *
   * @param object instance of SResult
   * @param fieldName field name that needs to be set on
   * @param fieldValue field value that needs to be set to
   * @return true when the field was found and set, false when no such field
   *         exists anywhere in the hierarchy
   */
  public static boolean set(Object object, String fieldName, Object fieldValue) {
    Class<?> clazz = object.getClass();
    while (clazz != null) {
      try {
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(object, fieldValue);
        return true;
      } catch (NoSuchFieldException e) {
        // Not declared on this class; try the superclass.
        clazz = clazz.getSuperclass();
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
    return false;
  }

  /**
   * Generic getter method; resolves the field reflectively, walking up the
   * class hierarchy.
   *
   * @param object instance of SResult
   * @param fieldName field name of search result
   * @param <V> data type
   * @return the value of the field in the object, or null when no such field
   *         exists
   */
  @SuppressWarnings("unchecked")
  public static <V> V get(Object object, String fieldName) {
    Class<?> clazz = object.getClass();
    while (clazz != null) {
      try {
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        return (V) field.get(object);
      } catch (NoSuchFieldException e) {
        clazz = clazz.getSuperclass();
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
    return null;
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes data structure needed for ranking process + */ +package org.apache.sdap.mudrod.ssearch.structure; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/ESTransportClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/ESTransportClient.java b/core/src/main/java/org/apache/sdap/mudrod/utils/ESTransportClient.java new file mode 100644 index 0000000..9f53586 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/ESTransportClient.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sdap.mudrod.utils; + +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.percolator.PercolatorPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.mustache.MustachePlugin; +import org.elasticsearch.transport.Netty3Plugin; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * A builder to create an instance of {@link TransportClient} This class + * pre-installs the {@link Netty3Plugin}, for the client. These plugins are all + * elasticsearch core modules required. + */ +@SuppressWarnings({ "unchecked", "varargs" }) +public class ESTransportClient extends TransportClient { + + private static final Collection<Class<? extends Plugin>> PRE_INSTALLED_PLUGINS = Collections + .unmodifiableList(Arrays.asList(ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class, Netty3Plugin.class)); + + @SafeVarargs + public ESTransportClient(Settings settings, Class<? extends Plugin>... plugins) { + this(settings, Arrays.asList(plugins)); + } + + public ESTransportClient(Settings settings, Collection<Class<? 
extends Plugin>> plugins) { + super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), null); + + } + + @Override + public void close() { + super.close(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/HttpRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/HttpRequest.java b/core/src/main/java/org/apache/sdap/mudrod/utils/HttpRequest.java new file mode 100644 index 0000000..6b623f6 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/HttpRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * ClassName: HttpRequest + * Function: Http request tool. 
+ */ +public class HttpRequest { + + private static final Logger LOG = LoggerFactory.getLogger(HttpRequest.class); + + public HttpRequest() { + } + + public String getRequest(String requestUrl) { + String line = null; + try { + URL url = new URL(requestUrl); + + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setDoOutput(true); + + connection.setConnectTimeout(5000); + connection.setReadTimeout(5000); + int code = connection.getResponseCode(); + if (code != HttpURLConnection.HTTP_OK) { + line = "{\"exception\":\"Service failed\"}"; + LOG.info(line); + } else { + InputStream content = connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + line = in.readLine(); + } + } catch (Exception e) { + line = "{\"exception\":\"No service was found\"}"; + LOG.error(line); + } + return line; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/LabeledRowMatrix.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/LabeledRowMatrix.java b/core/src/main/java/org/apache/sdap/mudrod/utils/LabeledRowMatrix.java new file mode 100644 index 0000000..8f00df6 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/LabeledRowMatrix.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.sdap.mudrod.utils;

import org.apache.spark.mllib.linalg.distributed.RowMatrix;

import java.util.List;

/**
 * ClassName: LabeledRowMatrix
 * Function: LabeledRowMatrix struct — a {@link RowMatrix} bundled with
 * human-readable labels for its rows and columns.
 */
public class LabeledRowMatrix {

  // words: matrix row titles (e.g. terms), one label per matrix row
  public List<String> rowkeys;
  // docs: matrix column titles (e.g. documents), one label per matrix column
  public List<String> colkeys;
  // wordDocMatrix: a matrix in which each row is corresponding to a term and
  // each column is a doc.
  public RowMatrix rowMatrix;

  // No-arg constructor; the public fields are populated directly by callers.
  public LabeledRowMatrix() {
  }

}
package org.apache.sdap.mudrod.utils;

import org.apache.sdap.mudrod.driver.ESDriver;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * ClassName: LinkageTriple Function: Vocabulary linkage operations.
 *
 * <p>A weighted edge between two vocabulary terms (keyA, keyB), plus static
 * helpers for persisting such edges into Elasticsearch and normalizing their
 * weights.
 */
public class LinkageTriple implements Serializable {

  private static final long serialVersionUID = 1L;
  // keyAId: ID of term A
  public long keyAId;
  // keyBId: ID of term B
  public long keyBId;
  // weight: the strength of the relationship between term A and term B
  public double weight;
  // keyA: term A
  public String keyA;
  // keyB: term B
  public String keyB;
  // df: formats weights to two decimal places before indexing.
  // NOTE(review): DecimalFormat is not thread-safe and this instance is
  // public static — confirm triples are only formatted from one thread.
  public static DecimalFormat df = new DecimalFormat("#.00");

  public LinkageTriple() {
  }

  /**
   * Outputs this linkage triple in the form "keyA,keyB:weight".
   *
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    return keyA + "," + keyB + ":" + weight;
  }

  /**
   * Inserts linkage triples in keyword form (bTriple = false, no symmetric
   * edges); see the five-argument overload for details.
   *
   * @param es the Elasticsearch driver
   * @param triples the triples to store
   * @param index target index name
   * @param type target type name
   * @throws IOException when building the JSON documents fails
   */
  public static void insertTriples(ESDriver es, List<LinkageTriple> triples, String index, String type) throws IOException {
    LinkageTriple.insertTriples(es, triples, index, type, false, false);
  }

  /**
   * Inserts linkage triples into the given index/type. The existing type is
   * deleted first, so this is a full replace.
   *
   * @param es the Elasticsearch driver
   * @param triples the triples to store; may be null, in which case only the
   *          delete (and optional mapping) steps run
   * @param index target index name
   * @param type target type name
   * @param bTriple when true, store separate concept_A/concept_B fields (and
   *          add the not_analyzed mapping); when false, store one
   *          comma-joined "keywords" field
   * @param bSymmetry when true (together with bTriple), also index the
   *          reversed edge (keyB, keyA) with the same weight
   * @throws IOException when building the JSON documents fails
   */
  public static void insertTriples(ESDriver es, List<LinkageTriple> triples, String index, String type, Boolean bTriple, boolean bSymmetry) throws IOException {
    es.deleteType(index, type);
    if (bTriple) {
      LinkageTriple.addMapping(es, index, type);
    }

    if (triples == null) {
      return;
    }

    es.createBulkProcessor();
    int size = triples.size();
    for (int i = 0; i < size; i++) {

      XContentBuilder jsonBuilder = jsonBuilder().startObject();
      if (bTriple) {

        jsonBuilder.field("concept_A", triples.get(i).keyA);
        jsonBuilder.field("concept_B", triples.get(i).keyB);

      } else {
        jsonBuilder.field("keywords", triples.get(i).keyA + "," + triples.get(i).keyB);
      }

      // Weight is rounded to two decimals before indexing.
      jsonBuilder.field("weight", Double.parseDouble(df.format(triples.get(i).weight)));
      jsonBuilder.endObject();

      IndexRequest ir = new IndexRequest(index, type).source(jsonBuilder);
      es.getBulkProcessor().add(ir);

      // Optionally index the mirrored edge (B, A) so lookups by either
      // concept find the relationship.
      if (bTriple && bSymmetry) {
        XContentBuilder symmetryJsonBuilder = jsonBuilder().startObject();
        symmetryJsonBuilder.field("concept_A", triples.get(i).keyB);
        symmetryJsonBuilder.field("concept_B", triples.get(i).keyA);

        symmetryJsonBuilder.field("weight", Double.parseDouble(df.format(triples.get(i).weight)));

        symmetryJsonBuilder.endObject();

        IndexRequest symmetryir = new IndexRequest(index, type).source(symmetryJsonBuilder);
        es.getBulkProcessor().add(symmetryir);
      }
    }
    es.destroyBulkProcessor();
  }

  /**
   * Adds a mapping that stores concept_A/concept_B as not_analyzed strings so
   * that term queries match the exact concept text.
   *
   * @param es the Elasticsearch driver
   * @param index target index name
   * @param type target type name
   */
  public static void addMapping(ESDriver es, String index, String type) {
    XContentBuilder Mapping;
    try {
      Mapping = jsonBuilder().startObject().startObject(type).startObject("properties").startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()

          .endObject().endObject().endObject();

      es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(Mapping).execute().actionGet();
    } catch (IOException e) {
      // NOTE(review): the failure is only printed, never propagated —
      // callers cannot detect that the mapping is missing.
      e.printStackTrace();
    }
  }

  /**
   * Normalizes edge weights per concept: every weight stored under a given
   * concept_A is divided by that concept's maximum weight, so the strongest
   * edge of each concept becomes 1.0. Concepts whose maximum is already 1.0
   * are skipped.
   *
   * @param es the Elasticsearch driver
   * @param index target index name
   * @param type target type name
   * @throws IOException declared for API symmetry with the insert methods
   */
  public static void standardTriples(ESDriver es, String index, String type) throws IOException {
    es.createBulkProcessor();

    // Collect the distinct concept_A values via a terms aggregation.
    SearchResponse sr = es.getClient().prepareSearch(index).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
        .addAggregation(AggregationBuilders.terms("concepts").field("concept_A").size(0)).execute().actionGet();
    Terms concepts = sr.getAggregations().get("concepts");

    for (Terms.Bucket entry : concepts.getBuckets()) {
      String concept = (String) entry.getKey();
      double maxSim = LinkageTriple.getMaxSimilarity(es, index, type, concept);
      if (maxSim == 1.0) {
        continue;
      }

      // Scroll through every edge of this concept in pages of 100.
      SearchResponse scrollResp = es.getClient().prepareSearch(index).setTypes(type).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.termQuery("concept_A", concept))
          .addSort("weight", SortOrder.DESC).setSize(100).execute().actionGet();

      while (true) {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
          Map<String, Object> metadata = hit.getSource();
          double sim = (double) metadata.get("weight");
          // Rescale so the concept's strongest edge becomes 1.0.
          double newSim = sim / maxSim;
          UpdateRequest ur = es.generateUpdateRequest(index, type, hit.getId(), "weight", Double.parseDouble(df.format(newSim)));
          es.getBulkProcessor().add(ur);
        }

        scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
        if (scrollResp.getHits().getHits().length == 0) {
          break;
        }
      }
    }

    es.destroyBulkProcessor();
  }

  /**
   * Returns the largest weight stored for the given concept (top-1 hit when
   * sorting by weight descending). Falls back to 1.0 when the concept has no
   * hits or its maximum is 0.0, which avoids division by zero in
   * {@link #standardTriples}.
   *
   * @param es the Elasticsearch driver
   * @param index target index name
   * @param type target type name
   * @param concept the concept_A value to look up
   * @return the maximum weight, or 1.0 as a safe default
   */
  private static double getMaxSimilarity(ESDriver es, String index, String type, String concept) {

    double maxSim = 1.0;
    SearchRequestBuilder builder = es.getClient().prepareSearch(index).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", concept)).addSort("weight", SortOrder.DESC).setSize(1);

    SearchResponse usrhis = builder.execute().actionGet();
    SearchHit[] hits = usrhis.getHits().getHits();
    if (hits.length == 1) {
      SearchHit hit = hits[0];
      Map<String, Object> result = hit.getSource();
      maxSim = (double) result.get("weight");
    }

    if (maxSim == 0.0) {
      maxSim = 1.0;
    }

    return maxSim;
  }
}
