[ https://issues.apache.org/jira/browse/SDAP-71?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473993#comment-16473993 ]

ASF GitHub Bot commented on SDAP-71:
------------------------------------

lewismc closed pull request #16: SDAP-71 full ingestion workflow and process broken in several places
URL: https://github.com/apache/incubator-sdap-mudrod/pull/16

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/.gitignore b/.gitignore
index 3009cbf..a8ff594 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,5 @@ core/.externalToolBuilders/Maven_Ant_Builder.launch
 core/maven-eclipse.xml
 service/.classpath
 web/.classpath
+web/.externalToolBuilders/
+web/maven-eclipse.xml
diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
index 6d675fb..fc51557 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
@@ -177,15 +177,31 @@ public String customAnalyzing(String indexName, String analyzer, String str) thr
   }
 
   public void deleteAllByQuery(String index, String type, QueryBuilder query) {
-    ImmutableOpenMap<String, MappingMetaData> mappings = getClient().admin().cluster().prepareState().execute().actionGet()
-        .getState().metaData().index(index).getMappings();
+    ImmutableOpenMap<String, MappingMetaData> mappings = getClient()
+            .admin()
+            .cluster()
+            .prepareState()
+            .execute()
+            .actionGet()
+            .getState()
+            .metaData()
+            .index(index)
+            .getMappings();
     
     //check if the type exists
-    if (!mappings.containsKey(type)) return;
+    if (!mappings.containsKey(type))
+      return;
     
     createBulkProcessor();
-    SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
-        .actionGet();
+    SearchResponse scrollResp = getClient()
+            .prepareSearch(index)
+            .setSearchType(SearchType.QUERY_AND_FETCH)
+            .setTypes(type)
+            .setScroll(new TimeValue(60000))
+            .setQuery(query)
+            .setSize(10000)
+            .execute()
+            .actionGet();
 
     while (true) {
       for (SearchHit hit : scrollResp.getHits().getHits()) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
index 37e7508..a8cc723 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
@@ -171,7 +171,7 @@ public JsonObject getIngeratedListInJson(String input) {
    * the similarities from different sources
    */
   public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
-    aggregateRelatedTerms(input, MudrodConstants.USE_HISTORY_LINKAGE_TYPE);
+    aggregateRelatedTerms(input, MudrodConstants.USER_HISTORY_LINKAGE_TYPE);
     aggregateRelatedTerms(input, MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
     aggregateRelatedTerms(input, MudrodConstants.METADATA_LINKAGE_TYPE);
     aggregateRelatedTermsSWEET(input, MudrodConstants.ONTOLOGY_LINKAGE_TYPE);
@@ -180,7 +180,7 @@ public JsonObject getIngeratedListInJson(String input) {
   }
 
   public int getModelweight(String model) {
-    if (model.equals(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)) {
+    if (model.equals(MudrodConstants.USER_HISTORY_LINKAGE_TYPE)) {
       return Integer.parseInt(props.getProperty(MudrodConstants.USER_HISTORY_W));
     }
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
index 84ba347..2a43e4d 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
@@ -15,23 +15,22 @@
 
 /**
  * Class contains static constant keys and values relating to Mudrod
- * configuration properties. Property values are read from <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>
+ * configuration properties. Property values are read from <code>config.properties</code>.
  */
 public interface MudrodConstants {
 
   public static final String CLEANUP_TYPE = "cleanup.log";
 
-  public static final String CLICK_STREAM_LINKAGE_TYPE = "click.stream.linkage";
+  public static final String CLICK_STREAM_LINKAGE_TYPE = "mudrod.clickstream.linkage";
 
-  public static final String CLICK_STREAM_MATRIX_TYPE = "click.stream.matrix";
+  public static final String CLICK_STREAM_MATRIX_TYPE = "mudrod.clickstream.matrix";
 
   public static final String CLICKSTREAM_SVD_DIM = "mudrod.clickstream.svd.d";
 
   public static final String CLICKSTREAM_W = "mudrod.clickstream.weight";
-  
+
   public static final String CLICKSTREAM_PATH = "mudrod.clickstream.path";
-  
+
   public static final String CLICKSTREAM_SVD_PATH = "mudrod.clickstream.svd.path";
 
   /** Defined on CLI */
@@ -52,25 +51,25 @@
   public static final String FTP_PREFIX = "mudrod.ftp.prefix";
 
   public static final String FTP_TYPE = "raw.ftp";
-  
+
   public static final String FTP_LOG = "ftp";
 
   public static final String HTTP_PREFIX = "mudrod.http.prefix";
 
   public static final String HTTP_TYPE = "raw.http";
-  
+
   public static final String HTTP_LOG = "http";
-  
+
   public static final String BASE_URL = "mudrod.base.url";
-  
+
   public static final String BLACK_LIST_REQUEST = "mudrod.black.request.list";
-  
+
   public static final String BLACK_LIST_AGENT = "mudrod.black.agent.list";
 
   public static final String LOG_INDEX = "mudrod.log.index";
 
   public static final String METADATA_LINKAGE_TYPE = "metadata.linkage";
-  
+
   public static final String METADATA_DOWNLOAD_URL = "mudrod.metadata.download.url";
 
   public static final String METADATA_SVD_DIM = "mudrod.metadata.svd.d";
@@ -93,38 +92,38 @@
   public static final String ONTOLOGY_LINKAGE_TYPE = "ontology.linkage";
 
   public static final String ONTOLOGY_W = "mudrod.ontology.weight";
-  
+
   public static final String ONTOLOGY_PATH = "mudrod.ontology.path";
-  
+
   public static final String ONTOLOGY_INPUT_PATH = "mudrod.ontology.input.path";
 
   /** Defined on CLI */
   public static final String METADATA_DOWNLOAD = "mudrod.metadata.download";
-  
+
   public static final String RAW_METADATA_PATH = "mudrod.metadata.path";
 
   public static final String RAW_METADATA_TYPE = "mudrod.metadata.type";
-  
+
   public static final String METADATA_MATRIX_PATH = "mudrod.metadata.matrix.path";
-  
+
   public static final String METADATA_SVD_PATH = "mudrod.metadata.svd.path";
-  
+
   public static final String RECOM_METADATA_TYPE = "recommedation.metadata";
-  
+
   public static final String METADATA_ID = "mudrod.metadata.id";
-  
+
   public static final String SEMANTIC_FIELDS = "mudrod.metadata.semantic.fields";
-  
+
   public static final String METADATA_WORD_SIM_TYPE = "metadata.word.sim";
-  
+
   public static final String METADATA_FEATURE_SIM_TYPE = "metadata.feature.sim";
-  
+
   public static final String METADATA_SESSION_SIM_TYPE = "metadata.session.sim";
-  
+
   public static final String METADATA_TERM_MATRIX_PATH = "metadata.term.matrix.path";
-  
+
   public static final String METADATA_WORD_MATRIX_PATH = "metadata.word.matrix.path";
-  
+
   public static final String METADATA_SESSION_MATRIX_PATH = "metadata.session.matrix.path";
 
   public static final String REQUEST_RATE = "mudrod.request.rate";
@@ -138,32 +137,29 @@
   public static final String SPARK_APP_NAME = "mudrod.spark.app.name";
 
   public static final String SPARK_MASTER = "mudrod.spark.master";
-  /**
-   * Absolute local location of javaSVMWithSGDModel directory. This is typically
-   * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code>
-   */
+
   public static final String RANKING_MODEL = "mudrod.ranking.model";
-  
+
   public static final String RANKING_ML = "mudrod.ranking.machine.learning";
 
   public static final String REQUEST_TIME_GAP = "mudrod.request.time.gap";
 
   public static final String TIME_SUFFIX = "time.suffix";
 
-  public static final String USE_HISTORY_LINKAGE_TYPE = "user.history.linkage";
+  public static final String USER_HISTORY_LINKAGE_TYPE = "mudrod.user.history.linkage";
 
   public static final String USER_HISTORY_W = "mudrod.user.history.weight";
-  
+
   public static final String USER_HISTORY_PATH = "mudrod.user.history.path";
 
   public static final String VIEW_F = "mudrod.view.freq";
-  
+
   public static final String VIEW_MARKER = "mudrod.view.url.marker";
-  
+
   public static final String SEARCH_MARKER = "mudrod.search.url.marker";
-  
+
   public static final String SEARCH_F = "mudrod.search.freq";
-  
+
   public static final String DOWNLOAD_F = "mudrod.download.freq";
 
 }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
index b74074a..f492ecf 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
@@ -32,7 +32,7 @@
 import java.util.Properties;
 
 /**
- * ClassName: ApiHarvester Function: Harvest metadata from PO.DAACweb service.
+ * Harvest metadata from PO.DAAC Webservices.
  */
 public class ApiHarvester extends DiscoveryStepAbstract {
 
@@ -70,7 +70,7 @@ public Object execute() {
   }
 
   /**
-   * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please
+   * Add mapping to index metadata in Elasticsearch. Please
    * invoke this method before import metadata to Elasticsearch.
    */
   public void addMetadataMapping() {
@@ -84,7 +84,7 @@ public void addMetadataMapping() {
   }
 
   /**
-   * importToES: Index metadata into elasticsearch from local file directory.
+   * Index metadata into elasticsearch from local file directory.
    * Please make sure metadata have been harvest from web service before
    * invoking this method.
    */
@@ -118,12 +118,12 @@ private void importSingleFileToES(InputStream is) {
   }
 
   /**
-   * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service.
+   * Harvest metadata from PO.DAAC web service.
    */
   private void harvestMetadatafromWeb() {
     LOG.info("Metadata download started.");
     int startIndex = 0;
-    int doc_length = 0;
+    int docLength = 0;
     JsonParser parser = new JsonParser();
     do {
       String searchAPI = props.getProperty(MudrodConstants.METADATA_DOWNLOAD_URL);
@@ -135,7 +135,7 @@ private void harvestMetadatafromWeb() {
       JsonObject responseObject = json.getAsJsonObject();
       JsonArray docs = responseObject.getAsJsonObject("response").getAsJsonArray("docs");
 
-      doc_length = docs.size();
+      docLength = docs.size();
 
       File file = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
       if (!file.exists()) {
@@ -145,7 +145,7 @@ private void harvestMetadatafromWeb() {
           LOG.error("Failed to create directory!");
         }
       }
-      for (int i = 0; i < doc_length; i++) {
+      for (int i = 0; i < docLength; i++) {
         JsonElement item = docs.get(i);
         int docId = startIndex + i;
         File itemfile = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId + ".json");
@@ -167,7 +167,7 @@ private void harvestMetadatafromWeb() {
         Thread.currentThread().interrupt();
       }
 
-    } while (doc_length != 0);
+    } while (docLength != 0);
     
     LOG.info("Metadata downloading finished");
   }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
index e4a6320..a8d3d22 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
@@ -64,7 +64,12 @@ public Object execute() {
     String metadataMatrixFile = props.getProperty(MudrodConstants.METADATA_MATRIX_PATH);
     try {
       MetadataExtractor extractor = new MetadataExtractor();
-      JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
+      JavaPairRDD<String, List<String>> metadataTermsRDD = 
+              extractor.loadMetadata(
+                      this.es,
+                      this.spark.sc,
+                      props.getProperty(MudrodConstants.ES_INDEX_NAME),
+                      props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
       LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD);
       MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile);
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
index 303eb52..7dc72c5 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
@@ -27,8 +27,7 @@
 import java.util.Properties;
 
 /**
- * ClassName: MetadataAnalyzer
- * Function: Calculate semantic relationship of vocabularies extracted from
+ * Calculate semantic relationship of vocabularies extracted from
  * metadata.
  */
 public class MetadataAnalyzer extends DiscoveryStepAbstract implements Serializable {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
index e17d2ce..3486607 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
@@ -81,7 +81,11 @@ public LabeledRowMatrix generateWordBasedTFIDF() throws Exception {
 
     LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark);
 
-    MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
+    MatrixUtil.exportToCSV(
+            wordtfidfMatrix.rowMatrix,
+            wordtfidfMatrix.rowkeys,
+            wordtfidfMatrix.colkeys,
+            props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
 
     return wordtfidfMatrix;
   }
@@ -100,7 +104,11 @@ public LabeledRowMatrix generateTermBasedTFIDF() throws Exception {
 
     LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark);
 
-    MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH));
+    MatrixUtil.exportToCSV(
+            tokentfidfMatrix.rowMatrix,
+            tokentfidfMatrix.rowkeys,
+            tokentfidfMatrix.colkeys,
+            props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH));
 
     return tokentfidfMatrix;
   }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
similarity index 72%
rename from core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
rename to core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
index 9121719..546fae5 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java
@@ -32,13 +32,13 @@
  * Supports ability to import click stream data into Elasticsearch
  * through .csv file
  */
-public class ClickstreamImporter extends MudrodAbstract {
+public class ClickStreamImporter extends MudrodAbstract {
   /**
    *
    */
   private static final long serialVersionUID = 1L;
 
-  public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) {
+  public ClickStreamImporter(Properties props, ESDriver es, SparkDriver spark) {
     super(props, es, spark);
     addClickStreamMapping();
   }
@@ -47,20 +47,23 @@ public ClickstreamImporter(Properties props, ESDriver es, SparkDriver spark) {
    * Method to add Elasticsearch mapping for click stream data
    */
   public void addClickStreamMapping() {
-    XContentBuilder Mapping;
+    XContentBuilder mapping;
+    String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
     try {
-      Mapping = jsonBuilder().startObject().startObject(
-              props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).startObject(
-                      "properties").startObject("query").field("type", "string").field(
-                              "index", "not_analyzed").endObject().startObject("dataID").field(
-                                      "type", "string").field("index", "not_analyzed").endObject()
-
-          .endObject().endObject().endObject();
+      mapping = jsonBuilder()
+              .startObject()
+                .startObject(clickStreamMatrixType)
+                  .startObject("properties")
+                    .startObject("query").field("type", 
"string").field("index", "not_analyzed").endObject()
+                    .startObject("dataID").field("type", 
"string").field("index", "not_analyzed").endObject()
+                  .endObject()
+                .endObject()
+              .endObject();
 
       es.getClient().admin().indices().preparePutMapping(
               props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(
-                      props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE)).setSource(
-                              Mapping).execute().actionGet();
+                      clickStreamMatrixType).setSource(
+                              mapping).execute().actionGet();
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -70,8 +73,9 @@ public void addClickStreamMapping() {
    * Method to import click stream CSV into Elasticsearch
    */
   public void importfromCSVtoES() {
-    es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), 
-            props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE));
+    String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
+    String esIndexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
+    es.deleteType(esIndexName, clickStreamMatrixType);
     es.createBulkProcessor();
 
     BufferedReader br = null;
@@ -86,8 +90,7 @@ public void importfromCSVtoES() {
         String[] clicks = line.split(cvsSplitBy);
         for (int i = 1; i < clicks.length; i++) {
           if (!"0.0".equals(clicks[i])) {
-            IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), 
-                    props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE))
+            IndexRequest ir = new IndexRequest(esIndexName, clickStreamMatrixType)
                 .source(jsonBuilder().startObject().field("query", clicks[0]).field(
                         "dataID", dataList[i]).field("clicks", clicks[i]).endObject());
             es.getBulkProcessor().add(ir);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
index a574041..1ebae17 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/LinkageTriple.java
@@ -57,7 +57,6 @@
   public static DecimalFormat df = new DecimalFormat("#.00");
 
   public LinkageTriple() {
-    // TODO Auto-generated constructor stub
   }
 
   /**
@@ -96,8 +95,13 @@ public static void insertTriples(ESDriver es, List<LinkageTriple> triples, Strin
       } else {
         jsonBuilder.field("keywords", triple.keyA + "," + triple.keyB);
       }
-
-      jsonBuilder.field("weight", 
Double.parseDouble(df.format(triple.weight)));
+      double tripleWeight = 0;
+      try {
+        tripleWeight = Double.parseDouble(df.format(triple.weight));
+      } catch (NumberFormatException e) {
+        // do nothing, triple weight is 0 as it cannot be parsed
+      }
+      jsonBuilder.field("weight", tripleWeight);
       jsonBuilder.endObject();
 
       IndexRequest ir = new IndexRequest(index, type).source(jsonBuilder);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
index 7eef272..ec27a29 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
@@ -457,8 +457,11 @@ public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<Stri
     }
     try {
       file.createNewFile();
-      FileWriter fw = new FileWriter(file.getAbsoluteFile());
-      BufferedWriter bw = new BufferedWriter(fw);
+    } catch (IOException e1) {
+      e1.printStackTrace();
+    }
+    try (FileWriter fw = new FileWriter(file.getAbsoluteFile());
+            BufferedWriter bw = new BufferedWriter(fw);){
       String coltitle = " Num" + ",";
       for (int j = 0; j < colnum; j++) {
         coltitle += "\"" + colKeys.get(j) + "\",";
@@ -475,12 +478,8 @@ public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<Stri
         row = row.substring(0, row.length() - 1);
         bw.write(row + "\n");
       }
-
-      bw.close();
-
     } catch (IOException e) {
       e.printStackTrace();
-
     }
   }
 }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
index 704ccfd..2f37af9 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
@@ -31,7 +31,6 @@
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.joda.time.DateTime;
 import org.joda.time.Seconds;
 import org.joda.time.format.DateTimeFormatter;
@@ -40,7 +39,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -98,7 +101,8 @@ public Object execute() {
   public boolean checkKnownCrawler(String agent) {
     String[] crawlers = props.getProperty(MudrodConstants.BLACK_LIST_AGENT).split(",");
     for (int i = 0; i < crawlers.length; i++) {
-      if (agent.toLowerCase().contains(crawlers[i].trim())) return true;
+      if (agent.toLowerCase().contains(crawlers[i].trim()))
+        return true;
     }  
     return false;
   }
@@ -110,20 +114,20 @@ void checkByRateInParallel() throws InterruptedException, IOException {
 
     int userCount = 0;
     userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
-      ESDriver tmpES = new ESDriver(props);
-      tmpES.createBulkProcessor();
+      ESDriver tmpEs = new ESDriver(props);
+      tmpEs.createBulkProcessor();
       List<Integer> realUserNums = new ArrayList<>();
       while (iterator.hasNext()) {
         String s = iterator.next();
-        Integer realUser = checkByRate(tmpES, s);
+        Integer realUser = checkByRate(tmpEs, s);
         realUserNums.add(realUser);
       }
-      tmpES.destroyBulkProcessor();
-      tmpES.close();
+      tmpEs.destroyBulkProcessor();
+      tmpEs.close();
       return realUserNums.iterator();
     }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
 
-    LOG.info("User count: {}", Integer.toString(userCount));
+    LOG.info("Final user count: {}", Integer.toString(userCount));
   }
 
   private int checkByRate(ESDriver es, String user) {
@@ -135,8 +139,19 @@ private int checkByRate(ESDriver es, String user) {
     BoolQueryBuilder filterSearch = new BoolQueryBuilder();
     filterSearch.must(QueryBuilders.termQuery("IP", user));
 
-    AggregationBuilder aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC);
-    SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet();
+    AggregationBuilder aggregation = AggregationBuilders
+            .dateHistogram("by_minute")
+            .field("Time")
+            .dateHistogramInterval(DateHistogramInterval.MINUTE)
+            .order(Order.COUNT_DESC);
+    SearchResponse checkRobot = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(httpType, ftpType)
+            .setQuery(filterSearch)
+            .setSize(0)
+            .addAggregation(aggregation)
+            .execute()
+            .actionGet();
 
     Histogram agg = checkRobot.getAggregations().get("by_minute");
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
index 1969e3e..415f377 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
@@ -72,38 +72,54 @@ public void generateBinaryMatrix() {
       bw.write("Num" + ",");
 
       // step 1: write first row of csv
-      List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+      List<String> logIndexList = es.getIndexListWithPrefix(
+              props.getProperty(MudrodConstants.LOG_INDEX));
 
       String[] logIndices = logIndexList.toArray(new String[0]);
       String[] statictypeArray = new String[] { this.sessionStats };
       int docCount = es.getDocCount(logIndices, statictypeArray);
-      
-      LOG.info("{}: {}", this.sessionStats, docCount);      
 
-      if (docCount==0) 
-      { 
+      LOG.info("{}: {}", this.sessionStats, docCount);
+
+      if (docCount==0) { 
         bw.close(); 
         file.delete();
         return;
       }
 
-      SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-          .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
+      SearchResponse sr = es.getClient()
+              .prepareSearch(logIndices)
+              .setTypes(statictypeArray)
+              .setQuery(QueryBuilders.matchAllQuery())
+              .setSize(0)
+              .addAggregation(AggregationBuilders.terms("IPs")
+              .field("IP")
+              .size(docCount))
+              .execute()
+              .actionGet();
       Terms ips = sr.getAggregations().get("IPs");
       List<String> ipList = new ArrayList<>();
       for (Terms.Bucket entry : ips.getBuckets()) {
-        if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) { // filter
-          // out
-          // less
-          // active users/ips
+        // filter
+        if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) {
+          // out less active users/ips
           ipList.add(entry.getKey().toString());
         }
       }
       bw.write(String.join(",", ipList) + "\n");
 
       // step 2: step the rest rows of csv
-      SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-          .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
+      SearchRequestBuilder sr2Builder = es.getClient()
+              .prepareSearch(logIndices)
+              .setTypes(statictypeArray)
+              .setQuery(QueryBuilders.matchAllQuery())
+              .setSize(0)
+              .addAggregation(AggregationBuilders.terms("KeywordAgg")
+                      .field("keywords")
+                      .size(docCount)
+                      .subAggregation(AggregationBuilders.terms("IPAgg")
+                              .field("IP")
+                              .size(docCount)));
 
       SearchResponse sr2 = sr2Builder.execute().actionGet();
       Terms keywords = sr2.getAggregations().get("KeywordAgg");
@@ -132,6 +148,7 @@ public void generateBinaryMatrix() {
       }
 
       bw.close();
+      fw.close();
     } catch (IOException e) {
       e.printStackTrace();
     }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
index f3a7f6d..1d04902 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
@@ -53,7 +53,8 @@ public LogAbstract(Properties props, ESDriver es, SparkDriver spark) {
   }
 
   protected void initLogIndex() {
-    logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX);
+    logIndex = props.getProperty(MudrodConstants.LOG_INDEX) 
+            + props.getProperty(MudrodConstants.TIME_SUFFIX);
     httpType = MudrodConstants.HTTP_TYPE;
     ftpType = MudrodConstants.FTP_TYPE;
     cleanupType = MudrodConstants.CLEANUP_TYPE;
@@ -116,8 +117,16 @@ public Terms getUserTerms(String... type) {
 
     int docCount = es.getDocCount(logIndex, type);
 
-    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
+    SearchResponse sr = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(type)
+            .setQuery(QueryBuilders.matchAllQuery())
+            .setSize(0)
+            .addAggregation(AggregationBuilders.terms("Users")
+                    .field("IP")
+                    .size(docCount))
+            .execute()
+            .actionGet();
     return sr.getAggregations().get("Users");
   }
 
@@ -138,10 +147,23 @@ public Terms getUserTerms(String... type) {
 
     int docCount = es.getDocCount(logIndex, httpType);
 
-    AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);
-
-    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
+    AggregationBuilder dailyAgg = AggregationBuilders
+            .dateHistogram("by_day")
+            .field("Time")
+            .dateHistogramInterval(DateHistogramInterval.DAY)
+            .order(Order.COUNT_DESC);
+
+    SearchResponse sr = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(httpType)
+            .setQuery(QueryBuilders.matchAllQuery())
+            .setSize(0)
+            .addAggregation(AggregationBuilders.terms("Users")
+            .field("IP")
+            .size(docCount)
+            .subAggregation(dailyAgg))
+            .execute()
+            .actionGet();
     Terms users = sr.getAggregations().get("Users");
     Map<String, Long> userList = new HashMap<>();
     for (Terms.Bucket user : users.getBuckets()) {
@@ -201,11 +223,16 @@ public Terms getSessionTerms() {
 
     int docCount = es.getDocCount(this.logIndex, this.cleanupType);
 
-    SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
-        .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();
-
-    Terms Sessions = sr.getAggregations().get("Sessions");
-    return Sessions;
+    SearchResponse sr = es.getClient()
+            .prepareSearch(this.logIndex)
+            .setTypes(this.cleanupType)
+            .setQuery(QueryBuilders.matchAllQuery())
+            .addAggregation(AggregationBuilders.terms("Sessions")
+            .field("SessionID")
+            .size(docCount))
+            .execute()
+            .actionGet();
+    return sr.getAggregations().get("Sessions");
   }
 
   public List<String> getSessions() {
@@ -213,7 +240,7 @@ public Terms getSessionTerms() {
     Terms sessions = this.getSessionTerms();
     List<String> sessionList = new ArrayList<>();
     for (Terms.Bucket entry : sessions.getBuckets()) {
-      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
+      if (entry.getDocCount() >= 3 && !"invalid".equals(entry.getKey())) {
         String session = (String) entry.getKey();
         sessionList.add(session);
       }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
index 4ce1535..3e8ffa7 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
@@ -93,50 +93,15 @@ public void generateSession() {
   }
 
   public void genSessionByReferer(int timeThres) throws InterruptedException, IOException {
-    
-      genSessionByRefererInParallel(timeThres);
-  
+    genSessionByRefererInParallel(timeThres);
   }
 
   public void combineShortSessions(int timeThres) throws InterruptedException, IOException {
-   
-      combineShortSessionsInParallel(timeThres);
-    
+    combineShortSessionsInParallel(timeThres);
   }
 
   /**
-   * Method to generate session by time threshold and referrer
-   *
-   * @param timeThres value of time threshold (s)
-   * @throws ElasticsearchException ElasticsearchException
-   * @throws IOException            IOException
-   */
-  public void genSessionByRefererInSequential(int timeThres) throws ElasticsearchException, IOException {
-
-    Terms users = this.getUserTerms(this.cleanupType);
-
-    int sessionCount = 0;
-    for (Terms.Bucket entry : users.getBuckets()) {
-
-      String user = (String) entry.getKey();
-      Integer sessionNum = genSessionByReferer(es, user, timeThres);
-      sessionCount += sessionNum;
-    }
-
-    LOG.info("Initial session count: {}", Integer.toString(sessionCount));
-  }
-
-  public void combineShortSessionsInSequential(int timeThres) throws ElasticsearchException, IOException {
-
-    Terms users = this.getUserTerms(this.cleanupType);
-    for (Terms.Bucket entry : users.getBuckets()) {
-      String user = entry.getKey().toString();
-      combineShortSessions(es, user, timeThres);
-    }
-  }
-
-  /**
-   * Method to remove invalid logs through IP address
+   * Remove invalid logs through IP address
    *
    * @param es an instantiated es driver
    * @param ip invalid IP address
@@ -148,13 +113,24 @@ public void deleteInvalid(ESDriver es, String ip) throws IOException {
     BoolQueryBuilder filterAll = new BoolQueryBuilder();
     filterAll.must(QueryBuilders.termQuery("IP", ip));
 
-    SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet();
+    SearchResponse scrollResp = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(this.cleanupType)
+            .setScroll(new TimeValue(60000))
+            .setQuery(filterAll)
+            .setSize(100)
+            .execute()
+            .actionGet();
     while (true) {
       for (SearchHit hit : scrollResp.getHits().getHits()) {
         update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid");
       }
 
-      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      scrollResp = es.getClient()
+              .prepareSearchScroll(scrollResp.getScrollId())
+              .setScroll(new TimeValue(600000))
+              .execute()
+              .actionGet();
       if (scrollResp.getHits().getHits().length == 0) {
         break;
       }
@@ -174,14 +150,14 @@ public void deleteInvalid(ESDriver es, String ip) throws IOException {
    * @throws IOException
    */
   private void update(ESDriver es, String index, String type, String id, String field1, Object value1) throws IOException {
-    UpdateRequest ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
+    UpdateRequest ur = new UpdateRequest(index, type, id)
+            .doc(jsonBuilder().startObject().field(field1, value1).endObject());
     es.getBulkProcessor().add(ur);
   }
 
   public void genSessionByRefererInParallel(int timeThres) throws InterruptedException, IOException {
 
     JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
-
     int sessionCount = 0;
     sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
       /**
@@ -226,17 +202,24 @@ public int genSessionByReferer(ESDriver es, String user, int timeThres) throws E
     BoolQueryBuilder filterSearch = new BoolQueryBuilder();
     filterSearch.must(QueryBuilders.termQuery("IP", user));
 
-    SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch).addSort("Time", SortOrder.ASC).setSize(100)
-        .execute().actionGet();
+    SearchResponse scrollResp = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(this.cleanupType)
+            .setScroll(new TimeValue(60000))
+            .setQuery(filterSearch)
+            .addSort("Time", SortOrder.ASC)
+            .setSize(100)
+            .execute()
+            .actionGet();
 
     Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>();
-    String request = "";
-    String referer = "";
-    String logType = "";
-    String id = "";
+    String request;
+    String referer;
+    String logType;
+    String id;
     String ip = user;
     String indexUrl = props.getProperty(MudrodConstants.BASE_URL) + "/";
-    DateTime time = null;
+    DateTime time;
     DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
 
     while (scrollResp.getHits().getHits().length != 0) {
@@ -354,6 +337,7 @@ public void call(Iterator<String> arg0) throws Exception {
         tmpES.close();
       }
     });
+    LOG.info("Final Session count (after combining short sessions): {}", 
Long.toString(userRDD.count()));
   }
 
   public void combineShortSessions(ESDriver es, String user, int timeThres) throws ElasticsearchException, IOException {
@@ -372,7 +356,14 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws
 
     BoolQueryBuilder filterCheck = new BoolQueryBuilder();
     filterCheck.must(QueryBuilders.termQuery("IP", user)).must(QueryBuilders.termQuery("Referer", "-"));
-    SearchResponse checkReferer = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet();
+    SearchResponse checkReferer = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(this.cleanupType)
+            .setScroll(new TimeValue(60000))
+            .setQuery(filterCheck)
+            .setSize(0)
+            .execute()
+            .actionGet();
 
     long numInvalid = checkReferer.getHits().getTotalHits();
     double invalidRate = numInvalid / docCount;
@@ -383,8 +374,17 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws
     }
 
     StatsAggregationBuilder statsAgg = AggregationBuilders.stats("Stats").field("Time");
-    SearchResponse srSession = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(filterSearch)
-        .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet();
+    SearchResponse srSession = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(this.cleanupType)
+            .setScroll(new TimeValue(60000))
+            .setQuery(filterSearch)
+            .addAggregation(AggregationBuilders.terms("Sessions")
+            .field("SessionID")
+            .size(docCount)
+            .subAggregation(statsAgg))
+            .execute()
+            .actionGet();
 
     Terms sessions = srSession.getAggregations().get("Sessions");
 
@@ -400,7 +400,7 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws
     String last = null;
     String lastnewID = null;
     String lastoldID = null;
-    String current = null;
+    String current;
     for (Session s : sessionList) {
       current = s.getEndTime();
       if (last != null) {
@@ -413,7 +413,14 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws
 
           QueryBuilder fs = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", s.getID()));
 
-          SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet();
+          SearchResponse scrollResp = es.getClient()
+                  .prepareSearch(logIndex)
+                  .setTypes(this.cleanupType)
+                  .setScroll(new TimeValue(60000))
+                  .setQuery(fs)
+                  .setSize(100)
+                  .execute()
+                  .actionGet();
           while (true) {
             for (SearchHit hit : scrollResp.getHits().getHits()) {
               if (lastnewID == null) {
@@ -423,7 +430,11 @@ public void combineShortSessions(ESDriver es, String user, int timeThres) throws
               }
             }
 
-            scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+            scrollResp = es.getClient()
+                    .prepareSearchScroll(scrollResp.getScrollId())
+                    .setScroll(new TimeValue(600000))
+                    .execute()
+                    .actionGet();
             if (scrollResp.getHits().getHits().length == 0) {
               break;
             }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
index 981bece..e99af06 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
@@ -27,7 +27,6 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.metrics.stats.Stats;
 import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
 import org.joda.time.DateTime;
@@ -38,7 +37,12 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -81,7 +85,7 @@ public Object execute() {
   }
 
   public void processSession() throws InterruptedException, IOException, ExecutionException {
-      processSessionInParallel();
+    processSessionInParallel();
   }
 
   /**
@@ -160,7 +164,13 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int
     BoolQueryBuilder filterSearch = new BoolQueryBuilder();
     filterSearch.must(QueryBuilders.termQuery("SessionID", sessionId));
 
-    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filterSearch).addAggregation(statsAgg).execute().actionGet();
+    SearchResponse sr = es.getClient()
+            .prepareSearch(logIndex)
+            .setTypes(inputType)
+            .setQuery(filterSearch)
+            .addAggregation(statsAgg)
+            .execute()
+            .actionGet();
 
     Stats agg = sr.getAggregations().get("Stats");
     min = agg.getMinAsString();
@@ -231,9 +241,9 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int
             String view = findDataset(request);
             if ("".equals(views)) 
               views = view;
-             else if (!views.contains(view)) 
-                views = views + "," + view;          
-           }          
+            else if (!views.contains(view)) 
+              views = views + "," + view;
+          }
         }
         if (MudrodConstants.FTP_LOG.equals(logType)) {
           ftpRequestCount++;
@@ -257,7 +267,11 @@ else if (!views.contains(view))
 
       }
 
-      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      scrollResp = es.getClient()
+              .prepareSearchScroll(scrollResp.getScrollId())
+              .setScroll(new TimeValue(600000))
+              .execute()
+              .actionGet();
       // Break condition: No hits are returned
       if (scrollResp.getHits().getHits().length == 0) {
         break;
@@ -269,12 +283,17 @@ else if (!views.contains(view))
     }
 
     if (searchDataListRequestCount != 0 && 
-        searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) && 
-        searchDataRequestCount != 0 && 
-        searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) && 
-        ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F))) 
+            searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) && 
+            searchDataRequestCount != 0 && 
+            searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) && 
+            ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F))) 
     {
-      String sessionURL = props.getProperty(MudrodConstants.SESSION_PORT) + props.getProperty(MudrodConstants.SESSION_URL) + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType;
+      String sessionURL = props.getProperty(
+              MudrodConstants.SESSION_PORT)
+              + props.getProperty(MudrodConstants.SESSION_URL)
+              + "?sessionid=" + sessionId
+              + "&sessionType=" + outputType
+              + "&requestType=" + inputType;
       sessionCount = 1;
 
       IndexRequest ir = new IndexRequest(logIndex, outputType).source(
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
index 70c4067..193115e 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
@@ -18,7 +18,7 @@
 import org.apache.sdap.mudrod.driver.SparkDriver;
 import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
-import org.apache.sdap.mudrod.ssearch.ClickstreamImporter;
+import org.apache.sdap.mudrod.ssearch.ClickStreamImporter;
 import org.apache.sdap.mudrod.utils.LinkageTriple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,18 +51,18 @@ public Object execute() {
     LOG.info("Starting ClickStreamAnalyzer...");
     startTime = System.currentTimeMillis();
     try {
-      String clickstream_matrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
-      File f = new File(clickstream_matrixFile);
+      String clickstreamMatrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
+      File f = new File(clickstreamMatrixFile);
       if (f.exists()) {
         SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
-        svd.getSVDMatrix(clickstream_matrixFile, 
+        svd.getSVDMatrix(clickstreamMatrixFile, 
             Integer.parseInt(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_DIM)), 
             props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
         List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
         svd.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
       
         // Store click stream in ES for the ranking use
-        ClickstreamImporter cs = new ClickstreamImporter(props, es, spark);
+        ClickStreamImporter cs = new ClickStreamImporter(props, es, spark);
         cs.importfromCSVtoES();
       }
     } catch (Exception e) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
index c4c5da4..cb1e12f 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
@@ -50,7 +50,7 @@ public Object execute() {
 
     SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark);
     List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty(MudrodConstants.USER_HISTORY_PATH));
-    sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE));
+    sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USER_HISTORY_LINKAGE_TYPE));
 
     endTime = System.currentTimeMillis();
     es.refreshIndex();
diff --git a/core/src/main/resources/config.properties b/core/src/main/resources/config.properties
index 495d29c..5ed2504 100644
--- a/core/src/main/resources/config.properties
+++ b/core/src/main/resources/config.properties
@@ -16,14 +16,14 @@ mudrod.es.transport.tcp.port = 9300
 mudrod.es.unicast.hosts = 127.0.0.1
 mudrod.es.http.port = 9200
 mudrod.es.index = mudrod
-    
+
 # Spark related
 # Log processing type. Possible values include 'sequential' or 'parallel'
 mudrod.processing.type = parallel
 mudrod.spark.app.name = MudrodSparkApp
 mudrod.spark.master = local[4]
 mudrod.spark.optimize = repartition
-    
+
 # Web log processing configuration
 # index name has to be all lowercase
 mudrod.log.index = log
@@ -38,37 +38,43 @@ mudrod.download.freq = 100
 mudrod.request.rate = 30
 mudrod.session.port = 8080
 mudrod.session.url = /mudrod-service/session.html
-mudrod.request.time.gap = 600   
+mudrod.request.time.gap = 600
 mudrod.view.url.marker = /dataset/
 mudrod.search.url.marker = /datasetlist?
-# In order to better parse a URL (getting searching keyword, etc.), please consider custimize 
+# In order to better parse a URL (getting searching keyword, etc.), please consider customizing the 
 # org.apache.sdap.mudrod.weblog.structure.RequestUrl - GetSearchInfo, getFilterInfo
-       
+
 # User search history
 mudrod.query.min = 0
 mudrod.user.history.weight = 2
-       
+mudrod.user.history.linkage = UserHistoryLinkage
+mudrod.user.history.path = userHistoryMatrix
+
 # clickstream
 mudrod.download.weight = 3
 mudrod.clickstream.svd.d = 50
 mudrod.clickstream.weight = 2
-                               
+mudrod.clickstream.matrix = clickStreamMatrix
+mudrod.clickstream.linkage = ClickStreamLinkage
+mudrod.clickstream.path = clickStreamMatrix
+mudrod.clickstream.svd.path = clickStreamMatrixSVD
+
 # metadata
-mudrod.metadata.download = 0
+mudrod.metadata.download = 1
 mudrod.metadata.download.url = https://podaac.jpl.nasa.gov/api/dataset?startIndex=$startIndex&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search=
 mudrod.metadata.svd.d = 50
 mudrod.metadata.url = null
 mudrod.metadata.weight = 1
 mudrod.metadata.type = RawMetadata
-               
+
 # ranking, ${svmSgdModel.value} is resolved at build time. See the property in core/pom.xml for the value
 mudrod.ranking.machine.learning = 1
 mudrod.ranking.model = ${svmSgdModel.value}.zip
-               
+
 # recommendation
 mudrod.metadata.id = Dataset-ShortName
 mudrod.metadata.semantic.fields = DatasetParameter-Term,DatasetParameter-Variable,Dataset-ExtractTerm
 
 # ontology service implementation. Possible values include EsipPortal - EsipPortalOntology EsipCOR - EsipCOROntology Local - org.apache.sdap.mudrod.ontology.process.Local
 mudrod.ontology.implementation = Local
-mudrod.ontology.weight = 2
+mudrod.ontology.weight = 2
\ No newline at end of file
diff --git a/core/src/main/resources/elastic_mappings.json b/core/src/main/resources/elastic_mappings.json
index ddb1952..3730b87 100644
--- a/core/src/main/resources/elastic_mappings.json
+++ b/core/src/main/resources/elastic_mappings.json
@@ -1,116 +1,68 @@
 {
-    "mappings": {
-        "doc": {
-            "dynamic_templates": [
-                {
-                    "geo_point_mapping": {
-                        "match_mapping_type": "nested",
-                        "match": "Point",
-                        "mapping": {
-                            "type": "geo_point"
-                        }
-                    },
-                    "geo_shape_mapping": {
-                        "match_mapping_type": "nested",
-                        "match": "Polygon",
-                        "mapping": {
-                            "type": "geo_shape"
-                        }
-                    }
-                }
-            ]
-        }
-    },
-    "eonet_event": {
-        "properties": {
-            "sources": {
-                "type": "nested",
-                "fields": {
-                    "id": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    },
-                    "url": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "geometries": {
-                "type": "nested",
-                "fields": {
-                    "date": {
-                        "ignore_above": 256,
-                        "type": "date"
-                    },
-                    "type": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    },
-                    "coordinates": {
-                        "ignore_above": 256,
-                        "dynamic": true
-                    }
-                }
-            },
-            "link": {
-                "type": "text",
-                "fields": {
-                    "keyword": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "closed": {
-                "type": "text",
-                "fields": {
-                    "keyword": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "description": {
-                "type": "text",
-                "fields": {
-                    "keyword": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "categories": {
-                "type": "nested",
-                "fields": {
-                    "id": {
-                        "ignore_above": 256,
-                        "type": "integer"
-                    },
-                    "title": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "id": {
-                "type": "text",
-                "fields": {
-                    "keyword": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            },
-            "title": {
-                "type": "completion",
-                "fields": {
-                    "keyword": {
-                        "ignore_above": 256,
-                        "type": "keyword"
-                    }
-                }
-            }
-        }
+  "_default_":{
+    "properties":{
+      "keywords":{
+        "type":"text",
+        "analyzer":"csv",
+        "fielddata":true
+      },
+      "views":{
+        "type":"string",
+        "analyzer":"csv"
+      },
+      "downloads":{
+        "type":"string",
+        "analyzer":"csv"
+      },
+      "RequestUrl":{
+        "type":"string",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "IP":{
+        "type":"keyword",
+        "index":"not_analyzed"
+      },
+      "Browser":{
+        "type":"string",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "SessionURL":{
+        "type":"string",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "Referer":{
+        "type":"string",
+        "index":"not_analyzed"
+      },
+      "SessionID":{
+        "type":"string",
+        "index":"not_analyzed"
+      },
+      "Response":{
+        "type":"string",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "Request":{
+        "type":"string",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "Coordinates":{
+        "type":"geo_point",
+        "include_in_all":false,
+        "index":"no"
+      },
+      "LogType":{
+        "type":"string",
+        "index":"not_analyzed"
+      },
+      "Dataset-Metadata":{
+        "type":"completion"
+      }
     }
+  }
 }
\ No newline at end of file
diff --git a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
index 6b410f4..9bbc1cd 100644
--- a/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
+++ b/service/src/main/java/org/apache/sdap/mudrod/services/eonet/EONETIngestionResource.java
@@ -1,5 +1,15 @@
-/**
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
  * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.sdap.mudrod.services.eonet;

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> full ingestion workflow and process broken in several places
> ------------------------------------------------------------
>
>                 Key: SDAP-71
>                 URL: https://issues.apache.org/jira/browse/SDAP-71
>             Project: Apache Science Data Analytics Platform
>          Issue Type: Improvement
>            Reporter: Lewis John McGibbney
>            Priority: Major
>
> Amongst other things, the configuration overhaul broke several parts of the 
> full ingestion workflow. This has taken literally ages to debug; however, I 
> now have a patch at hand which fixes all issues I've encountered. 
> This has been an absolute blocker for me implementing the eventsView... I'll 
> now get on to that.
> PR coming up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
