http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java deleted file mode 100644 index d1af04a..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java +++ /dev/null @@ -1,572 +0,0 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.driver;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.gson.GsonBuilder;
import gov.nasa.jpl.mudrod.main.MudrodConstants;
import gov.nasa.jpl.mudrod.main.MudrodEngine;
import gov.nasa.jpl.mudrod.utils.ESTransportClient;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Driver implementation for all Elasticsearch functionality.
 *
 * <p>The Elasticsearch handles held by this class ({@code client}, {@code node}
 * and {@code bulkProcessor}) are {@code transient}, so they are not carried
 * across Java serialization; a deserialized instance needs a client supplied
 * through {@link #setClient(Client)} before it can be used.
 */
public class ESDriver implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class);
  private static final long serialVersionUID = 1L;

  /** Number of buffered actions after which the bulk processor flushes. */
  private static final int MAX_BULK_DOCS = 1000;
  /** Buffered payload size, in bytes, after which the bulk processor flushes. */
  private static final int MAX_BULK_LENGTH = 2500500;
  /** Zone used when turning epoch milliseconds into calendar dates. */
  private static final ZoneId UTC = ZoneId.of("Z");
  /** Output key of the release date field in search results. */
  private static final String RELEASE_DATE = "Release Date";

  private transient Client client = null;
  private transient Node node = null;
  private transient BulkProcessor bulkProcessor = null;

  /**
   * Default constructor for this class. To load client configuration call
   * substantiated constructor.
   */
  public ESDriver() {
    // Default constructor, to load configuration call ESDriver(props)
  }

  /**
   * Substantiated constructor which accepts a {@link java.util.Properties}.
   * A failure to build the client is logged, not rethrown, and leaves the
   * driver without a client.
   *
   * @param props a populated properties object.
   */
  public ESDriver(Properties props) {
    try {
      setClient(makeClient(props));
    } catch (IOException e) {
      LOG.error("Error whilst constructing Elastcisearch client.", e);
    }
  }

  /**
   * Creates the {@link BulkProcessor} used for batched writes and stores it on
   * this driver. It flushes after {@value #MAX_BULK_DOCS} actions or
   * {@value #MAX_BULK_LENGTH} bytes, whichever comes first, and retries with
   * exponential backoff.
   */
  public void createBulkProcessor() {
    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", MAX_BULK_DOCS, MAX_BULK_LENGTH);
    setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
        LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!");
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // A bulk call can succeed as a whole while individual items fail; surface those.
        if (response.hasFailures()) {
          LOG.warn("Bulk request {} completed with failures: {}", executionId, response.buildFailureMessage());
        }
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        LOG.error("Bulk request has failed!", failure);
        throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
      }
    }).setBulkActions(MAX_BULK_DOCS)
        // The limit is a byte count; the previous GB unit effectively disabled size-based flushing.
        .setBulkSize(new ByteSizeValue(MAX_BULK_LENGTH, ByteSizeUnit.BYTES))
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10))
        .setConcurrentRequests(1)
        .build());
  }

  /**
   * Flushes and closes the current bulk processor (waiting up to 20 minutes),
   * clears it from this driver and refreshes the indices. Does nothing when no
   * bulk processor has been created.
   */
  public void destroyBulkProcessor() {
    BulkProcessor processor = getBulkProcessor();
    if (processor == null) {
      return;
    }
    try {
      processor.awaitClose(20, TimeUnit.MINUTES);
      setBulkProcessor(null);
      refreshIndex();
    } catch (InterruptedException e) {
      LOG.error("Error destroying the Bulk Processor.", e);
      // Restore the flag so callers further up can observe the interruption.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Creates {@code indexName} with the given settings and installs the given
   * mapping as its {@code _default_} type. Does nothing if the index already
   * exists.
   *
   * @param indexName    name of the index to create
   * @param settingsJson index settings as JSON
   * @param mappingJson  mapping source as JSON
   * @throws IOException declared for callers; see the Elasticsearch calls used
   */
  public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {
    boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
    if (exists) {
      return;
    }

    getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
    getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
  }

  /**
   * Analyzes {@code str} with the analyzer named {@code "cody"}.
   *
   * @see #customAnalyzing(String, String, String)
   */
  public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
    return this.customAnalyzing(indexName, "cody", str);
  }

  /**
   * Lower-cases {@code str}, splits it on commas and runs every piece through
   * the named analyzer of {@code indexName}. The tokens of each piece are
   * joined with single spaces and the pieces are re-joined with commas.
   *
   * @param indexName index whose analyzer is used
   * @param analyzer  analyzer name
   * @param str       comma-separated input
   * @return the analyzed, comma-separated string
   * @throws InterruptedException if waiting for the analyze response is interrupted
   * @throws ExecutionException   if the analyze request fails
   */
  public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException {
    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
    String[] strList = str.toLowerCase(Locale.ROOT).split(",");
    for (int i = 0; i < strList.length; i++) {
      AnalyzeResponse r = getClient().admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get();
      strList[i] = r.getTokens().stream().map(AnalyzeToken::getTerm).collect(Collectors.joining(" ")).trim();
    }
    return String.join(",", strList);
  }

  /**
   * Applies {@link #customAnalyzing(String, String)} to every element of
   * {@code list}.
   *
   * @return a new list of analyzed strings, or {@code null} when {@code list} is {@code null}
   */
  public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException {
    if (list == null) {
      return list;
    }
    List<String> customlist = new ArrayList<>(list.size());
    for (String item : list) {
      customlist.add(this.customAnalyzing(indexName, item));
    }
    return customlist;
  }

  /**
   * Deletes every document of {@code type} in {@code index} matching
   * {@code query}, scrolling through the hits and deleting them in bulk.
   *
   * @param index index to delete from
   * @param type  document type to delete from
   * @param query selects the documents to delete
   */
  public void deleteAllByQuery(String index, String type, QueryBuilder query) {
    createBulkProcessor();
    try {
      SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
          .actionGet();

      do {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
          getBulkProcessor().add(new DeleteRequest(index, type, hit.getId()));
        }
        scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
      } while (scrollResp.getHits().getHits().length != 0);
    } finally {
      // Always flush and release the processor, even if scrolling fails midway.
      destroyBulkProcessor();
    }
  }

  /**
   * Deletes all documents of {@code type} in {@code index}.
   */
  public void deleteType(String index, String type) {
    this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery());
  }

  /**
   * Lists the mapping types of an index whose names start with a prefix.
   * Failures while fetching the mappings are logged and yield the types
   * collected so far (normally an empty list).
   *
   * @param object  the index name (its {@code toString()} is used)
   * @param object2 the type-name prefix (its {@code toString()} is used)
   * @return the matching type names, never {@code null}
   */
  public List<String> getTypeListWithPrefix(Object object, Object object2) {
    String indexName = object.toString();
    String prefix = object2.toString();
    List<String> typeList = new ArrayList<>();
    try {
      GetMappingsResponse res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(indexName)).get();
      ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(indexName);
      if (mapping == null) {
        return typeList;
      }
      for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
        if (c.key.startsWith(prefix)) {
          typeList.add(c.key);
        }
      }
    } catch (InterruptedException e) {
      LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
    }
    return typeList;
  }

  /**
   * Lists the indices whose names start with a prefix.
   *
   * @param object the index-name prefix (its {@code toString()} is used)
   * @return the matching index names, never {@code null}
   */
  public List<String> getIndexListWithPrefix(Object object) {
    String prefix = object.toString();
    LOG.info("Retrieving index list with prefix: {}", prefix);
    String[] indices = getClient().admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
    return Arrays.stream(indices).filter(indexName -> indexName.startsWith(prefix)).collect(Collectors.toList());
  }

  /**
   * Runs {@link #searchByQuery(String, String, String, Boolean)} without the
   * detail fields.
   */
  public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
    return searchByQuery(index, type, query, false);
  }

  /**
   * Runs a query-string search (at most 500 hits) and renders the selected
   * fields of every hit as JSON of the form {@code {"PDResults": [...]}}.
   *
   * @param index   index to search
   * @param type    document type to search
   * @param query   query-string query
   * @param bDetail when {@code true}, additional detail fields are returned and
   *                derived ("Time Span", "TemporalResolution",
   *                "SpatialResolution", "Measurements"); {@code null} is
   *                treated as {@code false}
   * @return the JSON document, or {@code null} when {@code index} does not exist
   */
  public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
    boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
    if (!exists) {
      return null;
    }
    boolean detail = Boolean.TRUE.equals(bDetail);

    QueryBuilder qb = QueryBuilders.queryStringQuery(query);
    SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();

    Map<String, String> fieldsToReturn = buildFieldsToReturn(detail);
    List<Map<String, Object>> searchResults = new ArrayList<>();

    for (SearchHit hit : response.getHits().getHits()) {
      // Keep only the wanted fields, re-keyed by their output name. Null values are
      // skipped so that a sparse document cannot break the conversion.
      Map<String, Object> searchResult = new HashMap<>();
      for (Entry<String, Object> field : hit.getSource().entrySet()) {
        String outputName = fieldsToReturn.get(field.getKey());
        if (outputName != null && field.getValue() != null) {
          searchResult.put(outputName, field.getValue());
        }
      }

      formatReleaseDate(searchResult);
      if (detail) {
        addDetailFields(searchResult);
      }
      searchResults.add(searchResult);
    }

    Map<String, List<?>> pdResults = new HashMap<>();
    pdResults.put("PDResults", searchResults);

    return new GsonBuilder().create().toJson(pdResults);
  }

  /**
   * Builds the map from source field name to the name it is returned under.
   * The two are not always the same.
   */
  private static Map<String, String> buildFieldsToReturn(boolean detail) {
    Map<String, String> fieldsToReturn = new HashMap<>();

    fieldsToReturn.put("Dataset-ShortName", "Short Name");
    fieldsToReturn.put("Dataset-LongName", "Long Name");
    fieldsToReturn.put("DatasetParameter-Topic", "Topic");
    fieldsToReturn.put("Dataset-Description", "Dataset-Description");
    fieldsToReturn.put("DatasetCitation-ReleaseDateLong", RELEASE_DATE);

    if (detail) {
      fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
      fieldsToReturn.put("Dataset-Doi", "Dataset-Doi");
      fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level");
      fieldsToReturn.put("DatasetCitation-Version", "Version");
      fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName");
      fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName");
      fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category");
      fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath");
      fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full");
      fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full");
      fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail");

      fieldsToReturn.put("DatasetRegion-Region", "Region");
      fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat");
      fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat");
      fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon");
      fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon");
      fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long");
      fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong");

      fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution");
      fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat");
      fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution");
      fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution");
      fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution");
      fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution");
    }
    return fieldsToReturn;
  }

  /** Converts epoch milliseconds to the calendar date in UTC. */
  private static LocalDate toUtcDate(long epochMillis) {
    return Instant.ofEpochMilli(epochMillis).atZone(UTC).toLocalDate();
  }

  /**
   * Replaces the "Release Date" entry, a list whose first element is epoch
   * milliseconds, with its ISO date. A result without a non-empty list under
   * that key is left untouched.
   */
  private static void formatReleaseDate(Map<String, Object> searchResult) {
    Object raw = searchResult.get(RELEASE_DATE);
    if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
      return;
    }
    long epochMillis = Long.parseLong(String.valueOf(((List<?>) raw).get(0)));
    searchResult.put(RELEASE_DATE, toUtcDate(epochMillis).format(DateTimeFormatter.ISO_DATE));
  }

  /**
   * Applies the special handling needed by the detail view and adds the derived
   * entries. NOTE(review): like the code it was extracted from, this expects the
   * base path, coverage start/stop and latitude/longitude resolution fields to
   * be present in the result - confirm against the indexed metadata.
   */
  @SuppressWarnings("unchecked")
  private static void addDetailFields(Map<String, Object> searchResult) {
    // DataFormat value, translate RAW to BINARY
    if ("RAW".equals(searchResult.get("DataFormat"))) {
      searchResult.put("DataFormat", "BINARY");
    }

    // DatasetLocationPolicy-BasePath should only contain ftp, http, or https URLs
    List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream()
        .filter(url -> url.startsWith("ftp") || url.startsWith("http"))
        .collect(Collectors.toList());
    searchResult.put("DatasetLocationPolicy-BasePath", urls);

    // Time span formatting; an empty stop time means the dataset is still being extended.
    LocalDate startDate = toUtcDate((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long"));
    Object stopTime = searchResult.get("Dataset-DatasetCoverage-StopTimeLong");
    LocalDate endDate = "".equals(stopTime) ? null : toUtcDate(Long.parseLong(stopTime.toString()));
    searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));

    // Temporal resolution can come from one of two fields
    searchResult.put("TemporalResolution",
        "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution"));

    // Special formatting for spatial resolution
    String latResolution = (String) searchResult.get("Dataset-LatitudeResolution");
    String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution");
    if (!latResolution.isEmpty() && !lonResolution.isEmpty()) {
      searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)");
    } else {
      String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution");
      String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution");
      double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000;
      double dAlonResolution = Double.parseDouble(alonResolution) / 1000;
      searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)");
    }

    // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail.
    List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
        (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
    searchResult.put("Measurements", measurements);
  }

  /**
   * Builds a List of Measurement Hierarchies given the individual source lists.
   * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail
   * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete.
   * A {@code null} list is treated as an empty one.
   *
   * For example, if the input is:
   * <pre>
   * topics = ["Oceans", "Oceans"]
   * terms = ["Sea Surface Topography", "Ocean Waves"]
   * variables = ["Sea Surface Height", "Significant Wave Height"]
   * variableDetails = ["None", "None"]
   * </pre>
   *
   * The output would be:
   * <pre>
   *   [
   *     ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
   *     ["Oceans", "Ocean Waves", "Significant Wave Height"]
   *   ]
   * </pre>
   * Oceans > Sea Surface Topography > Sea Surface Height
   * Oceans > Ocean Waves > Significant Wave Height
   *
   * @param topics          List of topics, the first element of a measurement
   * @param terms           List of terms, the second element of a measurement
   * @param variables       List of variables, the third element of a measurement
   * @param variableDetails List of variable details, the fourth element of a measurement
   *
   * @return A List where each element is a single hierarchy (as a List) built from the provided input lists.
   */
  private static List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) {
    List<List<String>> measurements = new ArrayList<>();
    if (topics == null) {
      return measurements;
    }

    for (int x = 0; x < topics.size(); x++) {
      List<String> hierarchy = new ArrayList<>();
      hierarchy.add(topics.get(x));
      // Only add the next 'level' if we can
      if (hasLevel(terms, x)) {
        hierarchy.add(terms.get(x));
        if (hasLevel(variables, x)) {
          hierarchy.add(variables.get(x));
          if (hasLevel(variableDetails, x)) {
            hierarchy.add(variableDetails.get(x));
          }
        }
      }
      measurements.add(hierarchy);
    }

    return measurements;
  }

  /** Tells whether {@code values} holds a usable (not "None", not blank) element at {@code position}. */
  private static boolean hasLevel(List<String> values, int position) {
    if (values == null || position >= values.size()) {
      return false;
    }
    String value = values.get(position);
    return !"None".equalsIgnoreCase(value) && StringUtils.isNotBlank(value);
  }

  /**
   * Returns lower-cased completion suggestions for {@code term} from the
   * "Dataset-Metadata" completion field, without duplicates and in the order
   * Elasticsearch returned them.
   *
   * @param index index to query
   * @param term  prefix typed so far
   * @return the suggestions; empty when {@code index} does not exist
   */
  public List<String> autoComplete(String index, String term) {
    boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
    if (!exists) {
      return new ArrayList<>();
    }

    // please make sure that the completion field is configured in the ES mapping
    CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
    SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
    SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();

    Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();

    // LinkedHashSet removes duplicates while keeping the order of the suggestions.
    Set<String> suggestions = new LinkedHashSet<>();
    while (iterator.hasNext()) {
      suggestions.add(iterator.next().getText().string().toLowerCase(Locale.ROOT));
    }
    return new ArrayList<>(suggestions);
  }

  /**
   * Closes the client and, if {@link #makeClient(Properties)} started one, the
   * embedded node. Safe to call on a driver that has no client.
   */
  public void close() {
    if (client != null) {
      client.close();
    }
    if (node != null) {
      try {
        node.close();
      } catch (IOException e) {
        LOG.error("Error whilst closing the Elasticsearch node.", e);
      }
      node = null;
    }
  }

  /**
   * Refreshes the indices so that recent writes become searchable.
   */
  public void refreshIndex() {
    getClient().admin().indices().prepareRefresh().execute().actionGet();
  }

  /**
   * Generates a TransportClient or NodeClient.
   *
   * <p>A transport client is preferred and is used when the unicast hosts
   * property is set and the transport port is greater than 1. Otherwise, when a
   * cluster name is configured, an embedded node is created and its client is
   * returned. A missing or blank port is treated as "not configured".
   *
   * @param props a populated {@link java.util.Properties} object
   * @return a constructed {@link org.elasticsearch.client.Client}, or
   * {@code null} when the properties describe neither kind of client
   * @throws IOException if there is an error building the
   *                     {@link org.elasticsearch.client.Client}
   */
  protected Client makeClient(Properties props) throws IOException {
    String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER);
    String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
    String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT);

    String[] hosts = hostsString == null ? null : hostsString.split(",");
    int port = StringUtils.isBlank(portStr) ? -1 : Integer.parseInt(portStr.trim());

    Settings.Builder settingsBuilder = Settings.builder();

    // Set the cluster name and build the settings
    if (clusterName != null && !clusterName.isEmpty()) {
      settingsBuilder.put("cluster.name", clusterName);
    }

    settingsBuilder.put("http.type", "netty3");
    settingsBuilder.put("transport.type", "netty3");

    Settings settings = settingsBuilder.build();

    // Prefer TransportClient
    if (hosts != null && port > 1) {
      TransportClient transportClient = new ESTransportClient(settings);
      try {
        for (String host : hosts) {
          transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.trim()), port));
        }
      } catch (IOException | RuntimeException e) {
        // Do not leak the half-configured client when a host cannot be resolved.
        transportClient.close();
        throw e;
      }
      return transportClient;
    }

    if (clusterName != null) {
      node = new Node(settings);
      return node.client();
    }

    LOG.warn("No Elasticsearch hosts/port or cluster name configured; no client was created.");
    return null;
  }

  /**
   * Main method used to invoke the ESDriver implementation.
   *
   * @param args no arguments are required to invoke the Driver.
   */
  public static void main(String[] args) {
    MudrodEngine mudrodEngine = new MudrodEngine();
    ESDriver es = new ESDriver(mudrodEngine.loadConfig());
    try {
      es.getTypeListWithPrefix("podaacsession", "sessionstats");
    } finally {
      es.close();
    }
  }

  /**
   * @return the client
   */
  public Client getClient() {
    return client;
  }

  /**
   * @param client the client to set
   */
  public void setClient(Client client) {
    this.client = client;
  }

  /**
   * @return the bulkProcessor
   */
  public BulkProcessor getBulkProcessor() {
    return bulkProcessor;
  }

  /**
   * @param bulkProcessor the bulkProcessor to set
   */
  public void setBulkProcessor(BulkProcessor bulkProcessor) {
    this.bulkProcessor = bulkProcessor;
  }

  /**
   * Builds an update request that sets a single field of a document.
   *
   * @return the request, or {@code null} if the document body could not be built
   */
  public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {
    UpdateRequest ur = null;
    try {
      ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
    } catch (IOException e) {
      LOG.error("Error whilst attempting to generate a new Update Request.", e);
    }

    return ur;
  }

  /**
   * Builds an update request that sets several fields of a document.
   *
   * @param filedValueMap field name to new value
   * @return the request, or {@code null} if the document body could not be built
   */
  public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> filedValueMap) {
    UpdateRequest ur = null;
    try {
      XContentBuilder builder = jsonBuilder().startObject();
      for (Entry<String, Object> entry : filedValueMap.entrySet()) {
        builder.field(entry.getKey(), entry.getValue());
      }
      builder.endObject();
      ur = new UpdateRequest(index, type, id).doc(builder);
    } catch (IOException e) {
      LOG.error("Error whilst attempting to generate a new Update Request.", e);
    }

    return ur;
  }

  /**
   * Counts all documents of the given types in one index.
   */
  public int getDocCount(String index, String... type) {
    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
    String[] indexArr = new String[] { index };
    return this.getDocCount(indexArr, type, search);
  }

  /**
   * Counts all documents of the given types in the given indices.
   */
  public int getDocCount(String[] index, String[] type) {
    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
    return this.getDocCount(index, type, search);
  }

  /**
   * Counts the documents of the given types in the given indices that match
   * {@code filterSearch}.
   *
   * @return the number of hits, capped at {@link Integer#MAX_VALUE}
   */
  public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) {
    SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0);
    SearchResponse countSr = countSrBuilder.execute().actionGet();
    // The hit count is a long; cap it instead of letting a narrowing cast wrap around.
    return (int) Math.min(Integer.MAX_VALUE, countSr.getHits().getTotalHits());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java deleted file mode 100644 index e49c029..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java +++ /dev/null @@ -1,95 +0,0 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package gov.nasa.jpl.mudrod.driver;

import gov.nasa.jpl.mudrod.main.MudrodConstants;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.SQLContext;

import java.io.File;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.util.Properties;
//import org.apache.spark.sql.SparkSession;

/**
 * Holds the Spark contexts used by Mudrod. Both contexts are {@code transient},
 * so they are not carried across Java serialization.
 */
public class SparkDriver implements Serializable {

  //TODO the commented out code below is the API upgrade
  //for Spark 2.0.0. It requires a large upgrade and simplification
  //across the mudrod codebase so should be done in an individual ticket.
  //  /**
  //   *
  //   */
  //  private static final long serialVersionUID = 1L;
  //  private SparkSession builder;
  //
  //  public SparkDriver() {
  //    builder = SparkSession.builder()
  //        .master("local[2]")
  //        .config("spark.hadoop.validateOutputSpecs", "false")
  //        .config("spark.files.overwrite", "true")
  //        .getOrCreate();
  //  }
  //
  //  public SparkSession getBuilder() {
  //    return builder;
  //  }
  //
  //  public void setBuilder(SparkSession builder) {
  //    this.builder = builder;
  //  }
  //
  //  public void close() {
  //    builder.stop();
  //  }

  private static final long serialVersionUID = 1L;
  public transient JavaSparkContext sc;
  public transient SQLContext sqlContext;

  /**
   * Creates a driver without any Spark context; {@link #sc} and
   * {@link #sqlContext} stay {@code null}.
   */
  public SparkDriver() {
    // empty default constructor
  }

  /**
   * Creates the Spark and SQL contexts from the Mudrod configuration.
   *
   * <p>The application name defaults to "MudrodSparkApp". The Spark master and
   * the Elasticsearch host/port are only applied when the corresponding
   * property is present and non-empty; "spark.master" is never overridden when
   * it is already set.
   *
   * @param props a populated properties object
   */
  public SparkDriver(Properties props) {
    SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp"));

    // SparkConf rejects null values, so an absent property must not be passed on.
    String master = props.getProperty(MudrodConstants.SPARK_MASTER);
    if (master != null) {
      conf.setIfMissing("spark.master", master);
    }
    conf.set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true");

    String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
    String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);

    if (esHost != null && !esHost.isEmpty()) {
      conf.set("es.nodes", esHost);
    }

    if (esPort != null && !esPort.isEmpty()) {
      conf.set("es.port", esPort);
    }

    conf.set("spark.serializer", KryoSerializer.class.getName());
    conf.set("es.batch.size.entries", "1500");

    sc = new JavaSparkContext(conf);
    sqlContext = new SQLContext(sc);
  }

  /**
   * Stops the underlying Spark context. Does nothing when no context was
   * created.
   */
  public void close() {
    if (sc != null) {
      sc.sc().stop();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java deleted file mode 100644 index
d9cde36..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes commonly used Elasticsearch and Spark related - * functions - */ -package gov.nasa.jpl.mudrod.driver; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java deleted file mode 100644 index 14b667d..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.integration; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortOrder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.DecimalFormat; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -/** - * Supports ability to integrate vocab similarity results from metadata, ontology, and web logs. - */ -public class LinkageIntegration extends DiscoveryStepAbstract { - - private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class); - private static final long serialVersionUID = 1L; - transient List<LinkedTerm> termList = new ArrayList<>(); - DecimalFormat df = new DecimalFormat("#.00"); - private static final String INDEX_NAME = "indexName"; - private static final String WEIGHT = "weight"; - - public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - /** - * The data structure to store semantic triple. 
- */ - class LinkedTerm { - String term = null; - double weight = 0; - String model = null; - - public LinkedTerm(String str, double w, String m) { - term = str; - weight = w; - model = m; - } - } - - /** - * Method of executing integration step - */ - @Override - public Object execute() { - getIngeratedList("ocean wind", 11); - return null; - } - - @Override - public Object execute(Object o) { - return null; - } - - /** - * Method of getting integrated results - * - * @param input query string - * @return a hash map where the string is a related term, and double is the - * similarity to the input query - */ - public Map<String, Double> appyMajorRule(String input) { - termList = new ArrayList<>(); - Map<String, Double> termsMap = new HashMap<>(); - Map<String, List<LinkedTerm>> map = new HashMap<>(); - try { - map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input)); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Error applying majority rule", e); - } - - for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) { - List<LinkedTerm> list = entry.getValue(); - double sumModelWeight = 0; - double tmp = 0; - for (LinkedTerm element : list) { - sumModelWeight += getModelweight(element.model); - - if (element.weight > tmp) { - tmp = element.weight; - } - } - - double finalWeight = tmp + ((sumModelWeight - 2) * 0.05); - if (finalWeight < 0) { - finalWeight = 0; - } - - if (finalWeight > 1) { - finalWeight = 1; - } - termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight))); - } - - return sortMapByValue(termsMap); - } - - /** - * Method of getting integrated results - * - * @param input query string - * @param num the number of most related terms - * @return a string of related terms along with corresponding similarities - */ - public String getIngeratedList(String input, int num) { - String output = ""; - Map<String, Double> sortedMap = appyMajorRule(input); - int count = 0; - for 
(Entry<String, Double> entry : sortedMap.entrySet()) { - if (count < num) { - output += entry.getKey() + " = " + entry.getValue() + ", "; - } - count++; - } - LOG.info("\n************************Integrated results***************************"); - LOG.info(output); - return output; - } - - /** - * Method of getting integrated results - * - * @param input query string - * @return a JSON object of related terms along with corresponding similarities - */ - public JsonObject getIngeratedListInJson(String input) { - Map<String, Double> sortedMap = appyMajorRule(input); - int count = 0; - Map<String, Double> trimmedMap = new LinkedHashMap<>(); - for (Entry<String, Double> entry : sortedMap.entrySet()) { - if (!entry.getKey().contains("china")) { - if (count < 10) { - trimmedMap.put(entry.getKey(), entry.getValue()); - } - count++; - } - } - - return mapToJson(trimmedMap); - } - - /** - * Method of aggregating terms from web logs, metadata, and ontology - * - * @param input query string - * @return a hash map where the string is a related term, and the list is - * the similarities from different sources - */ - public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) { - aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType")); - aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType")); - aggregateRelatedTerms(input, props.getProperty("metadataLinkageType")); - aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType")); - - return termList.stream().collect(Collectors.groupingBy(w -> w.term)); - } - - public int getModelweight(String model) { - if (model.equals(props.getProperty("userHistoryLinkageType"))) { - return Integer.parseInt(props.getProperty("userHistory_w")); - } - - if (model.equals(props.getProperty("clickStreamLinkageType"))) { - return Integer.parseInt(props.getProperty("clickStream_w")); - } - - if (model.equals(props.getProperty("metadataLinkageType"))) { - return 
Integer.parseInt(props.getProperty("metadata_w")); - } - - if (model.equals(props.getProperty("ontologyLinkageType"))) { - return Integer.parseInt(props.getProperty("ontology_w")); - } - - return 999999; - } - - /** - * Method of extracting the related term from a comma string - * - * @param str input string - * @param input query string - * @return related term contained in the input string - */ - public String extractRelated(String str, String input) { - String[] strList = str.split(","); - if (input.equals(strList[0])) { - return strList[1]; - } else { - return strList[0]; - } - } - - public void aggregateRelatedTerms(String input, String model) { - //get the first 10 related terms - SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11) - .execute().actionGet(); - - LOG.info("\n************************ {} results***************************", model); - for (SearchHit hit : usrhis.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - String keywords = (String) result.get("keywords"); - String relatedKey = extractRelated(keywords, input); - - if (!relatedKey.equals(input)) { - LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model); - LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT)); - termList.add(lTerm); - } - - } - } - - /** - * Method of querying related terms from ontology - * - * @param input input query - * @param model source name - */ - public void aggregateRelatedTermsSWEET(String input, String model) { - SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC) - .setSize(11).execute().actionGet(); - LOG.info("\n************************ {} results***************************", model); - for (SearchHit hit : 
usrhis.getHits().getHits()) { - Map<String, Object> result = hit.getSource(); - String conceptB = (String) result.get("concept_B"); - if (!conceptB.equals(input)) { - LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model); - LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT)); - termList.add(lTerm); - } - } - } - - /** - * Method of sorting a map by value - * - * @param passedMap input map - * @return sorted map - */ - public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) { - List<String> mapKeys = new ArrayList<>(passedMap.keySet()); - List<Double> mapValues = new ArrayList<>(passedMap.values()); - Collections.sort(mapValues, Collections.reverseOrder()); - Collections.sort(mapKeys, Collections.reverseOrder()); - - LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>(); - - Iterator<Double> valueIt = mapValues.iterator(); - while (valueIt.hasNext()) { - Object val = valueIt.next(); - Iterator<String> keyIt = mapKeys.iterator(); - - while (keyIt.hasNext()) { - Object key = keyIt.next(); - String comp1 = passedMap.get(key).toString(); - String comp2 = val.toString(); - - if (comp1.equals(comp2)) { - passedMap.remove(key); - mapKeys.remove(key); - sortedMap.put((String) key, (Double) val); - break; - } - - } - - } - return sortedMap; - } - - /** - * Method of converting hashmap to JSON - * - * @param word input query - * @param wordweights a map from related terms to weights - * @return converted JSON object - */ - private JsonObject mapToJson(Map<String, Double> wordweights) { - Gson gson = new Gson(); - JsonObject json = new JsonObject(); - List<JsonObject> nodes = new ArrayList<>(); - - for (Entry<String, Double> entry : wordweights.entrySet()) { - JsonObject node = new JsonObject(); - String key = entry.getKey(); - Double value = entry.getValue(); - node.addProperty("word", key); - node.addProperty("weight", value); - nodes.add(node); - } - - JsonElement nodesElement = gson.toJsonTree(nodes); - 
json.add("ontology", nodesElement); - - return json; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java deleted file mode 100644 index 7f2ba66..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes the integration method for web log, ontology, and metadata - * mining results. - */ -package gov.nasa.jpl.mudrod.integration; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java deleted file mode 100644 index ee7cbfa..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.main; - -import gov.nasa.jpl.mudrod.ontology.Ontology; - -/** - * Class contains static constant keys and values relating to Mudrod - * configuration properties. Property values are read from <a href= - * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a> - */ -public interface MudrodConstants { - - public static final String CLEANUP_TYPE_PREFIX = "Cleanup_type_prefix"; - - public static final String CLICK_STREAM_LINKAGE_TYPE = "clickStreamLinkageType"; - - public static final String CLICK_STREAM_MATRIX_TYPE = "clickStreamMatrixType"; - - public static final String CLICKSTREAM_SVD_DIM = "clickstreamSVDDimension"; - - public static final String CLICKSTREAM_W = "clickStream_w"; - - public static final String COMMENT_TYPE = "commentType"; - - /** Defined on CLI */ - public static final String DATA_DIR = "dataDir"; - - public static final String DOWNLOAD_F = "downloadf"; - - public static final String DOWNLOAD_WEIGHT = "downloadWeight"; - - public static final String ES_CLUSTER = "clusterName"; - - public static final String ES_TRANSPORT_TCP_PORT = "ES_Transport_TCP_Port"; - - public static final String ES_UNICAST_HOSTS = "ES_unicast_hosts"; - - public static final String ES_HTTP_PORT = "ES_HTTP_port"; - - public static final String ES_INDEX_NAME = "indexName"; - - public static final String FTP_PREFIX = "ftpPrefix"; - - public static final String FTP_TYPE_PREFIX = "FTP_type_prefix"; - - public static final String HTTP_PREFIX = "httpPrefix"; - - public static final String 
HTTP_TYPE_PREFIX = "HTTP_type_prefix"; - - public static final String LOG_INDEX = "logIndexName"; - - public static final String METADATA_LINKAGE_TYPE = "metadataLinkageType"; - - public static final String METADATA_SVD_DIM = "metadataSVDDimension"; - - public static final String METADATA_URL = "metadataurl"; - - public static final String METADATA_W = "metadata_w"; - - public static final String MINI_USER_HISTORY = "mini_userHistory"; - - public static final String MUDROD = "mudrod"; - - /** Defined on CLI */ - public static final String MUDROD_CONFIG = "MUDROD_CONFIG"; - /** - * An {@link Ontology} implementation. - */ - public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation"; - - public static final String ONTOLOGY_LINKAGE_TYPE = "ontologyLinkageType"; - - public static final String ONTOLOGY_W = "ontology_w"; - - public static final String PROCESS_TYPE = "processingType"; - - /** Defined on CLI */ - public static final String RAW_METADATA_PATH = "raw_metadataPath"; - - public static final String RAW_METADATA_TYPE = "raw_metadataType"; - - public static final String SEARCH_F = "searchf"; - - public static final String SENDING_RATE = "sendingrate"; - - public static final String SESSION_PORT = "SessionPort"; - - public static final String SESSION_STATS_PREFIX = "SessionStats_prefix"; - - public static final String SESSION_URL = "SessionUrl"; - - public static final String SPARK_APP_NAME = "spark.app.name"; - - public static final String SPARK_MASTER = "spark.master"; - /** - * Absolute local location of javaSVMWithSGDModel directory. 
This is typically - * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code> - */ - public static final String SVM_SGD_MODEL = "svmSgdModel"; - - public static final String TIMEGAP = "timegap"; - - public static final String TIME_SUFFIX = "TimeSuffix"; - - public static final String USE_HISTORY_LINKAGE_TYPE = "userHistoryLinkageType"; - - public static final String USER_HISTORY_W = "userHistory_w"; - - public static final String VIEW_F = "viewf"; - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java deleted file mode 100644 index 27dba84..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.main; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryEngineAbstract; -import gov.nasa.jpl.mudrod.discoveryengine.MetadataDiscoveryEngine; -import gov.nasa.jpl.mudrod.discoveryengine.OntologyDiscoveryEngine; -import gov.nasa.jpl.mudrod.discoveryengine.RecommendEngine; -import gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.integration.LinkageIntegration; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.jdom2.Document; -import org.jdom2.Element; -import org.jdom2.JDOMException; -import org.jdom2.input.SAXBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.nio.file.Files; -import java.util.List; -import java.util.Properties; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import static gov.nasa.jpl.mudrod.main.MudrodConstants.DATA_DIR; - -/** - * Main entry point for Running the Mudrod system. Invocation of this class is - * tightly linked to the primary Mudrod configuration which can be located at - * <a href= - * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>. 
- */ -public class MudrodEngine { - - private static final Logger LOG = LoggerFactory.getLogger(MudrodEngine.class); - private Properties props = new Properties(); - private ESDriver es = null; - private SparkDriver spark = null; - private static final String LOG_INGEST = "logIngest"; - private static final String META_INGEST = "metaIngest"; - private static final String FULL_INGEST = "fullIngest"; - private static final String PROCESSING = "processingWithPreResults"; - private static final String ES_HOST = "esHost"; - private static final String ES_TCP_PORT = "esTCPPort"; - private static final String ES_HTTP_PORT = "esPort"; - - /** - * Public constructor for this class. - */ - public MudrodEngine() { - // default constructor - } - - /** - * Start the {@link ESDriver}. Should only be called after a call to - * {@link MudrodEngine#loadConfig()} - * - * @return fully provisioned {@link ESDriver} - */ - public ESDriver startESDriver() { - return new ESDriver(props); - } - - /** - * Start the {@link SparkDriver}. Should only be called after a call to - * {@link MudrodEngine#loadConfig()} - * - * @return fully provisioned {@link SparkDriver} - */ - public SparkDriver startSparkDriver() { - return new SparkDriver(props); - } - - /** - * Retrieve the Mudrod configuration as a Properties Map containing K, V of - * type String. - * - * @return a {@link java.util.Properties} object - */ - public Properties getConfig() { - return props; - } - - /** - * Retrieve the Mudrod {@link ESDriver} - * - * @return the {@link ESDriver} instance. - */ - public ESDriver getESDriver() { - return this.es; - } - - /** - * Set the Elasticsearch driver for MUDROD - * - * @param es - * an ES driver instance - */ - public void setESDriver(ESDriver es) { - this.es = es; - } - - private InputStream locateConfig() { - - String configLocation = System.getenv(MudrodConstants.MUDROD_CONFIG) == null ? 
"" : System.getenv(MudrodConstants.MUDROD_CONFIG); - File configFile = new File(configLocation); - - try { - InputStream configStream = new FileInputStream(configFile); - LOG.info("Loaded config file from " + configFile.getAbsolutePath()); - return configStream; - } catch (IOException e) { - LOG.info("File specified by environment variable " + MudrodConstants.MUDROD_CONFIG + "=\'" + configLocation + "\' could not be loaded. " + e.getMessage()); - } - - InputStream configStream = MudrodEngine.class.getClassLoader().getResourceAsStream("config.xml"); - - if (configStream != null) { - LOG.info("Loaded config file from {}", MudrodEngine.class.getClassLoader().getResource("config.xml").getPath()); - } - - return configStream; - } - - /** - * Load the configuration provided at <a href= - * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>. - * - * @return a populated {@link java.util.Properties} object. - */ - public Properties loadConfig() { - SAXBuilder saxBuilder = new SAXBuilder(); - - InputStream configStream = locateConfig(); - - Document document; - try { - document = saxBuilder.build(configStream); - Element rootNode = document.getRootElement(); - List<Element> paraList = rootNode.getChildren("para"); - - for (int i = 0; i < paraList.size(); i++) { - Element paraNode = paraList.get(i); - String attributeName = paraNode.getAttributeValue("name"); - if (MudrodConstants.SVM_SGD_MODEL.equals(attributeName)) { - props.put(attributeName, decompressSVMWithSGDModel(paraNode.getTextTrim())); - } else { - props.put(attributeName, paraNode.getTextTrim()); - } - } - } catch (JDOMException | IOException e) { - LOG.error("Exception whilst retrieving or processing XML contained within 'config.xml'!", e); - } - return getConfig(); - - } - - private String decompressSVMWithSGDModel(String archiveName) throws IOException { - - URL scmArchive = getClass().getClassLoader().getResource(archiveName); - if (scmArchive == null) { - throw 
new IOException("Unable to locate " + archiveName + " as a classpath resource."); - } - File tempDir = Files.createTempDirectory("mudrod").toFile(); - assert tempDir.setWritable(true); - File archiveFile = new File(tempDir, archiveName); - FileUtils.copyURLToFile(scmArchive, archiveFile); - - // Decompress archive - int BUFFER_SIZE = 512000; - ZipInputStream zipIn = new ZipInputStream(new FileInputStream(archiveFile)); - ZipEntry entry; - while ((entry = zipIn.getNextEntry()) != null) { - File f = new File(tempDir, entry.getName()); - // If the entry is a directory, create the directory. - if (entry.isDirectory() && !f.exists()) { - boolean created = f.mkdirs(); - if (!created) { - LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getAbsolutePath()); - } - } else if (!entry.isDirectory()) { - boolean created = f.getParentFile().mkdirs(); - if (!created && !f.getParentFile().exists()) { - LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getParentFile().getAbsolutePath()); - } - int count; - byte data[] = new byte[BUFFER_SIZE]; - FileOutputStream fos = new FileOutputStream(new File(tempDir, entry.getName()), false); - try (BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE)) { - while ((count = zipIn.read(data, 0, BUFFER_SIZE)) != -1) { - dest.write(data, 0, count); - } - } - } - } - - return new File(tempDir, StringUtils.removeEnd(archiveName, ".zip")).toURI().toString(); - } - - /** - * Preprocess and process logs {@link DiscoveryEngineAbstract} implementations - * for weblog - */ - public void startLogIngest() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); - wd.preprocess(); - wd.process(); - LOG.info("*****************logs have been ingested successfully******************"); - } - - /** - * updating and analysing metadata to metadata similarity results - */ - public void startMetaIngest() { - DiscoveryEngineAbstract md = new 
MetadataDiscoveryEngine(props, es, spark); - md.preprocess(); - md.process(); - - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); - recom.preprocess(); - recom.process(); - LOG.info("Metadata has been ingested successfully."); - } - - public void startFullIngest() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); - wd.preprocess(); - wd.process(); - - DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark); - md.preprocess(); - md.process(); - - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); - recom.preprocess(); - recom.process(); - LOG.info("Full ingest has finished successfully."); - } - - /** - * Only preprocess various {@link DiscoveryEngineAbstract} implementations for - * weblog, ontology and metadata, linkage discovery and integration. - */ - public void startProcessing() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); - wd.process(); - - DiscoveryEngineAbstract od = new OntologyDiscoveryEngine(props, es, spark); - od.preprocess(); - od.process(); - - DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark); - md.preprocess(); - md.process(); - - LinkageIntegration li = new LinkageIntegration(props, es, spark); - li.execute(); - - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); - recom.process(); - } - - /** - * Close the connection to the {@link ESDriver} instance. - */ - public void end() { - if (es != null) { - es.close(); - } - } - - /** - * Main program invocation. Accepts one argument denoting location (on disk) - * to a log file which is to be ingested. Help will be provided if invoked - * with incorrect parameters. - * - * @param args - * {@link java.lang.String} array containing correct parameters. 
- */ - public static void main(String[] args) { - // boolean options - Option helpOpt = new Option("h", "help", false, "show this help message"); - - // log ingest (preprocessing + processing) - Option logIngestOpt = new Option("l", LOG_INGEST, false, "begin log ingest"); - // metadata ingest (preprocessing + processing) - Option metaIngestOpt = new Option("m", META_INGEST, false, "begin metadata ingest"); - // ingest both log and metadata - Option fullIngestOpt = new Option("f", FULL_INGEST, false, "begin full ingest Mudrod workflow"); - // processing only, assuming that preprocessing results is in dataDir - Option processingOpt = new Option("p", PROCESSING, false, "begin processing with preprocessing results"); - - // argument options - Option dataDirOpt = OptionBuilder.hasArg(true).withArgName("/path/to/data/directory").hasArgs(1).withDescription("the data directory to be processed by Mudrod").withLongOpt("dataDirectory") - .isRequired().create(DATA_DIR); - - Option esHostOpt = OptionBuilder.hasArg(true).withArgName("host_name").hasArgs(1).withDescription("elasticsearch cluster unicast host").withLongOpt("elasticSearchHost").isRequired(false) - .create(ES_HOST); - - Option esTCPPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch transport TCP port").withLongOpt("elasticSearchTransportTCPPort") - .isRequired(false).create(ES_TCP_PORT); - - Option esPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch HTTP/REST port").withLongOpt("elasticSearchHTTPPort").isRequired(false) - .create(ES_HTTP_PORT); - - // create the options - Options options = new Options(); - options.addOption(helpOpt); - options.addOption(logIngestOpt); - options.addOption(metaIngestOpt); - options.addOption(fullIngestOpt); - options.addOption(processingOpt); - options.addOption(dataDirOpt); - options.addOption(esHostOpt); - options.addOption(esTCPPortOpt); - options.addOption(esPortOpt); - - 
CommandLineParser parser = new GnuParser(); - try { - CommandLine line = parser.parse(options, args); - String processingType = null; - - if (line.hasOption(LOG_INGEST)) { - processingType = LOG_INGEST; - } else if (line.hasOption(PROCESSING)) { - processingType = PROCESSING; - } else if (line.hasOption(META_INGEST)) { - processingType = META_INGEST; - } else if (line.hasOption(FULL_INGEST)) { - processingType = FULL_INGEST; - } - - String dataDir = line.getOptionValue(DATA_DIR).replace("\\", "/"); - if (!dataDir.endsWith("/")) { - dataDir += "/"; - } - - MudrodEngine me = new MudrodEngine(); - me.loadConfig(); - me.props.put(DATA_DIR, dataDir); - - if (line.hasOption(ES_HOST)) { - String esHost = line.getOptionValue(ES_HOST); - me.props.put(MudrodConstants.ES_UNICAST_HOSTS, esHost); - } - - if (line.hasOption(ES_TCP_PORT)) { - String esTcpPort = line.getOptionValue(ES_TCP_PORT); - me.props.put(MudrodConstants.ES_TRANSPORT_TCP_PORT, esTcpPort); - } - - if (line.hasOption(ES_HTTP_PORT)) { - String esHttpPort = line.getOptionValue(ES_HTTP_PORT); - me.props.put(MudrodConstants.ES_HTTP_PORT, esHttpPort); - } - - me.es = new ESDriver(me.getConfig()); - me.spark = new SparkDriver(me.getConfig()); - loadFullConfig(me, dataDir); - if (processingType != null) { - switch (processingType) { - case PROCESSING: - me.startProcessing(); - break; - case LOG_INGEST: - me.startLogIngest(); - break; - case META_INGEST: - me.startMetaIngest(); - break; - case FULL_INGEST: - me.startFullIngest(); - break; - default: - break; - } - } - me.end(); - } catch (Exception e) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("MudrodEngine: 'dataDir' argument is mandatory. 
" + "User must also provide an ingest method.", options, true); - LOG.error("Error whilst parsing command line.", e); - } - } - - private static void loadFullConfig(MudrodEngine me, String dataDir) { - //TODO all of the properties defined below, which are determined are - //runtime need to be added to MudrodConstants.java and referenced - //accordingly and consistently from Properties.getProperty(MudrodConstant...); - me.props.put("ontologyInputDir", dataDir + "SWEET_ocean/"); - me.props.put("oceanTriples", dataDir + "Ocean_triples.csv"); - me.props.put("userHistoryMatrix", dataDir + "UserHistoryMatrix.csv"); - me.props.put("clickstreamMatrix", dataDir + "ClickstreamMatrix.csv"); - me.props.put("metadataMatrix", dataDir + "MetadataMatrix.csv"); - me.props.put("clickstreamSVDMatrix_tmp", dataDir + "clickstreamSVDMatrix_tmp.csv"); - me.props.put("metadataSVDMatrix_tmp", dataDir + "metadataSVDMatrix_tmp.csv"); - me.props.put("raw_metadataPath", dataDir + me.props.getProperty(MudrodConstants.RAW_METADATA_TYPE)); - - me.props.put("jtopia", dataDir + "jtopiaModel"); - me.props.put("metadata_term_tfidf_matrix", dataDir + "metadata_term_tfidf.csv"); - me.props.put("metadata_word_tfidf_matrix", dataDir + "metadata_word_tfidf.csv"); - me.props.put("session_metadata_Matrix", dataDir + "metadata_session_coocurrence_matrix.csv"); - - me.props.put("metadataOBCode", dataDir + "MetadataOHCode"); - me.props.put("metadata_topic", dataDir + "metadata_topic"); - me.props.put("metadata_topic_matrix", dataDir + "metadata_topic_matrix.csv"); - } - - /** - * Obtain the spark implementation. 
- * - * @return the {@link SparkDriver} - */ - public SparkDriver getSparkDriver() { - return this.spark; - } - - /** - * Set the {@link SparkDriver} - * - * @param sparkDriver - * a configured {@link SparkDriver} - */ - public void setSparkDriver(SparkDriver sparkDriver) { - this.spark = sparkDriver; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java deleted file mode 100644 index 44d0518..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes Main entry point for Running the Mudrod system. 
- */ -package gov.nasa.jpl.mudrod.main; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java deleted file mode 100644 index 1bb0a3c..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes metadata pre-processing, processing, and data structure - * classes. - */ -package gov.nasa.jpl.mudrod.metadata; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java deleted file mode 100644 index bc7d187..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package gov.nasa.jpl.mudrod.metadata.pre; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.utils.HttpRequest; -import org.apache.commons.io.IOUtils; -import org.elasticsearch.action.index.IndexRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.Properties; - -/** - * ClassName: ApiHarvester Function: Harvest metadata from PO.DAAC web service. - */ -public class ApiHarvester extends DiscoveryStepAbstract { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ApiHarvester.class); - - /** - * Creates a new instance of ApiHarvester. 
- * - * @param props the Mudrod configuration - * @param es the Elasticsearch drive - * @param spark the spark driver - */ - public ApiHarvester(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - @Override - public Object execute() { - LOG.info("Starting Metadata harvesting."); - startTime = System.currentTimeMillis(); - //remove old metadata from ES - es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)); - //harvest new metadata using PO.DAAC web services - harvestMetadatafromWeb(); - es.createBulkProcessor(); - addMetadataMapping(); - importToES(); - es.destroyBulkProcessor(); - endTime = System.currentTimeMillis(); - es.refreshIndex(); - LOG.info("Metadata harvesting completed. Time elapsed: {}", (endTime - startTime) / 1000); - return null; - } - - /** - * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please - * invoke this method before import metadata to Elasticsearch. - */ - public void addMetadataMapping() { - String mappingJson = "{\r\n \"dynamic_templates\": " + "[\r\n " + "{\r\n \"strings\": " + "{\r\n \"match_mapping_type\": \"string\"," - + "\r\n \"mapping\": {\r\n \"type\": \"text\"," + "\r\n \"fielddata\": true," + "\r\n \"analyzer\": \"english\"," - + "\r\n \"fields\": {\r\n \"raw\": {" + "\r\n \"type\": \"string\"," + "\r\n \"index\": \"not_analyzed\"" + "\r\n }" - + "\r\n }\r\n " + "\r\n }" + "\r\n }\r\n }\r\n ]\r\n}"; - - es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).setSource(mappingJson).execute() - .actionGet(); - } - - /** - * importToES: Index metadata into elasticsearch from local file directory. - * Please make sure metadata have been harvest from web service before - * invoking this method. 
- */ - private void importToES() { - File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH)); - if(!directory.exists()) - directory.mkdir(); - File[] fList = directory.listFiles(); - for (File file : fList) { - InputStream is; - try { - is = new FileInputStream(file); - importSingleFileToES(is); - } catch (FileNotFoundException e) { - LOG.error("Error finding file!", e); - } - - } - } - - private void importSingleFileToES(InputStream is) { - try { - String jsonTxt = IOUtils.toString(is); - JsonParser parser = new JsonParser(); - JsonElement item = parser.parse(jsonTxt); - IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).source(item.toString()); - es.getBulkProcessor().add(ir); - } catch (IOException e) { - LOG.error("Error indexing metadata record!", e); - } - } - - /** - * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service. - */ - private void harvestMetadatafromWeb() { - LOG.info("Metadata download started."); - int startIndex = 0; - int doc_length = 0; - JsonParser parser = new JsonParser(); - do { - String searchAPI = "https://podaac.jpl.nasa.gov/api/dataset?startIndex=" + Integer.toString(startIndex) + "&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search="; - HttpRequest http = new HttpRequest(); - String response = http.getRequest(searchAPI); - - JsonElement json = parser.parse(response); - JsonObject responseObject = json.getAsJsonObject(); - JsonArray docs = responseObject.getAsJsonObject("response").getAsJsonArray("docs"); - - doc_length = docs.size(); - - File file = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH)); - if (!file.exists()) { - if (file.mkdir()) { - LOG.info("Directory is created!"); - } else { - LOG.error("Failed to create directory!"); - } - } - for (int i = 0; i < doc_length; i++) { - JsonElement item = docs.get(i); - int docId = startIndex + i; - File itemfile = new 
File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId + ".json"); - - try (FileWriter fw = new FileWriter(itemfile.getAbsoluteFile()); BufferedWriter bw = new BufferedWriter(fw);) { - itemfile.createNewFile(); - bw.write(item.toString()); - } catch (IOException e) { - LOG.error("Error writing metadata to local file!", e); - } - } - - startIndex += 10; - - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.error("Error entering Elasticsearch Mappings!", e); - Thread.currentThread().interrupt(); - } - - } while (doc_length != 0); - - LOG.info("Metadata downloading finished"); - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java deleted file mode 100644 index 63677e5..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.metadata.pre; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.main.MudrodConstants; -import gov.nasa.jpl.mudrod.metadata.structure.MetadataExtractor; -import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix; -import gov.nasa.jpl.mudrod.utils.MatrixUtil; -import org.apache.spark.api.java.JavaPairRDD; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Properties; - -/** - * Generate term-metadata matrix from original metadata. Each row in - * the matrix is corresponding to a term, and each column is a metadata. - */ -public class MatrixGenerator extends DiscoveryStepAbstract { - - /** - * - */ - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(MatrixGenerator.class); - - /** - * Creates a new instance of MatrixGenerator. - * - * @param props the Mudrod configuration - * @param es the Elasticsearch drive - * @param spark the spark drive - */ - public MatrixGenerator(Properties props, ESDriver es, SparkDriver spark) { - super(props, es, spark); - } - - /** - * Generate a csv which is a term-metadata matrix genetrated from original - * metadata. 
- * - * @see DiscoveryStepAbstract#execute() - */ - @Override - public Object execute() { - LOG.info("Metadata matrix started"); - startTime = System.currentTimeMillis(); - - String metadataMatrixFile = props.getProperty("metadataMatrix"); - try { - MetadataExtractor extractor = new MetadataExtractor(); - JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)); - LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD); - MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile); - - } catch (Exception e) { - LOG.error("Error during Metadata matrix generaion: {}", e); - } - - endTime = System.currentTimeMillis(); - LOG.info("Metadata matrix finished time elapsed: {}s", (endTime - startTime) / 1000); - return null; - } - - @Override - public Object execute(Object o) { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java deleted file mode 100644 index 60a2429..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes metadata pre-processing functions. - */ -package gov.nasa.jpl.mudrod.metadata.pre; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java deleted file mode 100644 index 67dbf2d..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package gov.nasa.jpl.mudrod.metadata.process; - -import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract; -import gov.nasa.jpl.mudrod.driver.ESDriver; -import gov.nasa.jpl.mudrod.driver.SparkDriver; -import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer; -import gov.nasa.jpl.mudrod.utils.LinkageTriple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.List; -import java.util.Properties; -
/**
 * ClassName: MetadataAnalyzer
 * Function: Calculate semantic relationship of vocabularies extracted from
 * metadata.
 */
public class MetadataAnalyzer extends DiscoveryStepAbstract implements Serializable {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(MetadataAnalyzer.class);

  /**
   * Creates a new instance of MetadataAnalyzer.
   *
   * @param props the Mudrod configuration
   * @param es    the Elasticsearch driver
   * @param spark the spark driver
   */
  public MetadataAnalyzer(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  /**
   * Not used by this step.
   *
   * @param o ignored
   * @return always {@code null}
   */
  @Override
  public Object execute(Object o) {
    return null;
  }

  /**
   * Calculate semantic relationship of vocabularies from a csv file which is a
   * term-metadata matrix. The matrix file, the SVD dimension and the temporary
   * SVD matrix file are read from the {@code metadataMatrix},
   * {@code metadataSVDDimension} and {@code metadataSVDMatrix_tmp} properties;
   * the resulting triples are saved to the type named by
   * {@code metadataLinkageType} in the index named by {@code indexName}. A
   * failure is logged and does not propagate to the caller.
   *
   * @return always {@code null}
   * @see DiscoveryStepAbstract#execute()
   */
  @Override
  public Object execute() {
    LOG.info("*****************Metadata Analyzer starts******************");
    startTime = System.currentTimeMillis();

    try {
      SVDAnalyzer analyzer = new SVDAnalyzer(props, es, spark);
      int svdDimension = Integer.parseInt(props.getProperty("metadataSVDDimension"));
      String metadataMatrixFile = props.getProperty("metadataMatrix");
      String svdMatrixFileName = props.getProperty("metadataSVDMatrix_tmp");

      analyzer.getSVDMatrix(metadataMatrixFile, svdDimension, svdMatrixFileName);
      List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(svdMatrixFileName);

      analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataLinkageType"));

    } catch (Exception e) {
      // Report through the class logger instead of printing to stderr so the
      // failure ends up in the application log together with its stack trace.
      LOG.error("Error during metadata analysis!", e);
    }

    endTime = System.currentTimeMillis();
    es.refreshIndex();
    LOG.info("*****************Metadata Analyzer ends******************Took {}s", (endTime - startTime) / 1000);
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java deleted file mode 100644 index c988f31..0000000 --- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package includes metadata processing classes. - */ -package gov.nasa.jpl.mudrod.metadata.process; \ No newline at end of file
