[ https://issues.apache.org/jira/browse/SDAP-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473993#comment-16473993 ]
ASF GitHub Bot commented on SDAP-71: ------------------------------------ lewismc closed pull request #16: SDAP-71 full ingestion workflow and process broken in several places URL: https://github.com/apache/incubator-sdap-mudrod/pull/16 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 3009cbf..a8ff594 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ core/.externalToolBuilders/Maven_Ant_Builder.launch core/maven-eclipse.xml service/.classpath web/.classpath +web/.externalToolBuilders/ +web/maven-eclipse.xml diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java index 6d675fb..fc51557 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java +++ b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java @@ -177,15 +177,31 @@ public String customAnalyzing(String indexName, String analyzer, String str) thr } public void deleteAllByQuery(String index, String type, QueryBuilder query) { - ImmutableOpenMap<String, MappingMetaData> mappings = getClient().admin().cluster().prepareState().execute().actionGet() - .getState().metaData().index(index).getMappings(); + ImmutableOpenMap<String, MappingMetaData> mappings = getClient() + .admin() + .cluster() + .prepareState() + .execute() + .actionGet() + .getState() + .metaData() + .index(index) + .getMappings(); //check if the type exists - if (!mappings.containsKey(type)) return; + if (!mappings.containsKey(type)) + return; createBulkProcessor(); - SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute() - .actionGet(); + 
SearchResponse scrollResp = getClient() + .prepareSearch(index) + .setSearchType(SearchType.QUERY_AND_FETCH) + .setTypes(type) + .setScroll(new TimeValue(60000)) + .setQuery(query) + .setSize(10000) + .execute() + .actionGet(); while (true) { for (SearchHit hit : scrollResp.getHits().getHits()) { diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java index 37e7508..a8cc723 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java +++ b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java @@ -171,7 +171,7 @@ public JsonObject getIngeratedListInJson(String input) { * the similarities from different sources */ public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) { - aggregateRelatedTerms(input, MudrodConstants.USE_HISTORY_LINKAGE_TYPE); + aggregateRelatedTerms(input, MudrodConstants.USER_HISTORY_LINKAGE_TYPE); aggregateRelatedTerms(input, MudrodConstants.CLICK_STREAM_LINKAGE_TYPE); aggregateRelatedTerms(input, MudrodConstants.METADATA_LINKAGE_TYPE); aggregateRelatedTermsSWEET(input, MudrodConstants.ONTOLOGY_LINKAGE_TYPE); @@ -180,7 +180,7 @@ public JsonObject getIngeratedListInJson(String input) { } public int getModelweight(String model) { - if (model.equals(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)) { + if (model.equals(MudrodConstants.USER_HISTORY_LINKAGE_TYPE)) { return Integer.parseInt(props.getProperty(MudrodConstants.USER_HISTORY_W)); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java index 84ba347..2a43e4d 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java +++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java @@ -15,23 +15,22 @@ /** * Class contains static constant keys and values relating to Mudrod - * 
configuration properties. Property values are read from <a href= - * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a> + * configuration properties. Property values are read from <code>config.properties</code>. */ public interface MudrodConstants { public static final String CLEANUP_TYPE = "cleanup.log"; - public static final String CLICK_STREAM_LINKAGE_TYPE = "click.stream.linkage"; + public static final String CLICK_STREAM_LINKAGE_TYPE = "mudrod.clickstream.linkage"; - public static final String CLICK_STREAM_MATRIX_TYPE = "click.stream.matrix"; + public static final String CLICK_STREAM_MATRIX_TYPE = "mudrod.clickstream.matrix"; public static final String CLICKSTREAM_SVD_DIM = "mudrod.clickstream.svd.d"; public static final String CLICKSTREAM_W = "mudrod.clickstream.weight"; - + public static final String CLICKSTREAM_PATH = "mudrod.clickstream.path"; - + public static final String CLICKSTREAM_SVD_PATH = "mudrod.clickstream.svd.path"; /** Defined on CLI */ @@ -52,25 +51,25 @@ public static final String FTP_PREFIX = "mudrod.ftp.prefix"; public static final String FTP_TYPE = "raw.ftp"; - + public static final String FTP_LOG = "ftp"; public static final String HTTP_PREFIX = "mudrod.http.prefix"; public static final String HTTP_TYPE = "raw.http"; - + public static final String HTTP_LOG = "http"; - + public static final String BASE_URL = "mudrod.base.url"; - + public static final String BLACK_LIST_REQUEST = "mudrod.black.request.list"; - + public static final String BLACK_LIST_AGENT = "mudrod.black.agent.list"; public static final String LOG_INDEX = "mudrod.log.index"; public static final String METADATA_LINKAGE_TYPE = "metadata.linkage"; - + public static final String METADATA_DOWNLOAD_URL = "mudrod.metadata.download.url"; public static final String METADATA_SVD_DIM = "mudrod.metadata.svd.d"; @@ -93,38 +92,38 @@ public static final String ONTOLOGY_LINKAGE_TYPE = "ontology.linkage"; public static final String ONTOLOGY_W = 
"mudrod.ontology.weight"; - + public static final String ONTOLOGY_PATH = "mudrod.ontology.path"; - + public static final String ONTOLOGY_INPUT_PATH = "mudrod.ontology.input.path"; /** Defined on CLI */ public static final String METADATA_DOWNLOAD = "mudrod.metadata.download"; - + public static final String RAW_METADATA_PATH = "mudrod.metadata.path"; public static final String RAW_METADATA_TYPE = "mudrod.metadata.type"; - + public static final String METADATA_MATRIX_PATH = "mudrod.metadata.matrix.path"; - + public static final String METADATA_SVD_PATH = "mudrod.metadata.svd.path"; - + public static final String RECOM_METADATA_TYPE = "recommedation.metadata"; - + public static final String METADATA_ID = "mudrod.metadata.id"; - + public static final String SEMANTIC_FIELDS = "mudrod.metadata.semantic.fields"; - + public static final String METADATA_WORD_SIM_TYPE = "metadata.word.sim"; - + public static final String METADATA_FEATURE_SIM_TYPE = "metadata.feature.sim"; - + public static final String METADATA_SESSION_SIM_TYPE = "metadata.session.sim"; - + public static final String METADATA_TERM_MATRIX_PATH = "metadata.term.matrix.path"; - + public static final String METADATA_WORD_MATRIX_PATH = "metadata.word.matrix.path"; - + public static final String METADATA_SESSION_MATRIX_PATH = "metadata.session.matrix.path"; public static final String REQUEST_RATE = "mudrod.request.rate"; @@ -138,32 +137,29 @@ public static final String SPARK_APP_NAME = "mudrod.spark.app.name"; public static final String SPARK_MASTER = "mudrod.spark.master"; - /** - * Absolute local location of javaSVMWithSGDModel directory. 
This is typically - * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code> - */ + public static final String RANKING_MODEL = "mudrod.ranking.model"; - + public static final String RANKING_ML = "mudrod.ranking.machine.learning"; public static final String REQUEST_TIME_GAP = "mudrod.request.time.gap"; public static final String TIME_SUFFIX = "time.suffix"; - public static final String USE_HISTORY_LINKAGE_TYPE = "user.history.linkage"; + public static final String USER_HISTORY_LINKAGE_TYPE = "mudrod.user.history.linkage"; public static final String USER_HISTORY_W = "mudrod.user.history.weight"; - + public static final String USER_HISTORY_PATH = "mudrod.user.history.path"; public static final String VIEW_F = "mudrod.view.freq"; - + public static final String VIEW_MARKER = "mudrod.view.url.marker"; - + public static final String SEARCH_MARKER = "mudrod.search.url.marker"; - + public static final String SEARCH_F = "mudrod.search.freq"; - + public static final String DOWNLOAD_F = "mudrod.download.freq"; } diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java index b74074a..f492ecf 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java +++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java @@ -32,7 +32,7 @@ import java.util.Properties; /** - * ClassName: ApiHarvester Function: Harvest metadata from PO.DAACweb service. + * Harvest metadata from PO.DAAC Webservices. */ public class ApiHarvester extends DiscoveryStepAbstract { @@ -70,7 +70,7 @@ public Object execute() { } /** - * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please + * Add mapping to index metadata in Elasticsearch. Please * invoke this method before import metadata to Elasticsearch. 
*/ public void addMetadataMapping() { @@ -84,7 +84,7 @@ public void addMetadataMapping() { } /** - * importToES: Index metadata into elasticsearch from local file directory. + * Index metadata into elasticsearch from local file directory. * Please make sure metadata have been harvest from web service before * invoking this method. */ @@ -118,12 +118,12 @@ private void importSingleFileToES(InputStream is) { } /** - * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service. + * Harvest metadata from PO.DAAC web service. */ private void harvestMetadatafromWeb() { LOG.info("Metadata download started."); int startIndex = 0; - int doc_length = 0; + int docLength = 0; JsonParser parser = new JsonParser(); do { String searchAPI = props.getProperty(MudrodConstants.METADATA_DOWNLOAD_URL); @@ -135,7 +135,7 @@ private void harvestMetadatafromWeb() { JsonObject responseObject = json.getAsJsonObject(); JsonArray docs = responseObject.getAsJsonObject("response").getAsJsonArray("docs"); - doc_length = docs.size(); + docLength = docs.size(); File file = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH)); if (!file.exists()) { @@ -145,7 +145,7 @@ private void harvestMetadatafromWeb() { LOG.error("Failed to create directory!"); } } - for (int i = 0; i < doc_length; i++) { + for (int i = 0; i < docLength; i++) { JsonElement item = docs.get(i); int docId = startIndex + i; File itemfile = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId + ".json"); @@ -167,7 +167,7 @@ private void harvestMetadatafromWeb() { Thread.currentThread().interrupt(); } - } while (doc_length != 0); + } while (docLength != 0); LOG.info("Metadata downloading finished"); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java index e4a6320..a8d3d22 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java +++ 
b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java @@ -64,7 +64,12 @@ public Object execute() { String metadataMatrixFile = props.getProperty(MudrodConstants.METADATA_MATRIX_PATH); try { MetadataExtractor extractor = new MetadataExtractor(); - JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)); + JavaPairRDD<String, List<String>> metadataTermsRDD = + extractor.loadMetadata( + this.es, + this.spark.sc, + props.getProperty(MudrodConstants.ES_INDEX_NAME), + props.getProperty(MudrodConstants.RAW_METADATA_TYPE)); LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD); MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile); diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java index 303eb52..7dc72c5 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java +++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java @@ -27,8 +27,7 @@ import java.util.Properties; /** - * ClassName: MetadataAnalyzer - * Function: Calculate semantic relationship of vocabularies extracted from + * Calculate semantic relationship of vocabularies extracted from * metadata. 
*/ public class MetadataAnalyzer extends DiscoveryStepAbstract implements Serializable { diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java index e17d2ce..3486607 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java @@ -81,7 +81,11 @@ public LabeledRowMatrix generateWordBasedTFIDF() throws Exception { LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark); - MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH)); + MatrixUtil.exportToCSV( + wordtfidfMatrix.rowMatrix, + wordtfidfMatrix.rowkeys, + wordtfidfMatrix.colkeys, + props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH)); return wordtfidfMatrix; } @@ -100,7 +104,11 @@ public LabeledRowMatrix generateTermBasedTFIDF() throws Exception { LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark); - MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH)); + MatrixUtil.exportToCSV( + tokentfidfMatrix.rowMatrix, + tokentfidfMatrix.rowkeys, + tokentfidfMatrix.colkeys, + props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH)); return tokentfidfMatrix; } diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java similarity index 72% rename from core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java rename to core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java index 9121719..546fae5 100644 --- 
a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java +++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java @@ -32,13 +32,13 @@ * Supports ability to import click stream data into Elasticsearch * through .csv file */ -public class ClickstreamImporter extends MudrodAbstract { +public class ClickStreamImporter extends MudrodAbstract { /** * */ private static final long serialVersionUID = 1L; - public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) { + public ClickStreamImporter(Properties props, ESDriver es, SparkDriver spark) { super(props, es, spark); addClickStreamMapping(); } @@ -47,20 +47,23 @@ public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) { * Method to add Elasticsearch mapping for click stream data */ public void addClickStreamMapping() { - XContentBuilder Mapping; + XContentBuilder mapping; + String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE); try { - Mapping = jsonBuilder().startObject().startObject( - props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).startObject( - "properties").startObject("query").field("type", "string").field( - "index", "not_analyzed").endObject().startObject("dataID").field( - "type", "string").field("index", "not_analyzed").endObject() - - .endObject().endObject().endObject(); + mapping = jsonBuilder() + .startObject() + .startObject(clickStreamMatrixType) + .startObject("properties") + .startObject("query").field("type", "string").field("index", "not_analyzed").endObject() + .startObject("dataID").field("type", "string").field("index", "not_analyzed").endObject() + .endObject() + .endObject() + .endObject(); es.getClient().admin().indices().preparePutMapping( props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType( - props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).setSource( - Mapping).execute().actionGet(); + clickStreamMatrixType).setSource( + 
mapping).execute().actionGet(); } catch (IOException e) { e.printStackTrace(); } @@ -70,8 +73,9 @@ public void addClickStreamMapping() { * Method to import click stream CSV into Elasticsearch */ public void importfromCSVtoES() { - es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), - props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)); + String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE); + String esIndexName = props.getProperty(MudrodConstants.ES_INDEX_NAME); + es.deleteType(esIndexName, clickStreamMatrixType); es.createBulkProcessor(); BufferedReader br = null; @@ -86,8 +90,7 @@ public void importfromCSVtoES() { String[] clicks = line.split(cvsSplitBy); for (int i = 1; i < clicks.length; i++) { if (!"0.0".equals(clicks[i])) { - IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), - props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)) + IndexRequest ir = new IndexRequest(esIndexName, clickStreamMatrixType) .source(jsonBuilder().startObject().field("query", clicks[0]).field( "dataID", dataList[i]).field("clicks", clicks[i]).endObject()); es.getBulkProcessor().add(ir); diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java index a574041..1ebae17 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java @@ -57,7 +57,6 @@ public static DecimalFormat df = new DecimalFormat("#.00"); public LinkageTriple() { - // TODO Auto-generated constructor stub } /** @@ -96,8 +95,13 @@ public static void insertTriples(ESDriver es, List<LinkageTriple> triples, Strin } else { jsonBuilder.field("keywords", triple.keyA + "," + triple.keyB); } - - jsonBuilder.field("weight", Double.parseDouble(df.format(triple.weight))); + double tripleWeight = 0; + try { + tripleWeight = 
Double.parseDouble(df.format(triple.weight)); + } catch (NumberFormatException e) { + // do nothing, triple weight is 0 as it cannot be parsed + } + jsonBuilder.field("weight", tripleWeight); jsonBuilder.endObject(); IndexRequest ir = new IndexRequest(index, type).source(jsonBuilder); diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java index 7eef272..ec27a29 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java @@ -457,8 +457,11 @@ public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<Stri } try { file.createNewFile(); - FileWriter fw = new FileWriter(file.getAbsoluteFile()); - BufferedWriter bw = new BufferedWriter(fw); + } catch (IOException e1) { + e1.printStackTrace(); + } + try (FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw);){ String coltitle = " Num" + ","; for (int j = 0; j < colnum; j++) { coltitle += "\"" + colKeys.get(j) + "\","; @@ -475,12 +478,8 @@ public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<Stri row = row.substring(0, row.length() - 1); bw.write(row + "\n"); } - - bw.close(); - } catch (IOException e) { e.printStackTrace(); - } } } diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java index 704ccfd..2f37af9 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java @@ -31,7 +31,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; -import 
org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.joda.time.DateTime; import org.joda.time.Seconds; import org.joda.time.format.DateTimeFormatter; @@ -40,7 +39,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -98,7 +101,8 @@ public Object execute() { public boolean checkKnownCrawler(String agent) { String[] crawlers = props.getProperty(MudrodConstants.BLACK_LIST_AGENT).split(","); for (int i = 0; i < crawlers.length; i++) { - if (agent.toLowerCase().contains(crawlers[i].trim())) return true; + if (agent.toLowerCase().contains(crawlers[i].trim())) + return true; } return false; } @@ -110,20 +114,20 @@ void checkByRateInParallel() throws InterruptedException, IOException { int userCount = 0; userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> { - ESDriver tmpES = new ESDriver(props); - tmpES.createBulkProcessor(); + ESDriver tmpEs = new ESDriver(props); + tmpEs.createBulkProcessor(); List<Integer> realUserNums = new ArrayList<>(); while (iterator.hasNext()) { String s = iterator.next(); - Integer realUser = checkByRate(tmpES, s); + Integer realUser = checkByRate(tmpEs, s); realUserNums.add(realUser); } - tmpES.destroyBulkProcessor(); - tmpES.close(); + tmpEs.destroyBulkProcessor(); + tmpEs.close(); return realUserNums.iterator(); }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b); - LOG.info("User count: {}", Integer.toString(userCount)); + LOG.info("Final user count: {}", Integer.toString(userCount)); } private int checkByRate(ESDriver es, String user) { @@ -135,8 +139,19 @@ private int checkByRate(ESDriver es, String user) { BoolQueryBuilder filterSearch = new BoolQueryBuilder(); filterSearch.must(QueryBuilders.termQuery("IP", user)); - AggregationBuilder 
aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC); - SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet(); + AggregationBuilder aggregation = AggregationBuilders + .dateHistogram("by_minute") + .field("Time") + .dateHistogramInterval(DateHistogramInterval.MINUTE) + .order(Order.COUNT_DESC); + SearchResponse checkRobot = es.getClient() + .prepareSearch(logIndex) + .setTypes(httpType, ftpType) + .setQuery(filterSearch) + .setSize(0) + .addAggregation(aggregation) + .execute() + .actionGet(); Histogram agg = checkRobot.getAggregations().get("by_minute"); diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java index 1969e3e..415f377 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java @@ -72,38 +72,54 @@ public void generateBinaryMatrix() { bw.write("Num" + ","); // step 1: write first row of csv - List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); + List<String> logIndexList = es.getIndexListWithPrefix( + props.getProperty(MudrodConstants.LOG_INDEX)); String[] logIndices = logIndexList.toArray(new String[0]); String[] statictypeArray = new String[] { this.sessionStats }; int docCount = es.getDocCount(logIndices, statictypeArray); - - LOG.info("{}: {}", this.sessionStats, docCount); - if (docCount==0) - { + LOG.info("{}: {}", this.sessionStats, docCount); + + if (docCount==0) { bw.close(); file.delete(); return; } - SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - 
.addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet(); + SearchResponse sr = es.getClient() + .prepareSearch(logIndices) + .setTypes(statictypeArray) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0) + .addAggregation(AggregationBuilders.terms("IPs") + .field("IP") + .size(docCount)) + .execute() + .actionGet(); Terms ips = sr.getAggregations().get("IPs"); List<String> ipList = new ArrayList<>(); for (Terms.Bucket entry : ips.getBuckets()) { - if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) { // filter - // out - // less - // active users/ips + // filter + if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) { + // out less active users/ips ipList.add(entry.getKey().toString()); } } bw.write(String.join(",", ipList) + "\n"); // step 2: step the rest rows of csv - SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount))); + SearchRequestBuilder sr2Builder = es.getClient() + .prepareSearch(logIndices) + .setTypes(statictypeArray) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0) + .addAggregation(AggregationBuilders.terms("KeywordAgg") + .field("keywords") + .size(docCount) + .subAggregation(AggregationBuilders.terms("IPAgg") + .field("IP") + .size(docCount))); SearchResponse sr2 = sr2Builder.execute().actionGet(); Terms keywords = sr2.getAggregations().get("KeywordAgg"); @@ -132,6 +148,7 @@ public void generateBinaryMatrix() { } bw.close(); + fw.close(); } catch (IOException e) { e.printStackTrace(); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java index 
f3a7f6d..1d04902 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java @@ -53,7 +53,8 @@ public LogAbstract(Properties props, ESDriver es, SparkDriver spark) { } protected void initLogIndex() { - logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX); + logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + + props.getProperty(MudrodConstants.TIME_SUFFIX); httpType = MudrodConstants.HTTP_TYPE; ftpType = MudrodConstants.FTP_TYPE; cleanupType = MudrodConstants.CLEANUP_TYPE; @@ -116,8 +117,16 @@ public Terms getUserTerms(String... type) { int docCount = es.getDocCount(logIndex, type); - SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet(); + SearchResponse sr = es.getClient() + .prepareSearch(logIndex) + .setTypes(type) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0) + .addAggregation(AggregationBuilders.terms("Users") + .field("IP") + .size(docCount)) + .execute() + .actionGet(); return sr.getAggregations().get("Users"); } @@ -138,10 +147,23 @@ public Terms getUserTerms(String... 
type) { int docCount = es.getDocCount(logIndex, httpType); - AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC); - - SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0) - .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet(); + AggregationBuilder dailyAgg = AggregationBuilders + .dateHistogram("by_day") + .field("Time") + .dateHistogramInterval(DateHistogramInterval.DAY) + .order(Order.COUNT_DESC); + + SearchResponse sr = es.getClient() + .prepareSearch(logIndex) + .setTypes(httpType) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(0) + .addAggregation(AggregationBuilders.terms("Users") + .field("IP") + .size(docCount) + .subAggregation(dailyAgg)) + .execute() + .actionGet(); Terms users = sr.getAggregations().get("Users"); Map<String, Long> userList = new HashMap<>(); for (Terms.Bucket user : users.getBuckets()) { @@ -201,11 +223,16 @@ public Terms getSessionTerms() { int docCount = es.getDocCount(this.logIndex, this.cleanupType); - SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery()) - .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet(); - - Terms Sessions = sr.getAggregations().get("Sessions"); - return Sessions; + SearchResponse sr = es.getClient() + .prepareSearch(this.logIndex) + .setTypes(this.cleanupType) + .setQuery(QueryBuilders.matchAllQuery()) + .addAggregation(AggregationBuilders.terms("Sessions") + .field("SessionID") + .size(docCount)) + .execute() + .actionGet(); + return sr.getAggregations().get("Sessions"); } public List<String> getSessions() { @@ -213,7 +240,7 @@ public Terms getSessionTerms() { Terms sessions = this.getSessionTerms(); List<String> 
sessionList = new ArrayList<>(); for (Terms.Bucket entry : sessions.getBuckets()) { - if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) { + if (entry.getDocCount() >= 3 && !"invalid".equals(entry.getKey())) { String session = (String) entry.getKey(); sessionList.add(session); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java index 4ce1535..3e8ffa7 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java @@ -93,50 +93,15 @@ public void generateSession() { } public void genSessionByReferer(int timeThres) throws InterruptedException, IOException { - - genSessionByRefererInParallel(timeThres); - + genSessionByRefererInParallel(timeThres); } public void combineShortSessions(int timeThres) throws InterruptedException, IOException { - - combineShortSessionsInParallel(timeThres); - + combineShortSessionsInParallel(timeThres); } /** - * Method to generate session by time threshold and referrer - * - * @param timeThres value of time threshold (s) - * @throws ElasticsearchException ElasticsearchException - * @throws IOException IOException - */ - public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException { - - Terms users = this.getUserTerms(this.cleanupType); - - int sessionCount = 0; - for (Terms.Bucket entry : users.getBuckets()) { - - String user = (String) entry.getKey(); - Integer sessionNum = genSessionByReferer(es, user, timeThres); - sessionCount += sessionNum; - } - - LOG.info("Initial session count: {}", Integer.toString(sessionCount)); - } - - public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException { - - Terms users = this.getUserTerms(this.cleanupType); - for (Terms.Bucket entry : users.getBuckets()) { - String user = 
entry.getKey().toString(); - combineShortSessions(es, user, timeThres); - } - } - - /** - * Method to remove invalid logs through IP address + * Remove invalid logs through IP address * * @param es an instantiated es driver * @param ip invalid IP address @@ -148,13 +113,24 @@ public void deleteInvalid(ESDriver es, String ip) throws IOException { BoolQueryBuilder filterAll = new BoolQueryBuilder(); filterAll.must(QueryBuilders.termQuery("IP", ip)); - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet(); + SearchResponse scrollResp = es.getClient() + .prepareSearch(logIndex) + .setTypes(this.cleanupType) + .setScroll(new TimeValue(60000)) + .setQuery(filterAll) + .setSize(100) + .execute() + .actionGet(); while (true) { for (SearchHit hit : scrollResp.getHits().getHits()) { update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid"); } - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + scrollResp = es.getClient() + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(new TimeValue(600000)) + .execute() + .actionGet(); if (scrollResp.getHits().getHits().length == 0) { break; } @@ -174,14 +150,14 @@ public void deleteInvalid(ESDriver es, String ip) throws IOException { * @throws IOException */ private void update(ESDriver es, String index, String type, String id, String field1, Object value1) throws IOException { - UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); + UpdateRequest ur = new UpdateRequest(index, type, id) + .doc(jsonBuilder().startObject().field(field1, value1).endObject()); es.getBulkProcessor().add(ur); } public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException { JavaRDD<String> userRDD = getUserRDD(this.cleanupType); 
- int sessionCount = 0; sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() { /** @@ -226,17 +202,24 @@ public int genSessionByReferer(ESDriver es, String user, int timeThres) throws E BoolQueryBuilder filterSearch = new BoolQueryBuilder(); filterSearch.must(QueryBuilders.termQuery("IP", user)); - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100) - .execute().actionGet(); + SearchResponse scrollResp = es.getClient() + .prepareSearch(logIndex) + .setTypes(this.cleanupType) + .setScroll(new TimeValue(60000)) + .setQuery(filterSearch) + .addSort("Time", SortOrder.ASC) + .setSize(100) + .execute() + .actionGet(); Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>(); - String request = ""; - String referer = ""; - String logType = ""; - String id = ""; + String request; + String referer; + String logType; + String id; String ip = user; String indexUrl = props.getProperty(MudrodConstants.BASE_URL) + "/"; - DateTime time = null; + DateTime time; DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); while (scrollResp.getHits().getHits().length != 0) { @@ -354,6 +337,7 @@ public void call(Iterator<String> arg0) throws Exception { tmpES.close(); } }); + LOG.info("Final Session count (after combining short sessions): {}", Long.toString(userRDD.count())); } public void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException { @@ -372,7 +356,14 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws BoolQueryBuilder filterCheck = new BoolQueryBuilder(); filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-")); - SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet(); + SearchResponse checkReferer = es.getClient() + .prepareSearch(logIndex) + .setTypes(this.cleanupType) + .setScroll(new TimeValue(60000)) + .setQuery(filterCheck) + .setSize(0) + .execute() + .actionGet(); long numInvalid = checkReferer.getHits().getTotalHits(); double invalidRate = numInvalid / docCount; @@ -383,8 +374,17 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws } StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time"); - SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch) - .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet(); + SearchResponse srSession = es.getClient() + .prepareSearch(logIndex) + .setTypes(this.cleanupType) + .setScroll(new TimeValue(60000)) + .setQuery(filterSearch) + .addAggregation(AggregationBuilders.terms("Sessions") + .field("SessionID") + .size(docCount) + .subAggregation(statsAgg)) + .execute() + .actionGet(); Terms sessions = srSession.getAggregations().get("Sessions"); @@ -400,7 +400,7 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws String last = null; String lastnewID = null; String lastoldID = null; - String current = null; + String current; for (Session s : sessionList) { current = s.getEndTime(); if (last != null) { @@ -413,7 +413,14 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID())); - SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet(); + SearchResponse scrollResp = es.getClient() + .prepareSearch(logIndex) + 
.setTypes(this.cleanupType) + .setScroll(new TimeValue(60000)) + .setQuery(fs) + .setSize(100) + .execute() + .actionGet(); while (true) { for (SearchHit hit : scrollResp.getHits().getHits()) { if (lastnewID == null) { @@ -423,7 +430,11 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws } } - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + scrollResp = es.getClient() + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(new TimeValue(600000)) + .execute() + .actionGet(); if (scrollResp.getHits().getHits().length == 0) { break; } diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java index 981bece..e99af06 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.stats.Stats; import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder; import org.joda.time.DateTime; @@ -38,7 +37,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -81,7 +85,7 @@ public Object execute() { } public void processSession() throws InterruptedException, IOException, ExecutionException { - processSessionInParallel(); + 
processSessionInParallel(); } /** @@ -160,7 +164,13 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int BoolQueryBuilder filterSearch = new BoolQueryBuilder(); filterSearch.must(QueryBuilders.termQuery("SessionID", sessionId)); - SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filterSearch).addAggregation(statsAgg).execute().actionGet(); + SearchResponse sr = es.getClient() + .prepareSearch(logIndex) + .setTypes(inputType) + .setQuery(filterSearch) + .addAggregation(statsAgg) + .execute() + .actionGet(); Stats agg = sr.getAggregations().get("Stats"); min = agg.getMinAsString(); @@ -231,9 +241,9 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int String view = findDataset(request); if ("".equals(views)) views = view; - else if (!views.contains(view)) - views = views + "," + view; - } + else if (!views.contains(view)) + views = views + "," + view; + } } if (MudrodConstants.FTP_LOG.equals(logType)) { ftpRequestCount++; @@ -257,7 +267,11 @@ else if (!views.contains(view)) } - scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + scrollResp = es.getClient() + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(new TimeValue(600000)) + .execute() + .actionGet(); // Break condition: No hits are returned if (scrollResp.getHits().getHits().length == 0) { break; @@ -269,12 +283,17 @@ else if (!views.contains(view)) } if (searchDataListRequestCount != 0 && - searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) && - searchDataRequestCount != 0 && - searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) && - ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F))) + searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) && + searchDataRequestCount != 0 && + 
searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) && + ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F))) { - String sessionURL = props.getProperty(MudrodConstants.SESSION_PORT) + props.getProperty(MudrodConstants.SESSION_URL) + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType; + String sessionURL = props.getProperty( + MudrodConstants.SESSION_PORT) + + props.getProperty(MudrodConstants.SESSION_URL) + + "?sessionid=" + sessionId + + "&sessionType=" + outputType + + "&requestType=" + inputType; sessionCount = 1; IndexRequest ir = new IndexRequest(logIndex, outputType).source( diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java index 70c4067..193115e 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java @@ -18,7 +18,7 @@ import org.apache.sdap.mudrod.driver.SparkDriver; import org.apache.sdap.mudrod.main.MudrodConstants; import org.apache.sdap.mudrod.semantics.SVDAnalyzer; -import org.apache.sdap.mudrod.ssearch.ClickstreamImporter; +import org.apache.sdap.mudrod.ssearch.ClickStreamImporter; import org.apache.sdap.mudrod.utils.LinkageTriple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,18 +51,18 @@ public Object execute() { LOG.info("Starting ClickStreamAnalyzer..."); startTime = System.currentTimeMillis(); try { - String clickstream_matrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH); - File f = new File(clickstream_matrixFile); + String clickstreamMatrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH); + File f = new File(clickstreamMatrixFile); if (f.exists()) { SVDAnalyzer svd = new SVDAnalyzer(props, es, spark); - svd.getSVDMatrix(clickstream_matrixFile, + 
svd.getSVDMatrix(clickstreamMatrixFile, Integer.parseInt(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_DIM)), props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH)); List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH)); svd.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.CLICK_STREAM_LINKAGE_TYPE); // Store click stream in ES for the ranking use - ClickstreamImporter cs = new ClickstreamImporter(props, es, spark); + ClickStreamImporter cs = new ClickStreamImporter(props, es, spark); cs.importfromCSVtoES(); } } catch (Exception e) { diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java index c4c5da4..cb1e12f 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java @@ -50,7 +50,7 @@ public Object execute() { SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark); List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty(MudrodConstants.USER_HISTORY_PATH)); - sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)); + sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USER_HISTORY_LINKAGE_TYPE)); endTime = System.currentTimeMillis(); es.refreshIndex(); diff --git a/core/src/main/resources/config.properties b/core/src/main/resources/config.properties index 495d29c..5ed2504 100644 --- a/core/src/main/resources/config.properties +++ b/core/src/main/resources/config.properties @@ -16,14 +16,14 @@ mudrod.es.transport.tcp.port = 9300 mudrod.es.unicast.hosts = 127.0.0.1 mudrod.es.http.port = 9200 mudrod.es.index = mudrod - + # Spark related # Log processing type. 
Possible values include 'sequential' or 'parallel' mudrod.processing.type = parallel mudrod.spark.app.name = MudrodSparkApp mudrod.spark.master = local[4] mudrod.spark.optimize = repartition - + # Web log processing configuration # index name has to be all lowercase mudrod.log.index = log @@ -38,37 +38,43 @@ mudrod.download.freq = 100 mudrod.request.rate = 30 mudrod.session.port = 8080 mudrod.session.url = /mudrod-service/session.html -mudrod.request.time.gap = 600 +mudrod.request.time.gap = 600 mudrod.view.url.marker = /dataset/ mudrod.search.url.marker = /datasetlist? -# In order to better parse a URL (getting searching keyword, etc.), please consider custimize +# In order to better parse a URL (getting searching keyword, etc.), please consider customizing the # org.apache.sdap.mudrod.weblog.structure.RequestUrl - GetSearchInfo, getFilterInfo - + # User search history mudrod.query.min = 0 mudrod.user.history.weight = 2 - +mudrod.user.history.linkage = UserHistoryLinkage +mudrod.user.history.path = userHistoryMatrix + # clickstream mudrod.download.weight = 3 mudrod.clickstream.svd.d = 50 mudrod.clickstream.weight = 2 - +mudrod.clickstream.matrix = clickStreamMatrix +mudrod.clickstream.linkage = ClickStreamLinkage +mudrod.clickstream.path = clickStreamMatrix +mudrod.clickstream.svd.path = clickStreamMatrixSVD + # metadata -mudrod.metadata.download = 0 +mudrod.metadata.download = 1 mudrod.metadata.download.url = https://podaac.jpl.nasa.gov/api/dataset?startIndex=$startIndex&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search= mudrod.metadata.svd.d = 50 mudrod.metadata.url = null mudrod.metadata.weight = 1 mudrod.metadata.type = RawMetadata - + # ranking, ${svmSgdModel.value} is resolved at build time. 
See the property in core/pom.xml for the value mudrod.ranking.machine.learning = 1 mudrod.ranking.model = ${svmSgdModel.value}.zip - + # recommendation mudrod.metadata.id = Dataset-ShortName mudrod.metadata.semantic.fields = DatasetParameter-Term,DatasetParameter-Variable,Dataset-ExtractTerm # ontology service implementation. Possible values include EsipPortal - EsipPortalOntology EsipCOR - EsipCOROntology Local - org.apache.sdap.mudrod.ontology.process.Local mudrod.ontology.implementation = Local -mudrod.ontology.weight = 2 +mudrod.ontology.weight = 2 \ No newline at end of file diff --git a/core/src/main/resources/elastic_mappings.json b/core/src/main/resources/elastic_mappings.json index ddb1952..3730b87 100644 --- a/core/src/main/resources/elastic_mappings.json +++ b/core/src/main/resources/elastic_mappings.json @@ -1,116 +1,68 @@ { - "mappings": { - "doc": { - "dynamic_templates": [ - { - "geo_point_mapping": { - "match_mapping_type": "nested", - "match": "Point", - "mapping": { - "type": "geo_point" - } - }, - "geo_shape_mapping": { - "match_mapping_type": "nested", - "match": "Polygon", - "mapping": { - "type": "geo_shape" - } - } - } - ] - } - }, - "eonet_event": { - "properties": { - "sources": { - "type": "nested", - "fields": { - "id": { - "ignore_above": 256, - "type": "keyword" - }, - "url": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "geometries": { - "type": "nested", - "fields": { - "date": { - "ignore_above": 256, - "type": "date" - }, - "type": { - "ignore_above": 256, - "type": "keyword" - }, - "coordinates": { - "ignore_above": 256, - "dynamic": true - } - } - }, - "link": { - "type": "text", - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "closed": { - "type": "text", - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "description": { - "type": "text", - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "categories": { - 
"type": "nested", - "fields": { - "id": { - "ignore_above": 256, - "type": "integer" - }, - "title": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "id": { - "type": "text", - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - } - }, - "title": { - "type": "completion", - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - } - } - } + "_default_":{ + "properties":{ + "keywords":{ + "type":"text", + "analyzer":"csv", + "fielddata":true + }, + "views":{ + "type":"string", + "analyzer":"csv" + }, + "downloads":{ + "type":"string", + "analyzer":"csv" + }, + "RequestUrl":{ + "type":"string", + "include_in_all":false, + "index":"no" + }, + "IP":{ + "type":"keyword", + "index":"not_analyzed" + }, + "Browser":{ + "type":"string", + "include_in_all":false, + "index":"no" + }, + "SessionURL":{ + "type":"string", + "include_in_all":false, + "index":"no" + }, + "Referer":{ + "type":"string", + "index":"not_analyzed" + }, + "SessionID":{ + "type":"string", + "index":"not_analyzed" + }, + "Response":{ + "type":"string", + "include_in_all":false, + "index":"no" + }, + "Request":{ + "type":"string", + "include_in_all":false, + "index":"no" + }, + "Coordinates":{ + "type":"geo_point", + "include_in_all":false, + "index":"no" + }, + "LogType":{ + "type":"string", + "index":"not_analyzed" + }, + "Dataset-Metadata":{ + "type":"completion" + } } + } } \ No newline at end of file diff --git a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java index 6b410f4..9bbc1cd 100644 --- a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java +++ b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java @@ -1,5 +1,15 @@ -/** +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.sdap.mudrod.services.eonet; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > full ingestion workflow and process broken in several places > ------------------------------------------------------------ > > Key: SDAP-71 > URL: https://issues.apache.org/jira/browse/SDAP-71 > Project: Apache Science Data Analytics Platform > Issue Type: Improvement > Reporter: Lewis John McGibbney > Priority: Major > > Amongst other things, the configuration overhaul broke several parts of the > full ingestion workflow. This has taken literally ages to debug; however, I now > have a patch at hand which fixes all issues I've encountered. > This has been an absolute blocker for me implementing the eventsView... I'll > now get on to that. > PR coming up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)