This is an automated email from the ASF dual-hosted git repository.
lewismc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-mudrod.git
The following commit(s) were added to refs/heads/master by this push:
new faac228 SDAP-71 full ingestion workflow and process broken in several places (#16)
faac228 is described below
commit faac228e88ebdb9fbd8e134f8550d5dc16738a71
Author: Lewis John McGibbney <[email protected]>
AuthorDate: Mon May 14 02:58:53 2018 -0700
SDAP-71 full ingestion workflow and process broken in several places (#16)
---
.gitignore | 2 +
.../org/apache/sdap/mudrod/driver/ESDriver.java | 26 ++-
.../mudrod/integration/LinkageIntegration.java | 4 +-
.../apache/sdap/mudrod/main/MudrodConstants.java | 70 ++++----
.../sdap/mudrod/metadata/pre/ApiHarvester.java | 16 +-
.../sdap/mudrod/metadata/pre/MatrixGenerator.java | 7 +-
.../mudrod/metadata/process/MetadataAnalyzer.java | 3 +-
.../recommendation/pre/MetadataTFIDFGenerator.java | 12 +-
...treamImporter.java => ClickStreamImporter.java} | 35 ++--
.../apache/sdap/mudrod/utils/LinkageTriple.java | 10 +-
.../org/apache/sdap/mudrod/utils/MatrixUtil.java | 11 +-
.../sdap/mudrod/weblog/pre/CrawlerDetection.java | 37 +++--
.../sdap/mudrod/weblog/pre/HistoryGenerator.java | 43 +++--
.../apache/sdap/mudrod/weblog/pre/LogAbstract.java | 53 ++++--
.../sdap/mudrod/weblog/pre/SessionGenerator.java | 121 +++++++-------
.../sdap/mudrod/weblog/pre/SessionStatistic.java | 45 ++++--
.../mudrod/weblog/process/ClickStreamAnalyzer.java | 10 +-
.../mudrod/weblog/process/UserHistoryAnalyzer.java | 2 +-
core/src/main/resources/config.properties | 28 ++--
core/src/main/resources/elastic_mappings.json | 178 ++++++++-------------
.../services/eonet/EONETIngestionResource.java | 12 +-
21 files changed, 407 insertions(+), 318 deletions(-)
diff --git a/.gitignore b/.gitignore
index 3009cbf..a8ff594 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,5 @@ core/.externalToolBuilders/Maven_Ant_Builder.launch
core/maven-eclipse.xml
service/.classpath
web/.classpath
+web/.externalToolBuilders/
+web/maven-eclipse.xml
diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
index 6d675fb..fc51557 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
@@ -177,15 +177,31 @@ public class ESDriver implements Serializable {
}
public void deleteAllByQuery(String index, String type, QueryBuilder query) {
- ImmutableOpenMap<String, MappingMetaData> mappings =
getClient().admin().cluster().prepareState().execute().actionGet()
- .getState().metaData().index(index).getMappings();
+ ImmutableOpenMap<String, MappingMetaData> mappings = getClient()
+ .admin()
+ .cluster()
+ .prepareState()
+ .execute()
+ .actionGet()
+ .getState()
+ .metaData()
+ .index(index)
+ .getMappings();
//check if the type exists
- if (!mappings.containsKey(type)) return;
+ if (!mappings.containsKey(type))
+ return;
createBulkProcessor();
- SearchResponse scrollResp =
getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new
TimeValue(60000)).setQuery(query).setSize(10000).execute()
- .actionGet();
+ SearchResponse scrollResp = getClient()
+ .prepareSearch(index)
+ .setSearchType(SearchType.QUERY_AND_FETCH)
+ .setTypes(type)
+ .setScroll(new TimeValue(60000))
+ .setQuery(query)
+ .setSize(10000)
+ .execute()
+ .actionGet();
while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
index 37e7508..a8cc723 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
@@ -171,7 +171,7 @@ public class LinkageIntegration extends DiscoveryStepAbstract {
* the similarities from different sources
*/
public Map<String, List<LinkedTerm>>
aggregateRelatedTermsFromAllmodel(String input) {
- aggregateRelatedTerms(input, MudrodConstants.USE_HISTORY_LINKAGE_TYPE);
+ aggregateRelatedTerms(input, MudrodConstants.USER_HISTORY_LINKAGE_TYPE);
aggregateRelatedTerms(input, MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
aggregateRelatedTerms(input, MudrodConstants.METADATA_LINKAGE_TYPE);
aggregateRelatedTermsSWEET(input, MudrodConstants.ONTOLOGY_LINKAGE_TYPE);
@@ -180,7 +180,7 @@ public class LinkageIntegration extends DiscoveryStepAbstract {
}
public int getModelweight(String model) {
- if (model.equals(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)) {
+ if (model.equals(MudrodConstants.USER_HISTORY_LINKAGE_TYPE)) {
return
Integer.parseInt(props.getProperty(MudrodConstants.USER_HISTORY_W));
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
index 84ba347..2a43e4d 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
@@ -15,23 +15,22 @@ package org.apache.sdap.mudrod.main;
/**
* Class contains static constant keys and values relating to Mudrod
- * configuration properties. Property values are read from <a href=
- *
"https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>
+ * configuration properties. Property values are read from
<code>config.properties</code>.
*/
public interface MudrodConstants {
public static final String CLEANUP_TYPE = "cleanup.log";
- public static final String CLICK_STREAM_LINKAGE_TYPE =
"click.stream.linkage";
+ public static final String CLICK_STREAM_LINKAGE_TYPE =
"mudrod.clickstream.linkage";
- public static final String CLICK_STREAM_MATRIX_TYPE = "click.stream.matrix";
+ public static final String CLICK_STREAM_MATRIX_TYPE =
"mudrod.clickstream.matrix";
public static final String CLICKSTREAM_SVD_DIM = "mudrod.clickstream.svd.d";
public static final String CLICKSTREAM_W = "mudrod.clickstream.weight";
-
+
public static final String CLICKSTREAM_PATH = "mudrod.clickstream.path";
-
+
public static final String CLICKSTREAM_SVD_PATH =
"mudrod.clickstream.svd.path";
/** Defined on CLI */
@@ -52,25 +51,25 @@ public interface MudrodConstants {
public static final String FTP_PREFIX = "mudrod.ftp.prefix";
public static final String FTP_TYPE = "raw.ftp";
-
+
public static final String FTP_LOG = "ftp";
public static final String HTTP_PREFIX = "mudrod.http.prefix";
public static final String HTTP_TYPE = "raw.http";
-
+
public static final String HTTP_LOG = "http";
-
+
public static final String BASE_URL = "mudrod.base.url";
-
+
public static final String BLACK_LIST_REQUEST = "mudrod.black.request.list";
-
+
public static final String BLACK_LIST_AGENT = "mudrod.black.agent.list";
public static final String LOG_INDEX = "mudrod.log.index";
public static final String METADATA_LINKAGE_TYPE = "metadata.linkage";
-
+
public static final String METADATA_DOWNLOAD_URL =
"mudrod.metadata.download.url";
public static final String METADATA_SVD_DIM = "mudrod.metadata.svd.d";
@@ -93,38 +92,38 @@ public interface MudrodConstants {
public static final String ONTOLOGY_LINKAGE_TYPE = "ontology.linkage";
public static final String ONTOLOGY_W = "mudrod.ontology.weight";
-
+
public static final String ONTOLOGY_PATH = "mudrod.ontology.path";
-
+
public static final String ONTOLOGY_INPUT_PATH =
"mudrod.ontology.input.path";
/** Defined on CLI */
public static final String METADATA_DOWNLOAD = "mudrod.metadata.download";
-
+
public static final String RAW_METADATA_PATH = "mudrod.metadata.path";
public static final String RAW_METADATA_TYPE = "mudrod.metadata.type";
-
+
public static final String METADATA_MATRIX_PATH =
"mudrod.metadata.matrix.path";
-
+
public static final String METADATA_SVD_PATH = "mudrod.metadata.svd.path";
-
+
public static final String RECOM_METADATA_TYPE = "recommedation.metadata";
-
+
public static final String METADATA_ID = "mudrod.metadata.id";
-
+
public static final String SEMANTIC_FIELDS =
"mudrod.metadata.semantic.fields";
-
+
public static final String METADATA_WORD_SIM_TYPE = "metadata.word.sim";
-
+
public static final String METADATA_FEATURE_SIM_TYPE =
"metadata.feature.sim";
-
+
public static final String METADATA_SESSION_SIM_TYPE =
"metadata.session.sim";
-
+
public static final String METADATA_TERM_MATRIX_PATH =
"metadata.term.matrix.path";
-
+
public static final String METADATA_WORD_MATRIX_PATH =
"metadata.word.matrix.path";
-
+
public static final String METADATA_SESSION_MATRIX_PATH =
"metadata.session.matrix.path";
public static final String REQUEST_RATE = "mudrod.request.rate";
@@ -138,32 +137,29 @@ public interface MudrodConstants {
public static final String SPARK_APP_NAME = "mudrod.spark.app.name";
public static final String SPARK_MASTER = "mudrod.spark.master";
- /**
- * Absolute local location of javaSVMWithSGDModel directory. This is
typically
- *
<code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code>
- */
+
public static final String RANKING_MODEL = "mudrod.ranking.model";
-
+
public static final String RANKING_ML = "mudrod.ranking.machine.learning";
public static final String REQUEST_TIME_GAP = "mudrod.request.time.gap";
public static final String TIME_SUFFIX = "time.suffix";
- public static final String USE_HISTORY_LINKAGE_TYPE = "user.history.linkage";
+ public static final String USER_HISTORY_LINKAGE_TYPE =
"mudrod.user.history.linkage";
public static final String USER_HISTORY_W = "mudrod.user.history.weight";
-
+
public static final String USER_HISTORY_PATH = "mudrod.user.history.path";
public static final String VIEW_F = "mudrod.view.freq";
-
+
public static final String VIEW_MARKER = "mudrod.view.url.marker";
-
+
public static final String SEARCH_MARKER = "mudrod.search.url.marker";
-
+
public static final String SEARCH_F = "mudrod.search.freq";
-
+
public static final String DOWNLOAD_F = "mudrod.download.freq";
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
index b74074a..f492ecf 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
@@ -32,7 +32,7 @@ import java.io.*;
import java.util.Properties;
/**
- * ClassName: ApiHarvester Function: Harvest metadata from PO.DAACweb service.
+ * Harvest metadata from PO.DAAC Webservices.
*/
public class ApiHarvester extends DiscoveryStepAbstract {
@@ -70,7 +70,7 @@ public class ApiHarvester extends DiscoveryStepAbstract {
}
/**
- * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please
+ * Add mapping to index metadata in Elasticsearch. Please
* invoke this method before import metadata to Elasticsearch.
*/
public void addMetadataMapping() {
@@ -84,7 +84,7 @@ public class ApiHarvester extends DiscoveryStepAbstract {
}
/**
- * importToES: Index metadata into elasticsearch from local file directory.
+ * Index metadata into elasticsearch from local file directory.
* Please make sure metadata have been harvest from web service before
* invoking this method.
*/
@@ -118,12 +118,12 @@ public class ApiHarvester extends DiscoveryStepAbstract {
}
/**
- * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service.
+ * Harvest metadata from PO.DAAC web service.
*/
private void harvestMetadatafromWeb() {
LOG.info("Metadata download started.");
int startIndex = 0;
- int doc_length = 0;
+ int docLength = 0;
JsonParser parser = new JsonParser();
do {
String searchAPI =
props.getProperty(MudrodConstants.METADATA_DOWNLOAD_URL);
@@ -135,7 +135,7 @@ public class ApiHarvester extends DiscoveryStepAbstract {
JsonObject responseObject = json.getAsJsonObject();
JsonArray docs =
responseObject.getAsJsonObject("response").getAsJsonArray("docs");
- doc_length = docs.size();
+ docLength = docs.size();
File file = new
File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
if (!file.exists()) {
@@ -145,7 +145,7 @@ public class ApiHarvester extends DiscoveryStepAbstract {
LOG.error("Failed to create directory!");
}
}
- for (int i = 0; i < doc_length; i++) {
+ for (int i = 0; i < docLength; i++) {
JsonElement item = docs.get(i);
int docId = startIndex + i;
File itemfile = new
File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId +
".json");
@@ -167,7 +167,7 @@ public class ApiHarvester extends DiscoveryStepAbstract {
Thread.currentThread().interrupt();
}
- } while (doc_length != 0);
+ } while (docLength != 0);
LOG.info("Metadata downloading finished");
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
index e4a6320..a8d3d22 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
@@ -64,7 +64,12 @@ public class MatrixGenerator extends DiscoveryStepAbstract {
String metadataMatrixFile =
props.getProperty(MudrodConstants.METADATA_MATRIX_PATH);
try {
MetadataExtractor extractor = new MetadataExtractor();
- JavaPairRDD<String, List<String>> metadataTermsRDD =
extractor.loadMetadata(this.es, this.spark.sc,
props.getProperty(MudrodConstants.ES_INDEX_NAME),
props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
+ JavaPairRDD<String, List<String>> metadataTermsRDD =
+ extractor.loadMetadata(
+ this.es,
+ this.spark.sc,
+ props.getProperty(MudrodConstants.ES_INDEX_NAME),
+ props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
LabeledRowMatrix wordDocMatrix =
MatrixUtil.createWordDocMatrix(metadataTermsRDD);
MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys,
wordDocMatrix.colkeys, metadataMatrixFile);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
index 303eb52..7dc72c5 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
@@ -27,8 +27,7 @@ import java.util.List;
import java.util.Properties;
/**
- * ClassName: MetadataAnalyzer
- * Function: Calculate semantic relationship of vocabularies extracted from
+ * Calculate semantic relationship of vocabularies extracted from
* metadata.
*/
public class MetadataAnalyzer extends DiscoveryStepAbstract implements
Serializable {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
index e17d2ce..3486607 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
@@ -81,7 +81,11 @@ public class MetadataTFIDFGenerator extends DiscoveryStepAbstract {
LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark);
- MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys,
wordtfidfMatrix.colkeys,
props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
+ MatrixUtil.exportToCSV(
+ wordtfidfMatrix.rowMatrix,
+ wordtfidfMatrix.rowkeys,
+ wordtfidfMatrix.colkeys,
+ props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
return wordtfidfMatrix;
}
@@ -100,7 +104,11 @@ public class MetadataTFIDFGenerator extends DiscoveryStepAbstract {
LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark);
- MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix,
tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys,
props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH));
+ MatrixUtil.exportToCSV(
+ tokentfidfMatrix.rowMatrix,
+ tokentfidfMatrix.rowkeys,
+ tokentfidfMatrix.colkeys,
+ props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH));
return tokentfidfMatrix;
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
similarity index 72%
rename from core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
rename to core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
index 9121719..546fae5 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
@@ -32,13 +32,13 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
* Supports ability to import click stream data into Elasticsearch
* through .csv file
*/
-public class ClickstreamImporter extends MudrodAbstract {
+public class ClickStreamImporter extends MudrodAbstract {
/**
*
*/
private static final long serialVersionUID = 1L;
- public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark)
{
+ public ClickStreamImporter(Properties props, ESDriver es, SparkDriver spark)
{
super(props, es, spark);
addClickStreamMapping();
}
@@ -47,20 +47,23 @@ public class ClickstreamImporter extends MudrodAbstract {
* Method to add Elasticsearch mapping for click stream data
*/
public void addClickStreamMapping() {
- XContentBuilder Mapping;
+ XContentBuilder mapping;
+ String clickStreamMatrixType =
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
try {
- Mapping = jsonBuilder().startObject().startObject(
-
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).startObject(
- "properties").startObject("query").field("type",
"string").field(
- "index",
"not_analyzed").endObject().startObject("dataID").field(
- "type", "string").field("index",
"not_analyzed").endObject()
-
- .endObject().endObject().endObject();
+ mapping = jsonBuilder()
+ .startObject()
+ .startObject(clickStreamMatrixType)
+ .startObject("properties")
+ .startObject("query").field("type",
"string").field("index", "not_analyzed").endObject()
+ .startObject("dataID").field("type",
"string").field("index", "not_analyzed").endObject()
+ .endObject()
+ .endObject()
+ .endObject();
es.getClient().admin().indices().preparePutMapping(
props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(
-
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).setSource(
- Mapping).execute().actionGet();
+ clickStreamMatrixType).setSource(
+ mapping).execute().actionGet();
} catch (IOException e) {
e.printStackTrace();
}
@@ -70,8 +73,9 @@ public class ClickstreamImporter extends MudrodAbstract {
* Method to import click stream CSV into Elasticsearch
*/
public void importfromCSVtoES() {
- es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME),
- props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE));
+ String clickStreamMatrixType =
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
+ String esIndexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
+ es.deleteType(esIndexName, clickStreamMatrixType);
es.createBulkProcessor();
BufferedReader br = null;
@@ -86,8 +90,7 @@ public class ClickstreamImporter extends MudrodAbstract {
String[] clicks = line.split(cvsSplitBy);
for (int i = 1; i < clicks.length; i++) {
if (!"0.0".equals(clicks[i])) {
- IndexRequest ir = new
IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME),
-
props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE))
+ IndexRequest ir = new IndexRequest(esIndexName,
clickStreamMatrixType)
.source(jsonBuilder().startObject().field("query",
clicks[0]).field(
"dataID", dataList[i]).field("clicks",
clicks[i]).endObject());
es.getBulkProcessor().add(ir);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
index a574041..1ebae17 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
@@ -57,7 +57,6 @@ public class LinkageTriple implements Serializable {
public static DecimalFormat df = new DecimalFormat("#.00");
public LinkageTriple() {
- // TODO Auto-generated constructor stub
}
/**
@@ -96,8 +95,13 @@ public class LinkageTriple implements Serializable {
} else {
jsonBuilder.field("keywords", triple.keyA + "," + triple.keyB);
}
-
- jsonBuilder.field("weight",
Double.parseDouble(df.format(triple.weight)));
+ double tripleWeight = 0;
+ try {
+ tripleWeight = Double.parseDouble(df.format(triple.weight));
+ } catch (NumberFormatException e) {
+ // do nothing, triple weight is 0 as it cannot be parsed
+ }
+ jsonBuilder.field("weight", tripleWeight);
jsonBuilder.endObject();
IndexRequest ir = new IndexRequest(index, type).source(jsonBuilder);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
index 7eef272..ec27a29 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
@@ -457,8 +457,11 @@ public class MatrixUtil {
}
try {
file.createNewFile();
- FileWriter fw = new FileWriter(file.getAbsoluteFile());
- BufferedWriter bw = new BufferedWriter(fw);
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ try (FileWriter fw = new FileWriter(file.getAbsoluteFile());
+ BufferedWriter bw = new BufferedWriter(fw);){
String coltitle = " Num" + ",";
for (int j = 0; j < colnum; j++) {
coltitle += "\"" + colKeys.get(j) + "\",";
@@ -475,12 +478,8 @@ public class MatrixUtil {
row = row.substring(0, row.length() - 1);
bw.write(row + "\n");
}
-
- bw.close();
-
} catch (IOException e) {
e.printStackTrace();
-
}
}
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
index 704ccfd..2f37af9 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
@@ -31,7 +31,6 @@ import
org.elasticsearch.search.aggregations.AggregationBuilders;
import
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.joda.time.DateTime;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormatter;
@@ -40,7 +39,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -98,7 +101,8 @@ public class CrawlerDetection extends LogAbstract {
public boolean checkKnownCrawler(String agent) {
String[] crawlers =
props.getProperty(MudrodConstants.BLACK_LIST_AGENT).split(",");
for (int i = 0; i < crawlers.length; i++) {
- if (agent.toLowerCase().contains(crawlers[i].trim())) return true;
+ if (agent.toLowerCase().contains(crawlers[i].trim()))
+ return true;
}
return false;
}
@@ -110,20 +114,20 @@ public class CrawlerDetection extends LogAbstract {
int userCount = 0;
userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>,
Integer>) iterator -> {
- ESDriver tmpES = new ESDriver(props);
- tmpES.createBulkProcessor();
+ ESDriver tmpEs = new ESDriver(props);
+ tmpEs.createBulkProcessor();
List<Integer> realUserNums = new ArrayList<>();
while (iterator.hasNext()) {
String s = iterator.next();
- Integer realUser = checkByRate(tmpES, s);
+ Integer realUser = checkByRate(tmpEs, s);
realUserNums.add(realUser);
}
- tmpES.destroyBulkProcessor();
- tmpES.close();
+ tmpEs.destroyBulkProcessor();
+ tmpEs.close();
return realUserNums.iterator();
}).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
- LOG.info("User count: {}", Integer.toString(userCount));
+ LOG.info("Final user count: {}", Integer.toString(userCount));
}
private int checkByRate(ESDriver es, String user) {
@@ -135,8 +139,19 @@ public class CrawlerDetection extends LogAbstract {
BoolQueryBuilder filterSearch = new BoolQueryBuilder();
filterSearch.must(QueryBuilders.termQuery("IP", user));
- AggregationBuilder aggregation =
AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC);
- SearchResponse checkRobot =
es.getClient().prepareSearch(logIndex).setTypes(httpType,
ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet();
+ AggregationBuilder aggregation = AggregationBuilders
+ .dateHistogram("by_minute")
+ .field("Time")
+ .dateHistogramInterval(DateHistogramInterval.MINUTE)
+ .order(Order.COUNT_DESC);
+ SearchResponse checkRobot = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(httpType, ftpType)
+ .setQuery(filterSearch)
+ .setSize(0)
+ .addAggregation(aggregation)
+ .execute()
+ .actionGet();
Histogram agg = checkRobot.getAggregations().get("by_minute");
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
index 1969e3e..415f377 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
@@ -72,38 +72,54 @@ public class HistoryGenerator extends LogAbstract {
bw.write("Num" + ",");
// step 1: write first row of csv
- List<String> logIndexList =
es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+ List<String> logIndexList = es.getIndexListWithPrefix(
+ props.getProperty(MudrodConstants.LOG_INDEX));
String[] logIndices = logIndexList.toArray(new String[0]);
String[] statictypeArray = new String[] { this.sessionStats };
int docCount = es.getDocCount(logIndices, statictypeArray);
-
- LOG.info("{}: {}", this.sessionStats, docCount);
- if (docCount==0)
- {
+ LOG.info("{}: {}", this.sessionStats, docCount);
+
+ if (docCount==0) {
bw.close();
file.delete();
return;
}
- SearchResponse sr =
es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-
.addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
+ SearchResponse sr = es.getClient()
+ .prepareSearch(logIndices)
+ .setTypes(statictypeArray)
+ .setQuery(QueryBuilders.matchAllQuery())
+ .setSize(0)
+ .addAggregation(AggregationBuilders.terms("IPs")
+ .field("IP")
+ .size(docCount))
+ .execute()
+ .actionGet();
Terms ips = sr.getAggregations().get("IPs");
List<String> ipList = new ArrayList<>();
for (Terms.Bucket entry : ips.getBuckets()) {
- if (entry.getDocCount() >
Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) { // filter
- // out
- // less
- // active users/ips
+ // filter
+ if (entry.getDocCount() >
Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) {
+ // out less active users/ips
ipList.add(entry.getKey().toString());
}
}
bw.write(String.join(",", ipList) + "\n");
// step 2: step the rest rows of csv
- SearchRequestBuilder sr2Builder =
es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-
.addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
+ SearchRequestBuilder sr2Builder = es.getClient()
+ .prepareSearch(logIndices)
+ .setTypes(statictypeArray)
+ .setQuery(QueryBuilders.matchAllQuery())
+ .setSize(0)
+ .addAggregation(AggregationBuilders.terms("KeywordAgg")
+ .field("keywords")
+ .size(docCount)
+ .subAggregation(AggregationBuilders.terms("IPAgg")
+ .field("IP")
+ .size(docCount)));
SearchResponse sr2 = sr2Builder.execute().actionGet();
Terms keywords = sr2.getAggregations().get("KeywordAgg");
@@ -132,6 +148,7 @@ public class HistoryGenerator extends LogAbstract {
}
bw.close();
+ fw.close();
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
index f3a7f6d..1d04902 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
@@ -53,7 +53,8 @@ public class LogAbstract extends DiscoveryStepAbstract {
}
protected void initLogIndex() {
- logIndex = props.getProperty(MudrodConstants.LOG_INDEX) +
props.getProperty(MudrodConstants.TIME_SUFFIX);
+ logIndex = props.getProperty(MudrodConstants.LOG_INDEX)
+ + props.getProperty(MudrodConstants.TIME_SUFFIX);
httpType = MudrodConstants.HTTP_TYPE;
ftpType = MudrodConstants.FTP_TYPE;
cleanupType = MudrodConstants.CLEANUP_TYPE;
@@ -116,8 +117,16 @@ public class LogAbstract extends DiscoveryStepAbstract {
int docCount = es.getDocCount(logIndex, type);
- SearchResponse sr =
es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-
.addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
+ SearchResponse sr = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(type)
+ .setQuery(QueryBuilders.matchAllQuery())
+ .setSize(0)
+ .addAggregation(AggregationBuilders.terms("Users")
+ .field("IP")
+ .size(docCount))
+ .execute()
+ .actionGet();
return sr.getAggregations().get("Users");
}
@@ -138,10 +147,23 @@ public class LogAbstract extends DiscoveryStepAbstract {
int docCount = es.getDocCount(logIndex, httpType);
- AggregationBuilder dailyAgg =
AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);
-
- SearchResponse sr =
es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-
.addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
+ AggregationBuilder dailyAgg = AggregationBuilders
+ .dateHistogram("by_day")
+ .field("Time")
+ .dateHistogramInterval(DateHistogramInterval.DAY)
+ .order(Order.COUNT_DESC);
+
+ SearchResponse sr = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(httpType)
+ .setQuery(QueryBuilders.matchAllQuery())
+ .setSize(0)
+ .addAggregation(AggregationBuilders.terms("Users")
+ .field("IP")
+ .size(docCount)
+ .subAggregation(dailyAgg))
+ .execute()
+ .actionGet();
Terms users = sr.getAggregations().get("Users");
Map<String, Long> userList = new HashMap<>();
for (Terms.Bucket user : users.getBuckets()) {
@@ -201,11 +223,16 @@ public class LogAbstract extends DiscoveryStepAbstract {
int docCount = es.getDocCount(this.logIndex, this.cleanupType);
- SearchResponse sr =
es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
-
.addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();
-
- Terms Sessions = sr.getAggregations().get("Sessions");
- return Sessions;
+ SearchResponse sr = es.getClient()
+ .prepareSearch(this.logIndex)
+ .setTypes(this.cleanupType)
+ .setQuery(QueryBuilders.matchAllQuery())
+ .addAggregation(AggregationBuilders.terms("Sessions")
+ .field("SessionID")
+ .size(docCount))
+ .execute()
+ .actionGet();
+ return sr.getAggregations().get("Sessions");
}
public List<String> getSessions() {
@@ -213,7 +240,7 @@ public class LogAbstract extends DiscoveryStepAbstract {
Terms sessions = this.getSessionTerms();
List<String> sessionList = new ArrayList<>();
for (Terms.Bucket entry : sessions.getBuckets()) {
- if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
+ if (entry.getDocCount() >= 3 && !"invalid".equals(entry.getKey())) {
String session = (String) entry.getKey();
sessionList.add(session);
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
index 4ce1535..3e8ffa7 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
@@ -93,50 +93,15 @@ public class SessionGenerator extends LogAbstract {
}
public void genSessionByReferer(int timeThres) throws InterruptedException,
IOException {
-
- genSessionByRefererInParallel(timeThres);
-
+ genSessionByRefererInParallel(timeThres);
}
public void combineShortSessions(int timeThres) throws InterruptedException,
IOException {
-
- combineShortSessionsInParallel(timeThres);
-
+ combineShortSessionsInParallel(timeThres);
}
/**
- * Method to generate session by time threshold and referrer
- *
- * @param timeThres value of time threshold (s)
- * @throws ElasticsearchException ElasticsearchException
- * @throws IOException IOException
- */
- public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException {
-
- Terms users = this.getUserTerms(this.cleanupType);
-
- int sessionCount = 0;
- for (Terms.Bucket entry : users.getBuckets()) {
-
- String user = (String) entry.getKey();
- Integer sessionNum = genSessionByReferer(es, user, timeThres);
- sessionCount += sessionNum;
- }
-
- LOG.info("Initial session count: {}", Integer.toString(sessionCount));
- }
-
- public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException {
-
- Terms users = this.getUserTerms(this.cleanupType);
- for (Terms.Bucket entry : users.getBuckets()) {
- String user = entry.getKey().toString();
- combineShortSessions(es, user, timeThres);
- }
- }
-
- /**
- * Method to remove invalid logs through IP address
+ * Remove invalid logs through IP address
*
* @param es an instantiated es driver
* @param ip invalid IP address
@@ -148,13 +113,24 @@ public class SessionGenerator extends LogAbstract {
BoolQueryBuilder filterAll = new BoolQueryBuilder();
filterAll.must(QueryBuilders.termQuery("IP", ip));
- SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet();
+ SearchResponse scrollResp = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(this.cleanupType)
+ .setScroll(new TimeValue(60000))
+ .setQuery(filterAll)
+ .setSize(100)
+ .execute()
+ .actionGet();
while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid");
}
- scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ scrollResp = es.getClient()
+ .prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(new TimeValue(600000))
+ .execute()
+ .actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
@@ -174,14 +150,14 @@ public class SessionGenerator extends LogAbstract {
* @throws IOException
*/
private void update(ESDriver es, String index, String type, String id,
String field1, Object value1) throws IOException {
- UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
+ UpdateRequest ur = new UpdateRequest(index, type, id)
+ .doc(jsonBuilder().startObject().field(field1, value1).endObject());
es.getBulkProcessor().add(ur);
}
public void genSessionByRefererInParallel(int timeThres) throws
InterruptedException, IOException {
JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
-
int sessionCount = 0;
sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>,
Integer>() {
/**
@@ -226,17 +202,24 @@ public class SessionGenerator extends LogAbstract {
BoolQueryBuilder filterSearch = new BoolQueryBuilder();
filterSearch.must(QueryBuilders.termQuery("IP", user));
- SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100)
- .execute().actionGet();
+ SearchResponse scrollResp = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(this.cleanupType)
+ .setScroll(new TimeValue(60000))
+ .setQuery(filterSearch)
+ .addSort("Time", SortOrder.ASC)
+ .setSize(100)
+ .execute()
+ .actionGet();
Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>();
- String request = "";
- String referer = "";
- String logType = "";
- String id = "";
+ String request;
+ String referer;
+ String logType;
+ String id;
String ip = user;
String indexUrl = props.getProperty(MudrodConstants.BASE_URL) + "/";
- DateTime time = null;
+ DateTime time;
DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
while (scrollResp.getHits().getHits().length != 0) {
@@ -354,6 +337,7 @@ public class SessionGenerator extends LogAbstract {
tmpES.close();
}
});
+ LOG.info("Final Session count (after combining short sessions): {}", Long.toString(userRDD.count()));
}
public void combineShortSessions(ESDriver es, String user, int timeThres)
throws ElasticsearchException, IOException {
@@ -372,7 +356,14 @@ public class SessionGenerator extends LogAbstract {
BoolQueryBuilder filterCheck = new BoolQueryBuilder();
filterCheck.must(QueryBuilders.termQuery("IP",
user)).must(QueryBuilders.termQuery("Referer", "-"));
- SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet();
+ SearchResponse checkReferer = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(this.cleanupType)
+ .setScroll(new TimeValue(60000))
+ .setQuery(filterCheck)
+ .setSize(0)
+ .execute()
+ .actionGet();
long numInvalid = checkReferer.getHits().getTotalHits();
double invalidRate = numInvalid / docCount;
@@ -383,8 +374,17 @@ public class SessionGenerator extends LogAbstract {
}
StatsAggregationBuilder statsAgg =
AggregationBuilders.stats("Stats").field("Time");
- SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch)
- .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet();
+ SearchResponse srSession = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(this.cleanupType)
+ .setScroll(new TimeValue(60000))
+ .setQuery(filterSearch)
+ .addAggregation(AggregationBuilders.terms("Sessions")
+ .field("SessionID")
+ .size(docCount)
+ .subAggregation(statsAgg))
+ .execute()
+ .actionGet();
Terms sessions = srSession.getAggregations().get("Sessions");
@@ -400,7 +400,7 @@ public class SessionGenerator extends LogAbstract {
String last = null;
String lastnewID = null;
String lastoldID = null;
- String current = null;
+ String current;
for (Session s : sessionList) {
current = s.getEndTime();
if (last != null) {
@@ -413,7 +413,14 @@ public class SessionGenerator extends LogAbstract {
QueryBuilder fs =
QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID",
s.getID()));
- SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet();
+ SearchResponse scrollResp = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(this.cleanupType)
+ .setScroll(new TimeValue(60000))
+ .setQuery(fs)
+ .setSize(100)
+ .execute()
+ .actionGet();
while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
if (lastnewID == null) {
@@ -423,7 +430,11 @@ public class SessionGenerator extends LogAbstract {
}
}
- scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ scrollResp = es.getClient()
+ .prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(new TimeValue(600000))
+ .execute()
+ .actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
index 981bece..e99af06 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
@@ -27,7 +27,6 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import
org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
import org.joda.time.DateTime;
@@ -38,7 +37,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -81,7 +85,7 @@ public class SessionStatistic extends LogAbstract {
}
public void processSession() throws InterruptedException, IOException,
ExecutionException {
- processSessionInParallel();
+ processSessionInParallel();
}
/**
@@ -160,7 +164,13 @@ public class SessionStatistic extends LogAbstract {
BoolQueryBuilder filterSearch = new BoolQueryBuilder();
filterSearch.must(QueryBuilders.termQuery("SessionID", sessionId));
- SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filterSearch).addAggregation(statsAgg).execute().actionGet();
+ SearchResponse sr = es.getClient()
+ .prepareSearch(logIndex)
+ .setTypes(inputType)
+ .setQuery(filterSearch)
+ .addAggregation(statsAgg)
+ .execute()
+ .actionGet();
Stats agg = sr.getAggregations().get("Stats");
min = agg.getMinAsString();
@@ -231,9 +241,9 @@ public class SessionStatistic extends LogAbstract {
String view = findDataset(request);
if ("".equals(views))
views = view;
- else if (!views.contains(view))
- views = views + "," + view;
- }
+ else if (!views.contains(view))
+ views = views + "," + view;
+ }
}
if (MudrodConstants.FTP_LOG.equals(logType)) {
ftpRequestCount++;
@@ -257,7 +267,11 @@ public class SessionStatistic extends LogAbstract {
}
- scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ scrollResp = es.getClient()
+ .prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(new TimeValue(600000))
+ .execute()
+ .actionGet();
// Break condition: No hits are returned
if (scrollResp.getHits().getHits().length == 0) {
break;
@@ -269,12 +283,17 @@ public class SessionStatistic extends LogAbstract {
}
if (searchDataListRequestCount != 0 &&
- searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) &&
- searchDataRequestCount != 0 &&
- searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) &&
- ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F)))
+ searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) &&
+ searchDataRequestCount != 0 &&
+ searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) &&
+ ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F)))
{
- String sessionURL = props.getProperty(MudrodConstants.SESSION_PORT) + props.getProperty(MudrodConstants.SESSION_URL) + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType;
+ String sessionURL = props.getProperty(
+ MudrodConstants.SESSION_PORT)
+ + props.getProperty(MudrodConstants.SESSION_URL)
+ + "?sessionid=" + sessionId
+ + "&sessionType=" + outputType
+ + "&requestType=" + inputType;
sessionCount = 1;
IndexRequest ir = new IndexRequest(logIndex, outputType).source(
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
index 70c4067..193115e 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
@@ -18,7 +18,7 @@ import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.main.MudrodConstants;
import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
-import org.apache.sdap.mudrod.ssearch.ClickstreamImporter;
+import org.apache.sdap.mudrod.ssearch.ClickStreamImporter;
import org.apache.sdap.mudrod.utils.LinkageTriple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,18 +51,18 @@ public class ClickStreamAnalyzer extends DiscoveryStepAbstract {
LOG.info("Starting ClickStreamAnalyzer...");
startTime = System.currentTimeMillis();
try {
- String clickstream_matrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
- File f = new File(clickstream_matrixFile);
+ String clickstreamMatrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
+ File f = new File(clickstreamMatrixFile);
if (f.exists()) {
SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
- svd.getSVDMatrix(clickstream_matrixFile,
+ svd.getSVDMatrix(clickstreamMatrixFile,
Integer.parseInt(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_DIM)),
props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
List<LinkageTriple> tripleList =
svd.calTermSimfromMatrix(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
svd.saveToES(tripleList,
props.getProperty(MudrodConstants.ES_INDEX_NAME),
MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
// Store click stream in ES for the ranking use
- ClickstreamImporter cs = new ClickstreamImporter(props, es, spark);
+ ClickStreamImporter cs = new ClickStreamImporter(props, es, spark);
cs.importfromCSVtoES();
}
} catch (Exception e) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
index c4c5da4..cb1e12f 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
@@ -50,7 +50,7 @@ public class UserHistoryAnalyzer extends DiscoveryStepAbstract {
SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark);
List<LinkageTriple> tripleList =
sa.calTermSimfromMatrix(props.getProperty(MudrodConstants.USER_HISTORY_PATH));
- sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE));
+ sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USER_HISTORY_LINKAGE_TYPE));
endTime = System.currentTimeMillis();
es.refreshIndex();
diff --git a/core/src/main/resources/config.properties b/core/src/main/resources/config.properties
index 495d29c..5ed2504 100644
--- a/core/src/main/resources/config.properties
+++ b/core/src/main/resources/config.properties
@@ -16,14 +16,14 @@ mudrod.es.transport.tcp.port = 9300
mudrod.es.unicast.hosts = 127.0.0.1
mudrod.es.http.port = 9200
mudrod.es.index = mudrod
-
+
# Spark related
# Log processing type. Possible values include 'sequential' or 'parallel'
mudrod.processing.type = parallel
mudrod.spark.app.name = MudrodSparkApp
mudrod.spark.master = local[4]
mudrod.spark.optimize = repartition
-
+
# Web log processing configuration
# index name has to be all lowercase
mudrod.log.index = log
@@ -38,37 +38,43 @@ mudrod.download.freq = 100
mudrod.request.rate = 30
mudrod.session.port = 8080
mudrod.session.url = /mudrod-service/session.html
-mudrod.request.time.gap = 600
+mudrod.request.time.gap = 600
mudrod.view.url.marker = /dataset/
mudrod.search.url.marker = /datasetlist?
-# In order to better parse a URL (getting searching keyword, etc.), please consider custimize
+# In order to better parse a URL (getting searching keyword, etc.), please consider customizing the
# org.apache.sdap.mudrod.weblog.structure.RequestUrl - GetSearchInfo,
getFilterInfo
-
+
# User search history
mudrod.query.min = 0
mudrod.user.history.weight = 2
-
+mudrod.user.history.linkage = UserHistoryLinkage
+mudrod.user.history.path = userHistoryMatrix
+
# clickstream
mudrod.download.weight = 3
mudrod.clickstream.svd.d = 50
mudrod.clickstream.weight = 2
-
+mudrod.clickstream.matrix = clickStreamMatrix
+mudrod.clickstream.linkage = ClickStreamLinkage
+mudrod.clickstream.path = clickStreamMatrix
+mudrod.clickstream.svd.path = clickStreamMatrixSVD
+
# metadata
-mudrod.metadata.download = 0
+mudrod.metadata.download = 1
mudrod.metadata.download.url =
https://podaac.jpl.nasa.gov/api/dataset?startIndex=$startIndex&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search=
mudrod.metadata.svd.d = 50
mudrod.metadata.url = null
mudrod.metadata.weight = 1
mudrod.metadata.type = RawMetadata
-
+
# ranking, ${svmSgdModel.value} is resolved at build time. See the property in
core/pom.xml for the value
mudrod.ranking.machine.learning = 1
mudrod.ranking.model = ${svmSgdModel.value}.zip
-
+
# recommendation
mudrod.metadata.id = Dataset-ShortName
mudrod.metadata.semantic.fields =
DatasetParameter-Term,DatasetParameter-Variable,Dataset-ExtractTerm
# ontology service implementation. Possible values include EsipPortal -
EsipPortalOntology EsipCOR - EsipCOROntology Local -
org.apache.sdap.mudrod.ontology.process.Local
mudrod.ontology.implementation = Local
-mudrod.ontology.weight = 2
+mudrod.ontology.weight = 2
\ No newline at end of file
diff --git a/core/src/main/resources/elastic_mappings.json b/core/src/main/resources/elastic_mappings.json
index ddb1952..3730b87 100644
--- a/core/src/main/resources/elastic_mappings.json
+++ b/core/src/main/resources/elastic_mappings.json
@@ -1,116 +1,68 @@
{
- "mappings": {
- "doc": {
- "dynamic_templates": [
- {
- "geo_point_mapping": {
- "match_mapping_type": "nested",
- "match": "Point",
- "mapping": {
- "type": "geo_point"
- }
- },
- "geo_shape_mapping": {
- "match_mapping_type": "nested",
- "match": "Polygon",
- "mapping": {
- "type": "geo_shape"
- }
- }
- }
- ]
- }
- },
- "eonet_event": {
- "properties": {
- "sources": {
- "type": "nested",
- "fields": {
- "id": {
- "ignore_above": 256,
- "type": "keyword"
- },
- "url": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "geometries": {
- "type": "nested",
- "fields": {
- "date": {
- "ignore_above": 256,
- "type": "date"
- },
- "type": {
- "ignore_above": 256,
- "type": "keyword"
- },
- "coordinates": {
- "ignore_above": 256,
- "dynamic": true
- }
- }
- },
- "link": {
- "type": "text",
- "fields": {
- "keyword": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "closed": {
- "type": "text",
- "fields": {
- "keyword": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "description": {
- "type": "text",
- "fields": {
- "keyword": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "categories": {
- "type": "nested",
- "fields": {
- "id": {
- "ignore_above": 256,
- "type": "integer"
- },
- "title": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "id": {
- "type": "text",
- "fields": {
- "keyword": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- },
- "title": {
- "type": "completion",
- "fields": {
- "keyword": {
- "ignore_above": 256,
- "type": "keyword"
- }
- }
- }
- }
+ "_default_":{
+ "properties":{
+ "keywords":{
+ "type":"text",
+ "analyzer":"csv",
+ "fielddata":true
+ },
+ "views":{
+ "type":"string",
+ "analyzer":"csv"
+ },
+ "downloads":{
+ "type":"string",
+ "analyzer":"csv"
+ },
+ "RequestUrl":{
+ "type":"string",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "IP":{
+ "type":"keyword",
+ "index":"not_analyzed"
+ },
+ "Browser":{
+ "type":"string",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "SessionURL":{
+ "type":"string",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "Referer":{
+ "type":"string",
+ "index":"not_analyzed"
+ },
+ "SessionID":{
+ "type":"string",
+ "index":"not_analyzed"
+ },
+ "Response":{
+ "type":"string",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "Request":{
+ "type":"string",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "Coordinates":{
+ "type":"geo_point",
+ "include_in_all":false,
+ "index":"no"
+ },
+ "LogType":{
+ "type":"string",
+ "index":"not_analyzed"
+ },
+ "Dataset-Metadata":{
+ "type":"completion"
+ }
}
+ }
}
\ No newline at end of file
diff --git a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
index 6b410f4..9bbc1cd 100644
--- a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
+++ b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
@@ -1,5 +1,15 @@
-/**
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.sdap.mudrod.services.eonet;
--
To stop receiving notification emails like this one, please contact
[email protected].