/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ontology.process;

import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Calculates weighted linkage triples between SWEET ontology concepts and
 * imports them into Elasticsearch.
 */
public class OntologyLinkCal extends DiscoveryStepAbstract {

  private static final Logger LOG = LoggerFactory.getLogger(OntologyLinkCal.class);

  public OntologyLinkCal(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
    // start from a clean slate: remove any previously indexed linkage docs
    es.deleteAllByQuery(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"), QueryBuilders.matchAllQuery());
    addSWEETMapping();
  }

  /**
   * Adds the Elasticsearch mapping for triples extracted from SWEET.
   * Both concept fields are indexed "not_analyzed" so they can be matched
   * exactly.
   */
  public void addSWEETMapping() {
    try {
      // local renamed from "Mapping": locals are lowerCamelCase
      XContentBuilder mapping = jsonBuilder().startObject().startObject(props.getProperty("ontologyLinkageType")).startObject("properties")
          .startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
          .endObject().endObject().endObject();

      es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType(props.getProperty("ontologyLinkageType")).setSource(mapping).execute().actionGet();
    } catch (IOException e) {
      // log instead of printStackTrace so failures reach the application log
      LOG.error("Failed to add SWEET ontology-linkage mapping", e);
    }
  }

  /**
   * Calculates and imports SWEET triples into Elasticsearch. Each CSV line of
   * the configured "oceanTriples" file is (conceptB, relation, conceptA);
   * "subclassof" relations get weight 0.75, all other relations 0.9.
   *
   * @return always {@code null}
   */
  @Override
  public Object execute() {
    es.deleteType(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"));
    es.createBulkProcessor();

    // try-with-resources guarantees the reader is closed even on failure
    try (BufferedReader br = new BufferedReader(new FileReader(props.getProperty("oceanTriples")))) {
      String line;
      while ((line = br.readLine()) != null) {
        String[] strList = line.toLowerCase().split(",");
        if (strList.length < 3) {
          // malformed triple line: skip instead of ArrayIndexOutOfBounds
          continue;
        }
        double weight = "subclassof".equals(strList[1]) ? 0.75 : 0.9;

        IndexRequest ir = new IndexRequest(props.getProperty("indexName"), props.getProperty("ontologyLinkageType")).source(
            jsonBuilder().startObject().field("concept_A", es.customAnalyzing(props.getProperty("indexName"), strList[2]))
                .field("concept_B", es.customAnalyzing(props.getProperty("indexName"), strList[0])).field("weight", weight).endObject());
        es.getBulkProcessor().add(ir);
      }
    } catch (IOException | ExecutionException e) {
      LOG.error("Error while importing SWEET triples", e);
    } catch (InterruptedException e) {
      LOG.error("Interrupted while importing SWEET triples", e);
      // restore interrupt status so callers can observe the interruption
      Thread.currentThread().interrupt();
    } finally {
      // previously only ran when the reader opened successfully; flush and
      // refresh must happen regardless, since createBulkProcessor() already ran
      es.destroyBulkProcessor();
      es.refreshIndex();
    }
    return null;
  }

  @Override
  public Object execute(Object o) {
    return null;
  }

}
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ontology.process;

import org.apache.jena.ontology.OntClass;
import org.apache.jena.ontology.OntModel;
import org.apache.sdap.mudrod.ontology.Ontology;

import java.util.Iterator;

/**
 * Interface for specific ontology parsers e.g. .ttl, RDFXML,
 * etc.
 */
public interface OntologyParser {

  /**
   * An ontology model (RDF graph) to parse for literals.
   *
   * @param ont the associated {@link org.apache.sdap.mudrod.ontology.Ontology}
   * implementation processing the ontology operation(s).
   * @param ontModel the {@link org.apache.jena.ontology.OntModel}
   */
  void parse(Ontology ont, OntModel ontModel);

  /**
   * An ontology model (RDF graph) for which to obtain an
   * {@link java.util.Iterator} instance of all root classes.
   *
   * @param ontModel the {@link org.apache.jena.ontology.OntModel}
   * @return an {@link java.util.Iterator} instance containing all root classes.
   */
  Iterator<OntClass> rootClasses(OntModel ontModel);

}

/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sdap.mudrod.ontology.process;

import org.apache.jena.ontology.Individual;
import org.apache.jena.ontology.OntClass;
import org.apache.jena.ontology.OntModel;
import org.apache.jena.rdf.model.Literal;
import org.apache.sdap.mudrod.ontology.Ontology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

/**
 * {@link org.apache.sdap.mudrod.ontology.process.OntologyParser}
 * implementation for <a href="http://www.w3.org/TR/owl-features/">W3C OWL</a>
 * files.
 */
public class OwlParser implements OntologyParser {

  // switched from com.esotericsoftware.minlog.Log to SLF4J for consistency
  // with the rest of the codebase
  private static final Logger LOG = LoggerFactory.getLogger(OwlParser.class);

  // compiled once: boundary between a lower-case letter/digit and an
  // upper-case letter in a camelCase rdf:ID
  private static final Pattern CAMEL_BOUNDARY = Pattern.compile("([a-z0-9])([A-Z])");

  private Ontology ont;
  private List<OntClass> roots = new ArrayList<>();

  public OwlParser() {
    //default constructor
  }

  /**
   * Parse OWL ontology files using Apache Jena: walks every root class of
   * the model and registers search terms for each named class reached.
   */
  @Override
  public void parse(Ontology ont, OntModel m) {
    this.ont = ont;
    for (Iterator<OntClass> i = rootClasses(m); i.hasNext(); ) {
      OntClass c = i.next();

      //dont deal with anonymous classes
      if (c.isAnon()) {
        continue;
      }

      parseClass(c, new ArrayList<>(), 0);
    }
  }

  /**
   * Recursively registers search terms for a class, its subclasses and its
   * instances. {@code occurs} tracks the classes on the current recursion
   * path to avoid cycles.
   */
  protected void parseClass(OntClass cls, List<Object> occurs, int depth) {
    //dont deal with anonymous classes
    if (cls.isAnon()) {
      return;
    }

    //add cls to Ontology searchterms
    //list labels
    Iterator<?> labelIter = cls.listLabels(null);
    //if has no labels
    if (!labelIter.hasNext()) {
      //add rdf:ID as a label
      cls.addLabel(rdfidToLabel(cls.getLocalName()), null);
    }
    //reset the label iterator
    labelIter = cls.listLabels(null);

    while (labelIter.hasNext()) {
      Literal l = (Literal) labelIter.next();
      ((LocalOntology) ont).addSearchTerm(l.toString(), cls);
    }

    // recurse to the next level down
    if (cls.canAs(OntClass.class) && !occurs.contains(cls)) {
      //list subclasses
      for (Iterator<?> i = cls.listSubClasses(true); i.hasNext(); ) {
        OntClass sub = (OntClass) i.next();

        // we push this expression on the occurs list before we recurse
        occurs.add(cls);
        parseClass(sub, occurs, depth + 1);
        occurs.remove(cls);
      }

      //list instances
      for (Iterator<?> i = cls.listInstances(); i.hasNext(); ) {
        //add search terms for each instance

        //list labels
        Individual individual = (Individual) i.next();
        for (Iterator<?> j = individual.listLabels(null); j.hasNext(); ) {
          Literal l = (Literal) j.next();
          ((LocalOntology) ont).addSearchTerm(l.toString(), individual);
        }
      }
    }
  }

  /**
   * Parses out all root classes of the given
   * {@link org.apache.jena.ontology.OntModel}
   * @param m the {@link org.apache.jena.ontology.OntModel} we wish to obtain
   * all root classes for.
   * @return an {@link java.util.Iterator} of {@link org.apache.jena.ontology.OntClass}
   * elements representing all root classes.
   */
  @Override
  public Iterator<OntClass> rootClasses(OntModel m) {
    Iterator<?> i = m.listClasses();
    if (i.hasNext() && i.next() instanceof OntClass) {
      //assume ontology has root classes
      processSingle(m);
    } else {
      //check for presence of aggregate/collection ontologies such as sweetAll.owl
      processCollection(m);
    }

    return roots.iterator();
  }

  /** Collects root classes (direct descendants of owl:Thing) of one model. */
  private void processSingle(OntModel m) {
    for (Iterator<?> i = m.listClasses(); i.hasNext(); ) {
      OntClass c = (OntClass) i.next();
      try {
        // too confusing to list all the restrictions as root classes
        if (c.isAnon()) {
          continue;
        }

        if (c.hasSuperClass(m.getProfile().THING(), true) || c.getCardinality(m.getProfile().SUB_CLASS_OF()) == 0) {
          // this class is directly descended from Thing
          roots.add(c);
        }
      } catch (Exception e) {
        LOG.error("Error during extraction or root Classes from Ontology Model: ", e);
      }
    }
  }

  /** Recurses into sub-models of an aggregate ontology (e.g. sweetAll.owl). */
  private void processCollection(OntModel m) {
    for (Iterator<?> i = m.listSubModels(true); i.hasNext(); ) {
      OntModel ontModel = (OntModel) i.next();
      processSingle(ontModel);
    }
  }

  /**
   * Converts a camelCase rdf:ID local name into a human-readable label by
   * inserting a space before each upper-case letter that follows a
   * lower-case letter or digit, e.g. {@code "seaSurfaceTemp"} becomes
   * {@code "sea Surface Temp"}.
   *
   * @param idString the rdf:ID local name
   * @return the spaced label
   */
  public String rdfidToLabel(String idString) {
    // single replaceAll with group back-references replaces the original
    // find()/replaceAll() loop, which rescanned a matcher over the stale
    // input string; the result is identical for [a-z0-9][A-Z] boundaries
    return CAMEL_BOUNDARY.matcher(idString).replaceAll("$1 $2");
  }

}
b/core/src/main/java/org/apache/sdap/mudrod/ontology/process/package-info.java new file mode 100644 index 0000000..832bfbd --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/ontology/process/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes ontology processing classes. + */ +package org.apache.sdap.mudrod.ontology.process; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/package-info.java new file mode 100644 index 0000000..4c6bbb1 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes the preprocessing, processing, and data structure used + * by recommendation module. + */ +package org.apache.sdap.mudrod.recommendation; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java new file mode 100644 index 0000000..7bb1d22 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.recommendation.pre; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import org.apache.commons.io.IOUtils; +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.metadata.pre.ApiHarvester; +import org.elasticsearch.action.index.IndexRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Properties; + +/** + * ClassName: Import Metadata to elasticsearch + */ + +public class ImportMetadata extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ApiHarvester.class); + + public ImportMetadata(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting Metadata Harvesting"); + startTime = System.currentTimeMillis(); + addMetadataMapping(); + importToES(); + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Finished Metadata Harvesting time elapsed: {}s", (endTime - startTime) / 1000); + return null; + } + + /** + * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please + * invoke this method before import metadata to Elasticsearch. 
+ */ + public void addMetadataMapping() { + String mappingJson = "{\r\n \"dynamic_templates\": " + "[\r\n " + "{\r\n \"strings\": " + "{\r\n \"match_mapping_type\": \"string\"," + + "\r\n \"mapping\": {\r\n \"type\": \"string\"," + "\r\n \"analyzer\": \"csv\"\r\n }" + "\r\n }\r\n }\r\n ]\r\n}"; + + es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty("recom_metadataType")).setSource(mappingJson).execute().actionGet(); + + } + + /** + * importToES: Index metadata into elasticsearch from local file directory. + * Please make sure metadata have been harvest from web service before + * invoking this method. + */ + private void importToES() { + es.deleteType(props.getProperty("indexName"), props.getProperty("recom_metadataType")); + + es.createBulkProcessor(); + File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH)); + File[] fList = directory.listFiles(); + for (File file : fList) { + InputStream is; + try { + is = new FileInputStream(file); + try { + String jsonTxt = IOUtils.toString(is); + JsonParser parser = new JsonParser(); + JsonElement item = parser.parse(jsonTxt); + IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty("recom_metadataType")).source(item.toString()); + + // preprocessdata + + es.getBulkProcessor().add(ir); + } catch (IOException e) { + e.printStackTrace(); + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + + } + + es.destroyBulkProcessor(); + } + + @Override + public Object execute(Object o) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java 
b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java new file mode 100644 index 0000000..5204b80 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java @@ -0,0 +1,100 @@ +/** + * Project Name:mudrod-core + * File Name:TFIDFGenerator.java + * Package Name:org.apache.sdap.mudrod.recommendation.pre + * Date:Aug 22, 201612:39:52 PM + * Copyright (c) 2016, [email protected] All Rights Reserved. + */ + +package org.apache.sdap.mudrod.recommendation.pre; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.recommendation.structure.MetadataOpt; +import org.apache.sdap.mudrod.utils.LabeledRowMatrix; +import org.apache.sdap.mudrod.utils.MatrixUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * ClassName: Generate TFIDF information of all metadata + */ +public class MetadataTFIDFGenerator extends DiscoveryStepAbstract { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MetadataTFIDFGenerator.class); + + /** + * Creates a new instance of MatrixGenerator. 
+ * + * @param props the Mudrod configuration + * @param es the Elasticsearch drive + * @param spark the spark drive + */ + public MetadataTFIDFGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + + LOG.info("Starting Dataset TF_IDF Matrix Generator"); + startTime = System.currentTimeMillis(); + try { + generateWordBasedTFIDF(); + } catch (Exception e) { + LOG.error("Error during Dataset TF_IDF Matrix Generation: {}", e); + } + endTime = System.currentTimeMillis(); + + LOG.info("Dataset TF_IDF Matrix Generation complete, time elaspsed: {}s", (endTime - startTime) / 1000); + + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + public LabeledRowMatrix generateWordBasedTFIDF() throws Exception { + + MetadataOpt opt = new MetadataOpt(props); + + JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark); + + JavaPairRDD<String, List<String>> metadataWords = opt.tokenizeData(metadataContents, " "); + + LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark); + + MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty("metadata_word_tfidf_matrix")); + + return wordtfidfMatrix; + } + + public LabeledRowMatrix generateTermBasedTFIDF() throws Exception { + + MetadataOpt opt = new MetadataOpt(props); + + List<String> variables = new ArrayList<>(); + variables.add("DatasetParameter-Term"); + variables.add("DatasetParameter-Variable"); + variables.add("Dataset-ExtractTerm"); + + JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark, variables); + + JavaPairRDD<String, List<String>> metadataTokens = opt.tokenizeData(metadataContents, ","); + + LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark); + + MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, 
props.getProperty("metadata_term_tfidf_matrix")); + + return tokentfidfMatrix; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java new file mode 100644 index 0000000..fa734c9 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java @@ -0,0 +1,223 @@ +package org.apache.sdap.mudrod.recommendation.pre; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +public class NormalizeVariables extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(NormalizeVariables.class); + // index name + private String indexName; + // type name of metadata in ES + private String metadataType; + + /** + * Creates a new instance of OHEncoder. 
+ * + * @param props the Mudrod configuration + * @param es an instantiated {@link ESDriver} + * @param spark an instantiated {@link SparkDriver} + */ + public NormalizeVariables(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + indexName = props.getProperty("indexName"); + metadataType = props.getProperty("recom_metadataType"); + } + + @Override + public Object execute() { + LOG.info("*****************processing metadata variables starts******************"); + startTime = System.currentTimeMillis(); + + normalizeMetadataVariables(es); + + endTime = System.currentTimeMillis(); + LOG.info("*****************processing metadata variables ends******************Took {}s", (endTime - startTime) / 1000); + + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + public void normalizeMetadataVariables(ESDriver es) { + + es.createBulkProcessor(); + + SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute() + .actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> metadata = hit.getSource(); + Map<String, Object> updatedValues = new HashMap<>(); + + this.normalizeSpatialVariables(metadata, updatedValues); + this.normalizeTemporalVariables(metadata, updatedValues); + this.normalizeOtherVariables(metadata, updatedValues); + + UpdateRequest ur = es.generateUpdateRequest(indexName, metadataType, hit.getId(), updatedValues); + es.getBulkProcessor().add(ur); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + es.destroyBulkProcessor(); + } + + private void normalizeOtherVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) { + String shortname = (String) 
metadata.get("Dataset-ShortName"); + double versionNUm = getVersionNum(shortname); + updatedValues.put("Dataset-Derivative-VersionNum", versionNUm); + + } + + private Double getVersionNum(String version) { + if (version == null) { + return 0.0; + } + Double versionNum = 0.0; + Pattern p = Pattern.compile(".*[a-zA-Z].*"); + if ("Operational/Near-Real-Time".equals(version)) { + versionNum = 2.0; + } else if (version.matches("[0-9]{1}[a-zA-Z]{1}")) { + versionNum = Double.parseDouble(version.substring(0, 1)); + } else if (p.matcher(version).find()) { + versionNum = 0.0; + } else { + versionNum = Double.parseDouble(version); + if (versionNum >= 5) { + versionNum = 20.0; + } + } + return versionNum; + } + + private void normalizeSpatialVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) { + + // get spatial resolution + Double spatialR; + if (metadata.get("Dataset-SatelliteSpatialResolution") != null) { + spatialR = (Double) metadata.get("Dataset-SatelliteSpatialResolution"); + } else { + Double gridR = (Double) metadata.get("Dataset-GridSpatialResolution"); + if (gridR != null) { + spatialR = 111 * gridR; + } else { + spatialR = 25.0; + } + } + updatedValues.put("Dataset-Derivative-SpatialResolution", spatialR); + + // Transform Longitude and calculate coverage area + double top = parseDouble((String) metadata.get("DatasetCoverage-NorthLat")); + double bottom = parseDouble((String) metadata.get("DatasetCoverage-SouthLat")); + double left = parseDouble((String) metadata.get("DatasetCoverage-WestLon")); + double right = parseDouble((String) metadata.get("DatasetCoverage-EastLon")); + + if (left > 180) { + left = left - 360; + } + + if (right > 180) { + right = right - 360; + } + + if (left == right) { + left = -180; + right = 180; + } + + double area = (top - bottom) * (right - left); + + updatedValues.put("DatasetCoverage-Derivative-EastLon", right); + updatedValues.put("DatasetCoverage-Derivative-WestLon", left); + 
updatedValues.put("DatasetCoverage-Derivative-NorthLat", top); + updatedValues.put("DatasetCoverage-Derivative-SouthLat", bottom); + updatedValues.put("DatasetCoverage-Derivative-Area", area); + + // get processing level + String processingLevel = (String) metadata.get("Dataset-ProcessingLevel"); + double dProLevel = this.getProLevelNum(processingLevel); + updatedValues.put("Dataset-Derivative-ProcessingLevel", dProLevel); + } + + private void normalizeTemporalVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) { + + String trStr = (String) metadata.get("Dataset-TemporalResolution"); + if ("".equals(trStr)) { + trStr = (String) metadata.get("Dataset-TemporalRepeat"); + } + + updatedValues.put("Dataset-Derivative-TemporalResolution", covertTimeUnit(trStr)); + } + + private Double covertTimeUnit(String str) { + Double timeInHour; + if (str.contains("Hour")) { + timeInHour = Double.parseDouble(str.split(" ")[0]); + } else if (str.contains("Day")) { + timeInHour = Double.parseDouble(str.split(" ")[0]) * 24; + } else if (str.contains("Week")) { + timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7; + } else if (str.contains("Month")) { + timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30; + } else if (str.contains("Year")) { + timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30 * 365; + } else { + timeInHour = 0.0; + } + + return timeInHour; + } + + public Double getProLevelNum(String pro) { + if (pro == null) { + return 1.0; + } + Double proNum = 0.0; + Pattern p = Pattern.compile(".*[a-zA-Z].*"); + if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) { + proNum = Double.parseDouble(pro.substring(0, 1)); + } else if (p.matcher(pro).find()) { + proNum = 1.0; + } else { + proNum = Double.parseDouble(pro); + } + + return proNum; + } + + private double parseDouble(String strNumber) { + if (strNumber != null && strNumber.length() > 0) { + try { + return Double.parseDouble(strNumber); + } catch (Exception e) { + return -1; + } + } 
else + return 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java new file mode 100644 index 0000000..514b044 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java @@ -0,0 +1,152 @@ +/** + * Project Name:mudrod-core + * File Name:SessionCooccurenceMatrix.java + * Package Name:org.apache.sdap.mudrod.recommendation.pre + * Date:Aug 19, 20163:06:33 PM + * Copyright (c) 2016, [email protected] All Rights Reserved. + */ + +package org.apache.sdap.mudrod.recommendation.pre; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.utils.LabeledRowMatrix; +import org.apache.sdap.mudrod.utils.MatrixUtil; +import org.apache.sdap.mudrod.weblog.structure.SessionExtractor; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.*; + +/** + * ClassName: SessionCooccurenceMatrix Function: Generate metadata session + * coocucurence matrix from web logs. Each row in the matrix is corresponding to + * a metadata, and each column is a session. 
+ */ +public class SessionCooccurence extends DiscoveryStepAbstract { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SessionCooccurence.class); + + /** + * Creates a new instance of SessionCooccurence. + * + * @param props + * the Mudrod configuration + * @param es + * the Elasticsearch drive + * @param spark + * the spark driver + */ + public SessionCooccurence(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + + LOG.info("Starting dataset session-based similarity generation..."); + + startTime = System.currentTimeMillis(); + + // get all metadata session cooccurance data + SessionExtractor extractor = new SessionExtractor(); + JavaPairRDD<String, List<String>> sessionDatasetRDD = extractor.bulidSessionDatasetRDD(props, es, spark); + + // remove retired datasets + JavaPairRDD<String, List<String>> sessionFiltedDatasetsRDD = removeRetiredDataset(es, sessionDatasetRDD); + LabeledRowMatrix datasetSessionMatrix = MatrixUtil.createWordDocMatrix(sessionFiltedDatasetsRDD); + + // export + MatrixUtil.exportToCSV(datasetSessionMatrix.rowMatrix, datasetSessionMatrix.rowkeys, datasetSessionMatrix.colkeys, props.getProperty("session_metadata_Matrix")); + + endTime = System.currentTimeMillis(); + + LOG.info("Completed dataset session-based similarity generation. 
Time elapsed: {}s", (endTime - startTime) / 1000); + + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + /** + * filter out-of-data metadata + * + * @param es + * the Elasticsearch drive + * @param userDatasetsRDD + * dataset extracted from session + * @return filtered session datasets + */ + public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) { + + Map<String, String> nameMap = this.getOnServiceMetadata(es); + + return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception { + List<String> oriDatasets = arg0._2; + List<String> newDatasets = new ArrayList<>(); + int size = oriDatasets.size(); + for (int i = 0; i < size; i++) { + String name = oriDatasets.get(i); + if (nameMap.containsKey(name)) { + newDatasets.add(nameMap.get(name)); + } + } + return new Tuple2<>(arg0._1, newDatasets); + } + }); + + } + + /** + * getMetadataNameMap: Get on service metadata names, key is lowcase of short + * name and value is the original short name + * + * @param es + * the elasticsearch client + * @return a map from lower case metadata name to original metadata name + */ + private Map<String, String> getOnServiceMetadata(ESDriver es) { + + String indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME); + String metadataType = props.getProperty("recom_metadataType"); + + Map<String, String> shortnameMap = new HashMap<>(); + SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute() + .actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> metadata = hit.getSource(); + 
String shortName = (String) metadata.get("Dataset-ShortName"); + shortnameMap.put(shortName.toLowerCase(), shortName); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + return shortnameMap; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/package-info.java new file mode 100644 index 0000000..4c95ade --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes the preprocessing required by recommendation module. 
+ */ +package org.apache.sdap.mudrod.recommendation.pre; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java new file mode 100644 index 0000000..a3a2c9e --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java @@ -0,0 +1,74 @@ +/** + * Project Name:mudrod-core + * File Name:TopicBasedCF.java + * Package Name:org.apache.sdap.mudrod.recommendation.process + * Date:Aug 22, 201610:45:55 AM + * Copyright (c) 2016, [email protected] All Rights Reserved. + */ + +package org.apache.sdap.mudrod.recommendation.process; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.semantics.SVDAnalyzer; +import org.apache.sdap.mudrod.utils.LinkageTriple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * ClassName: Recommend metedata based on data content semantic similarity + */ +public class AbstractBasedSimilarity extends DiscoveryStepAbstract { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractBasedSimilarity.class); + + /** + * Creates a new instance of TopicBasedCF. 
+ * + * @param props the Mudrod configuration + * @param es the Elasticsearch client + * @param spark the spark drive + */ + public AbstractBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + + LOG.info("*****************abstract similarity calculation starts******************"); + startTime = System.currentTimeMillis(); + + try { + /*String topicMatrixFile = props.getProperty("metadata_term_tfidf_matrix"); + SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark); + List<LinkageTriple> triples = analyzer + .calTermSimfromMatrix(topicMatrixFile); + analyzer.saveToES(triples, props.getProperty("indexName"), + props.getProperty("metadataTermTFIDFSimType"), true, true);*/ + + // for comparison + SVDAnalyzer svd = new SVDAnalyzer(props, es, spark); + svd.getSVDMatrix(props.getProperty("metadata_word_tfidf_matrix"), 150, props.getProperty("metadata_word_tfidf_matrix")); + List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty("metadata_word_tfidf_matrix")); + svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("metadataWordTFIDFSimType"), true, true); + + } catch (Exception e) { + e.printStackTrace(); + } + + endTime = System.currentTimeMillis(); + LOG.info("*****************abstract similarity calculation ends******************Took {}s", (endTime - startTime) / 1000); + + return null; + } + + @Override + public Object execute(Object o) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java new file mode 100644 index 
0000000..a12c236 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java @@ -0,0 +1,380 @@ +package org.apache.sdap.mudrod.recommendation.process; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.*; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +public class VariableBasedSimilarity extends DiscoveryStepAbstract implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(VariableBasedSimilarity.class); + + private DecimalFormat df = new DecimalFormat("#.000"); + // a map from variable to its type + public Map<String, Integer> variableTypes; + public Map<String, Integer> variableWeights; + + private static final Integer VAR_SPATIAL = 1; + private static final Integer VAR_TEMPORAL = 2; + private static final Integer VAR_CATEGORICAL = 3; + private static final Integer VAR_ORDINAL = 4; + + // index name + private String indexName; + // type name of metadata in ES + private String metadataType; + private String variableSimType; + + /** + * Creates a new instance of OHEncoder. 
+ * + * @param props the Mudrod configuration + * @param es an instantiated {@link ESDriver} + * @param spark an instantiated {@link SparkDriver} + */ + public VariableBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + + indexName = props.getProperty("indexName"); + metadataType = props.getProperty("recom_metadataType"); + variableSimType = props.getProperty("metadataCodeSimType"); + this.inital(); + } + + @Override + public Object execute() { + LOG.info("*****************calculating metadata variables based similarity starts******************"); + startTime = System.currentTimeMillis(); + es.deleteType(indexName, variableSimType); + addMapping(es, indexName, variableSimType); + + VariableBasedSimilarity(es); + es.refreshIndex(); + normalizeVariableWeight(es); + es.refreshIndex(); + endTime = System.currentTimeMillis(); + LOG.info("*****************calculating metadata variables based similarity ends******************Took {}s", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + public void inital() { + this.initVariableType(); + this.initVariableWeight(); + } + + private void initVariableType() { + variableTypes = new HashMap<>(); + + variableTypes.put("DatasetParameter-Variable", VAR_CATEGORICAL); + variableTypes.put("DatasetRegion-Region", VAR_CATEGORICAL); + variableTypes.put("Dataset-ProjectionType", VAR_CATEGORICAL); + variableTypes.put("Dataset-ProcessingLevel", VAR_CATEGORICAL); + variableTypes.put("DatasetParameter-Topic", VAR_CATEGORICAL); + variableTypes.put("DatasetParameter-Term", VAR_CATEGORICAL); + variableTypes.put("DatasetParameter-Category", VAR_CATEGORICAL); + variableTypes.put("DatasetPolicy-DataFormat", VAR_CATEGORICAL); + variableTypes.put("Collection-ShortName", VAR_CATEGORICAL); + variableTypes.put("DatasetSource-Source-Type", VAR_CATEGORICAL); + variableTypes.put("DatasetSource-Source-ShortName", VAR_CATEGORICAL); + 
variableTypes.put("DatasetSource-Sensor-ShortName", VAR_CATEGORICAL); + variableTypes.put("DatasetPolicy-Availability", VAR_CATEGORICAL); + variableTypes.put("Dataset-Provider-ShortName", VAR_CATEGORICAL); + + variableTypes.put("Dataset-Derivative-ProcessingLevel", VAR_ORDINAL); + variableTypes.put("Dataset-Derivative-TemporalResolution", VAR_ORDINAL); + variableTypes.put("Dataset-Derivative-SpatialResolution", VAR_ORDINAL); + } + + private void initVariableWeight() { + variableWeights = new HashMap<>(); + + variableWeights.put("Dataset-Derivative-ProcessingLevel", 5); + variableWeights.put("DatasetParameter-Category", 5); + variableWeights.put("DatasetParameter-Variable", 5); + variableWeights.put("DatasetSource-Sensor-ShortName", 5); + + variableWeights.put("DatasetPolicy-Availability", 4); + variableWeights.put("DatasetRegion-Region", 4); + variableWeights.put("DatasetSource-Source-Type", 4); + variableWeights.put("DatasetSource-Source-ShortName", 4); + variableWeights.put("DatasetParameter-Term", 4); + variableWeights.put("DatasetPolicy-DataFormat", 4); + variableWeights.put("Dataset-Derivative-SpatialResolution", 4); + variableWeights.put("Temporal_Covergae", 4); + + variableWeights.put("DatasetParameter-Topic", 3); + variableWeights.put("Collection-ShortName", 3); + variableWeights.put("Dataset-Derivative-TemporalResolution", 3); + variableWeights.put("Spatial_Covergae", 3); + + variableWeights.put("Dataset-ProjectionType", 1); + variableWeights.put("Dataset-Provider-ShortName", 1); + } + + public void VariableBasedSimilarity(ESDriver es) { + + es.createBulkProcessor(); + + List<Map<String, Object>> metadatas = new ArrayList<>(); + SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute() + .actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> metadataA = hit.getSource(); + 
metadatas.add(metadataA); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + int size = metadatas.size(); + for (int i = 0; i < size; i++) { + Map<String, Object> metadataA = metadatas.get(i); + String shortNameA = (String) metadataA.get("Dataset-ShortName"); + + for (int j = 0; j < size; j++) { + metadataA = metadatas.get(i); + Map<String, Object> metadataB = metadatas.get(j); + String shortNameB = (String) metadataB.get("Dataset-ShortName"); + + try { + XContentBuilder contentBuilder = jsonBuilder().startObject(); + contentBuilder.field("concept_A", shortNameA); + contentBuilder.field("concept_B", shortNameB); + + // spatial similarity + this.spatialSimilarity(metadataA, metadataB, contentBuilder); + // temporal similarity + this.temporalSimilarity(metadataA, metadataB, contentBuilder); + // categorical variables similarity + this.categoricalVariablesSimilarity(metadataA, metadataB, contentBuilder); + // ordinal variables similarity + this.ordinalVariablesSimilarity(metadataA, metadataB, contentBuilder); + + contentBuilder.endObject(); + + IndexRequest ir = new IndexRequest(indexName, variableSimType).source(contentBuilder); + es.getBulkProcessor().add(ir); + + } catch (IOException e1) { + e1.printStackTrace(); + } + + } + } + + es.destroyBulkProcessor(); + } + + /* + * refer to P. Frontiera, R. Larson, and J. Radke (2008) A comparison of + geometric approaches to assessing spatial similarity for GIR. 
+ International Journal of Geographical Information Science, + 22(3) + */ + public void spatialSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException { + + double topA = (double) metadataA.get("DatasetCoverage-Derivative-NorthLat"); + double bottomA = (double) metadataA.get("DatasetCoverage-Derivative-SouthLat"); + double leftA = (double) metadataA.get("DatasetCoverage-Derivative-WestLon"); + double rightA = (double) metadataA.get("DatasetCoverage-Derivative-EastLon"); + double areaA = (double) metadataA.get("DatasetCoverage-Derivative-Area"); + + double topB = (double) metadataB.get("DatasetCoverage-Derivative-NorthLat"); + double bottomB = (double) metadataB.get("DatasetCoverage-Derivative-SouthLat"); + double leftB = (double) metadataB.get("DatasetCoverage-Derivative-WestLon"); + double rightB = (double) metadataB.get("DatasetCoverage-Derivative-EastLon"); + double areaB = (double) metadataB.get("DatasetCoverage-Derivative-Area"); + + // Intersect area + double xOverlap = Math.max(0, Math.min(rightA, rightB) - Math.max(leftA, leftB)); + double yOverlap = Math.max(0, Math.min(topA, topB) - Math.max(bottomA, bottomB)); + double overlapArea = xOverlap * yOverlap; + + // Calculate coverage similarity + double similarity = 0.0; + if (areaA > 0 && areaB > 0) { + similarity = (overlapArea / areaA + overlapArea / areaB) * 0.5; + } + + contentBuilder.field("Spatial_Covergae_Sim", similarity); + } + + public void temporalSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException { + + double similarity = 0.0; + double startTimeA = Double.parseDouble((String) metadataA.get("Dataset-DatasetCoverage-StartTimeLong")); + String endTimeAStr = (String) metadataA.get("Dataset-DatasetCoverage-StopTimeLong"); + double endTimeA = 0.0; + if ("".equals(endTimeAStr)) { + endTimeA = System.currentTimeMillis(); + } else { + endTimeA = 
Double.parseDouble(endTimeAStr); + } + double timespanA = endTimeA - startTimeA; + + double startTimeB = Double.parseDouble((String) metadataB.get("Dataset-DatasetCoverage-StartTimeLong")); + String endTimeBStr = (String) metadataB.get("Dataset-DatasetCoverage-StopTimeLong"); + double endTimeB = 0.0; + if ("".equals(endTimeBStr)) { + endTimeB = System.currentTimeMillis(); + } else { + endTimeB = Double.parseDouble(endTimeBStr); + } + double timespanB = endTimeB - startTimeB; + + double intersect = 0.0; + if (startTimeB >= endTimeA || endTimeB <= startTimeA) { + intersect = 0.0; + } else if (startTimeB >= startTimeA && endTimeB <= endTimeA) { + intersect = timespanB; + } else if (startTimeA >= startTimeB && endTimeA <= endTimeB) { + intersect = timespanA; + } else { + intersect = (startTimeA > startTimeB) ? (endTimeB - startTimeA) : (endTimeA - startTimeB); + } + + similarity = intersect / (Math.sqrt(timespanA) * Math.sqrt(timespanB)); + contentBuilder.field("Temporal_Covergae_Sim", similarity); + } + + public void categoricalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException { + + for (String variable : variableTypes.keySet()) { + Integer type = variableTypes.get(variable); + if (type != VAR_CATEGORICAL) { + continue; + } + + double similarity = 0.0; + Object valueA = metadataA.get(variable); + Object valueB = metadataB.get(variable); + if (valueA instanceof ArrayList) { + ArrayList<String> aList = (ArrayList<String>) valueA; + ArrayList<String> bList = (ArrayList<String>) valueB; + if (aList != null && bList != null) { + + int lengthA = aList.size(); + int lengthB = bList.size(); + List<String> newAList = new ArrayList<>(aList); + List<String> newBList = new ArrayList<>(bList); + newAList.retainAll(newBList); + similarity = newAList.size() / lengthA; + } + + } else if (valueA instanceof String) { + if (valueA.equals(valueB)) { + similarity = 1.0; + } + } + + 
contentBuilder.field(variable + "_Sim", similarity); + } + } + + public void ordinalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException { + for (String variable : variableTypes.keySet()) { + Integer type = variableTypes.get(variable); + if (type != VAR_ORDINAL) { + continue; + } + + double similarity = 0.0; + Object valueA = metadataA.get(variable); + Object valueB = metadataB.get(variable); + if (valueA != null && valueB != null) { + + double a = (double) valueA; + double b = (double) valueB; + if (a != 0.0) { + similarity = 1 - Math.abs(b - a) / a; + if (similarity < 0) { + similarity = 0.0; + } + } + } + + contentBuilder.field(variable + "_Sim", similarity); + } + } + + public static void addMapping(ESDriver es, String index, String type) { + XContentBuilder Mapping; + try { + Mapping = jsonBuilder().startObject().startObject(type).startObject("properties").startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject() + .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject() + + .endObject().endObject().endObject(); + + es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(Mapping).execute().actionGet(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void normalizeVariableWeight(ESDriver es) { + + es.createBulkProcessor(); + + double totalWeight = 0.0; + for (String variable : variableWeights.keySet()) { + totalWeight += variableWeights.get(variable); + } + + SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(variableSimType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute() + .actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> similarities = hit.getSource(); + + double totalSim = 0.0; + for (String variable : variableWeights.keySet()) { + 
if (similarities.containsKey(variable + "_Sim")) { + double value = (double) similarities.get(variable + "_Sim"); + double weight = variableWeights.get(variable); + totalSim += weight * value; + } + } + + double weight = totalSim / totalWeight; + UpdateRequest ur = es.generateUpdateRequest(indexName, variableSimType, hit.getId(), "weight", weight); + es.getBulkProcessor().add(ur); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + es.destroyBulkProcessor(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/package-info.java new file mode 100644 index 0000000..7b1aeac --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes the processing required by recommendation module. 
+ */ +package org.apache.sdap.mudrod.recommendation.process; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/sessionBasedCF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/sessionBasedCF.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/sessionBasedCF.java new file mode 100644 index 0000000..a9e0699 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/sessionBasedCF.java @@ -0,0 +1,74 @@ +/** + * Project Name:mudrod-core + * File Name:sessionBasedCF.java + * Package Name:org.apache.sdap.mudrod.recommendation.process + * Date:Aug 19, 20163:17:00 PM + * Copyright (c) 2016, [email protected] All Rights Reserved. + */ + +package org.apache.sdap.mudrod.recommendation.process; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.semantics.SemanticAnalyzer; +import org.apache.sdap.mudrod.utils.LinkageTriple; +import org.apache.sdap.mudrod.utils.SimilarityUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.Properties; + +/** + * ClassName: Recommend metedata based on session level co-occurrence + */ +public class sessionBasedCF extends DiscoveryStepAbstract { + + private static final Logger LOG = LoggerFactory.getLogger(sessionBasedCF.class); + + /** + * Creates a new instance of sessionBasedCF. 
+ * + * @param props + * the Mudrod configuration + * @param es + * the Elasticsearch drive + * @param spark + * the spark drive + */ + public sessionBasedCF(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("*****************Session based metadata similarity starts******************"); + startTime = System.currentTimeMillis(); + + try { + String session_metadatFile = props.getProperty("session_metadata_Matrix"); + File f = new File(session_metadatFile); + if (f.exists()) { + SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark); + List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(session_metadatFile, SimilarityUtil.SIM_PEARSON, 1); + analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataSessionBasedSimType"), true, false); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + endTime = System.currentTimeMillis(); + LOG.info("*****************Session based metadata similarity ends******************Took {}s", (endTime - startTime) / 1000); + + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java new file mode 100644 index 0000000..2e314de --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java @@ -0,0 +1,275 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.recommendation.structure; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodEngine; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.*; + +/** + * Recommend metadata using combination all two methods, including content-based + * similarity and session-level similarity + */ +public class HybridRecommendation extends DiscoveryStepAbstract { + /** + * + */ + private static final long serialVersionUID = 1L; + // recommended metadata list + protected transient List<LinkedTerm> termList = new ArrayList<>(); + // format decimal + DecimalFormat df = new DecimalFormat("#.00"); + // index name + protected static final String INDEX_NAME = "indexName"; + private static final String WEIGHT = "weight"; + + /** + * recommended data class Date: Sep 12, 2016 2:25:28 AM + */ + class LinkedTerm { + public String term = null; + public double weight = 0; + public String model = null; + + public LinkedTerm(String str, double w, String m) { + term = str; + weight = w; + model = m; 
+ } + } + + public HybridRecommendation(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + /** + * Get recommended data for a giving dataset + * + * @param input: a giving dataset + * @param num: the number of recommended dataset + * @return recommended dataset in json format + */ + public JsonObject getRecomDataInJson(String input, int num) { + JsonObject resultJson = new JsonObject(); + + String type = props.getProperty("metadataCodeSimType"); + Map<String, Double> sortedVariableSimMap = getRelatedData(type, input, num + 10); + + type = props.getProperty("metadataWordTFIDFSimType"); + Map<String, Double> sortedAbstractSimMap = getRelatedData(type, input, num + 10); + + type = props.getProperty("metadataSessionBasedSimType"); + Map<String, Double> sortedSessionSimMap = getRelatedData(type, input, num + 10); + + JsonElement variableSimJson = mapToJson(sortedVariableSimMap, num); + resultJson.add("variableSim", variableSimJson); + JsonElement abstractSimJson = mapToJson(sortedAbstractSimMap, num); + resultJson.add("abstractSim", abstractSimJson); + JsonElement sessionSimJson = mapToJson(sortedSessionSimMap, num); + resultJson.add("sessionSim", sessionSimJson); + + Map<String, Double> hybirdSimMap = new HashMap<String, Double>(); + + for (String name : sortedAbstractSimMap.keySet()) { + hybirdSimMap.put(name, sortedAbstractSimMap.get(name) /** 0.4 */); + } + + for (String name : sortedVariableSimMap.keySet()) { + if (hybirdSimMap.get(name) != null) { + double sim = hybirdSimMap.get(name) + sortedVariableSimMap.get(name) /** 0.3 */; + hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); + } else { + double sim = sortedVariableSimMap.get(name); + hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); + } + } + + for (String name : sortedSessionSimMap.keySet()) { + if (hybirdSimMap.get(name) != 
null) { + double sim = hybirdSimMap.get(name) + sortedSessionSimMap.get(name) /** 0.1 */; + hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); + } else { + double sim = sortedSessionSimMap.get(name); + hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); + } + } + + Map<String, Double> sortedHybirdSimMap = this.sortMapByValue(hybirdSimMap); + + JsonElement linkedJson = mapToJson(sortedHybirdSimMap, num); + resultJson.add("linked", linkedJson); + + return resultJson; + } + + /** + * Method of converting hashmap to JSON + * + * @param wordweights a map from related metadata to weights + * @param num the number of converted elements + * @return converted JSON object + */ + protected JsonElement mapToJson(Map<String, Double> wordweights, int num) { + Gson gson = new Gson(); + + List<JsonObject> nodes = new ArrayList<>(); + Set<String> words = wordweights.keySet(); + int i = 0; + for (String wordB : words) { + JsonObject node = new JsonObject(); + node.addProperty("name", wordB); + node.addProperty("weight", wordweights.get(wordB)); + nodes.add(node); + + i += 1; + if (i >= num) { + break; + } + } + + String nodesJson = gson.toJson(nodes); + JsonElement nodesElement = gson.fromJson(nodesJson, JsonElement.class); + + return nodesElement; + } + + /** + * Get recommend dataset for a giving dataset + * + * @param type recommend method + * @param input a giving dataset + * @param num the number of recommended dataset + * @return recommended dataset map, key is dataset name, value is similarity + * value + */ + public Map<String, Double> getRelatedData(String type, String input, int num) { + termList = new ArrayList<>(); + Map<String, Double> termsMap = new HashMap<>(); + Map<String, Double> sortedMap = new HashMap<>(); + try { + List<LinkedTerm> links = getRelatedDataFromES(type, input, num); + int size = links.size(); + for (int i = 0; i < size; i++) { + termsMap.put(links.get(i).term, links.get(i).weight); + } + + sortedMap = sortMapByValue(termsMap); // 
terms_map will be empty + } catch (Exception e) { + e.printStackTrace(); + } + + return sortedMap; + } + + /** + * Get recommend dataset for a giving dataset + * + * @param type recommend method + * @param input a giving dataset + * @param num the number of recommended dataset + * @return recommended dataset list + */ + public List<LinkedTerm> getRelatedDataFromES(String type, String input, int num) { + + SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC) + .setSize(num); + + SearchResponse usrhis = builder.execute().actionGet(); + + for (SearchHit hit : usrhis.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String conceptB = (String) result.get("concept_B"); + + if (!conceptB.equals(input)) { + LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), type); + termList.add(lTerm); + } + } + + return termList; + } + + /** + * Method of sorting a map by value + * + * @param passedMap input map + * @return sorted map + */ + public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) { + List<String> mapKeys = new ArrayList<>(passedMap.keySet()); + List<Double> mapValues = new ArrayList<>(passedMap.values()); + Collections.sort(mapValues, Collections.reverseOrder()); + Collections.sort(mapKeys, Collections.reverseOrder()); + + LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>(); + + Iterator<Double> valueIt = mapValues.iterator(); + while (valueIt.hasNext()) { + Object val = valueIt.next(); + Iterator<String> keyIt = mapKeys.iterator(); + + while (keyIt.hasNext()) { + Object key = keyIt.next(); + String comp1 = passedMap.get(key).toString(); + String comp2 = val.toString(); + + if (comp1.equals(comp2)) { + passedMap.remove(key); + mapKeys.remove(key); + sortedMap.put((String) key, (Double) val); + break; + } + } + } + return sortedMap; + } + + public static void 
main(String[] args) throws IOException { + + MudrodEngine me = new MudrodEngine(); + Properties props = me.loadConfig(); + ESDriver es = new ESDriver(me.getConfig()); + HybridRecommendation test = new HybridRecommendation(props, es, null); + + // String input = "NSCAT_LEVEL_1.7_V2"; + String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4"; + JsonObject json = test.getRecomDataInJson(input, 10); + + System.out.println(json.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java new file mode 100644 index 0000000..a7cc5e3 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java @@ -0,0 +1,150 @@ +package org.apache.sdap.mudrod.recommendation.structure; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.utils.LabeledRowMatrix; +import org.apache.sdap.mudrod.utils.MatrixUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.*; + +public class MetadataOpt implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + private String indexName; + private String 
metadataType; + private List<String> variables; + + public static final String SPLIT_BLANK = " "; + public static final String SPLIT_COMMA = ","; + + public MetadataOpt(Properties props) { + indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME); + metadataType = props.getProperty("recom_metadataType"); + + variables = new ArrayList<>(); + variables.add("DatasetParameter-Term"); + variables.add("DatasetParameter-Variable"); + variables.add("Dataset-Description"); + variables.add("Dataset-LongName"); + } + + public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark) throws Exception { + List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables); + return this.parallizeData(spark, datasetsTokens); + } + + public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark, List<String> variables) throws Exception { + List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables); + return this.parallizeData(spark, datasetsTokens); + } + + private JavaPairRDD<String, String> parallizeData(SparkDriver spark, List<Tuple2<String, String>> datasetContent) { + + JavaRDD<Tuple2<String, String>> datasetContentRDD = spark.sc.parallelize(datasetContent); + + return datasetContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, String> call(Tuple2<String, String> term) throws Exception { + return term; + } + }); + + } + + public JavaPairRDD<String, List<String>> tokenizeData(JavaPairRDD<String, String> datasetsContentRDD, String splitter) throws Exception { + + return datasetsContentRDD.mapToPair(new PairFunction<Tuple2<String, String>, String, List<String>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, List<String>> call(Tuple2<String, String> arg) throws Exception { + String content = arg._2; + List<String> 
tokens = getTokens(content, splitter); + + return new Tuple2<>(arg._1, tokens); + } + }); + + } + + public List<String> getTokens(String str, String splitter) throws Exception { + String[] tokens = null; + if (splitter.equals(SPLIT_BLANK)) { + tokens = str.split(" "); + } else if (splitter.equals(SPLIT_COMMA)) { + tokens = str.split(","); + } + return java.util.Arrays.asList(tokens); + } + + public List<Tuple2<String, String>> loadMetadataFromES(ESDriver es, List<String> variables) throws Exception { + + SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setQuery(QueryBuilders.matchAllQuery()).setScroll(new TimeValue(60000)).setSize(100).execute() + .actionGet(); + + List<Tuple2<String, String>> datasetsTokens = new ArrayList<>(); + while (true) { + + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String shortName = (String) result.get("Dataset-ShortName"); + + String filedStr = ""; + int size = variables.size(); + for (int i = 0; i < size; i++) { + String filed = variables.get(i); + Object filedValue = result.get(filed); + + if (filedValue != null) { + filedStr = es.customAnalyzing(indexName, filedValue.toString()); + } + } + + datasetsTokens.add(new Tuple2<String, String>(shortName, filedStr)); + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + return datasetsTokens; + } + + public LabeledRowMatrix tFIDFTokens(JavaPairRDD<String, List<String>> datasetTokensRDD, SparkDriver spark) { + + LabeledRowMatrix labelMatrix = MatrixUtil.createDocWordMatrix(datasetTokensRDD, spark.sc); + + RowMatrix docwordMatrix = labelMatrix.rowMatrix; + + RowMatrix docwordTFIDFMatrix = MatrixUtil.createTFIDFMatrix(docwordMatrix); + + labelMatrix.rowMatrix = docwordTFIDFMatrix; + + return labelMatrix; + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java new file mode 100644 index 0000000..bea0b40 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java @@ -0,0 +1,197 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.recommendation.structure; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodEngine; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.*; + +/** + * This class is used to test recommendation result similarity and session-level + * similarity + */ +public class RecomData extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + protected transient List<LinkedTerm> termList = new ArrayList<>(); + DecimalFormat df = new DecimalFormat("#.00"); + protected static final String INDEX_NAME = "indexName"; + private static final String WEIGHT = "weight"; + + class LinkedTerm { + public String term = null; + public double weight = 0; + public String model = null; + + public LinkedTerm(String str, double w, String m) { + term = str; + weight = w; + model = m; + } + } + + public RecomData(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + + public JsonObject getRecomDataInJson(String input, int num) { + String type = props.getProperty("metadataTermTFIDFSimType"); + Map<String, Double> sortedOBSimMap = getRelatedData(type, input, num + 5); + JsonElement linkedJson = mapToJson(sortedOBSimMap, num); + + // type = props.getProperty("metadataTermTFIDFSimType"); + type = 
props.getProperty("metadataCodeSimType"); + + Map<String, Double> sortedMBSimMap = getRelatedData(type, input, num + 5); + JsonElement relatedJson = mapToJson(sortedMBSimMap, num); + + JsonObject json = new JsonObject(); + + json.add("TFIDFSim", linkedJson); + json.add("TopicSim", relatedJson); + + return json; + } + + protected JsonElement mapToJson(Map<String, Double> wordweights, int num) { + Gson gson = new Gson(); + + List<JsonObject> nodes = new ArrayList<>(); + Set<String> words = wordweights.keySet(); + int i = 0; + for (String wordB : words) { + JsonObject node = new JsonObject(); + node.addProperty("name", wordB); + node.addProperty("weight", wordweights.get(wordB)); + nodes.add(node); + + i += 1; + if (i >= num) { + break; + } + } + + String nodesJson = gson.toJson(nodes); + JsonElement nodesElement = gson.fromJson(nodesJson, JsonElement.class); + + return nodesElement; + } + + public Map<String, Double> getRelatedData(String type, String input, int num) { + termList = new ArrayList<>(); + Map<String, Double> termsMap = new HashMap<>(); + Map<String, Double> sortedMap = new HashMap<>(); + try { + List<LinkedTerm> links = getRelatedDataFromES(type, input, num); + int size = links.size(); + for (int i = 0; i < size; i++) { + termsMap.put(links.get(i).term, links.get(i).weight); + } + + sortedMap = sortMapByValue(termsMap); // terms_map will be empty + } catch (Exception e) { + e.printStackTrace(); + } + + return sortedMap; + } + + public List<LinkedTerm> getRelatedDataFromES(String type, String input, int num) { + SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC) + .setSize(num); + + SearchResponse usrhis = builder.execute().actionGet(); + + for (SearchHit hit : usrhis.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String conceptB = (String) result.get("concept_B"); + + if 
(!conceptB.equals(input)) { + LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), type); + termList.add(lTerm); + } + } + + return termList; + } + + public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) { + List<String> mapKeys = new ArrayList<>(passedMap.keySet()); + List<Double> mapValues = new ArrayList<>(passedMap.values()); + Collections.sort(mapValues, Collections.reverseOrder()); + Collections.sort(mapKeys, Collections.reverseOrder()); + + LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>(); + + Iterator<Double> valueIt = mapValues.iterator(); + while (valueIt.hasNext()) { + Object val = valueIt.next(); + Iterator<String> keyIt = mapKeys.iterator(); + + while (keyIt.hasNext()) { + Object key = keyIt.next(); + String comp1 = passedMap.get(key).toString(); + String comp2 = val.toString(); + + if (comp1.equals(comp2)) { + passedMap.remove(key); + mapKeys.remove(key); + sortedMap.put((String) key, (Double) val); + break; + } + } + } + return sortedMap; + } + + public static void main(String[] args) throws IOException { + + MudrodEngine me = new MudrodEngine(); + Properties props = me.loadConfig(); + ESDriver es = new ESDriver(me.getConfig()); + RecomData test = new RecomData(props, es, null); + + String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4"; + JsonObject json = test.getRecomDataInJson(input, 10); + + System.out.println(json.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/package-info.java new file mode 100644 index 0000000..c9095c7 --- /dev/null +++ 
b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/package-info.java @@ -0,0 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes the data structure required by recommendation module. + */ +package org.apache.sdap.mudrod.recommendation.structure; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/semantics/SVDAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/semantics/SVDAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/semantics/SVDAnalyzer.java new file mode 100644 index 0000000..23ae062 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/semantics/SVDAnalyzer.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.semantics; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.utils.MatrixUtil; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * ClassName: SVDAnalyzer Function: Analyze semantic relationship through SVD + * method + */ +public class SVDAnalyzer extends SemanticAnalyzer { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of SVDAnalyzer. + * + * @param props the Mudrod configuration + * @param es the Elasticsearch drive + * @param spark the spark drive + */ + public SVDAnalyzer(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + /** + * GetSVDMatrix: Create SVD matrix csv file from original csv file. + * + * @param csvFileName each row is a term, and each column is a document. + * @param svdDimention Dimension of SVD matrix + * @param svdMatrixFileName CSV file name of SVD matrix + */ + public void getSVDMatrix(String csvFileName, int svdDimention, String svdMatrixFileName) { + + JavaPairRDD<String, Vector> importRDD = MatrixUtil.loadVectorFromCSV(spark, csvFileName, 1); + JavaRDD<Vector> vectorRDD = importRDD.values(); + RowMatrix wordDocMatrix = new RowMatrix(vectorRDD.rdd()); + RowMatrix tfidfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix); + RowMatrix svdMatrix = MatrixUtil.buildSVDMatrix(tfidfMatrix, svdDimention); + + List<String> rowKeys = importRDD.keys().collect(); + List<String> colKeys = new ArrayList<>(); + for (int i = 0; i < svdDimention; i++) { + colKeys.add("dimension" + i); + } + MatrixUtil.exportToCSV(svdMatrix, rowKeys, colKeys, svdMatrixFileName); + } +}
