http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
deleted file mode 100644
index 4397873..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package gov.nasa.jpl.mudrod.weblog.partition;
-
-import java.util.*;
-
-public class KGreedyPartitionSolver implements ThePartitionProblemSolver {
-
-  public boolean bsorted = false;
-
-  public KGreedyPartitionSolver() {
-    // default constructor
-  }
-
-  public KGreedyPartitionSolver(boolean bsorted) {
-    this.bsorted = true;
-  }
-
-  @Override
-  public Map<String, Integer> solve(Map<String, Double> labelNums, int k) {
-    List<Double> lista = null;
-    List<String> months = null;
-
-    if (!this.bsorted) {
-      LinkedHashMap sortedMap = this.sortMapByValue(labelNums);
-      lista = new ArrayList(sortedMap.values());
-      months = new ArrayList(sortedMap.keySet());
-    } else {
-      lista = new ArrayList(labelNums.values());
-      months = new ArrayList(labelNums.keySet());
-    }
-
-    List<List<Double>> parts = new ArrayList<>();
-    List<List<String>> splitMonths = new ArrayList<>();
-
-    for (int i = 0; i < k; i++) {
-      List<Double> part = new ArrayList();
-      parts.add(part);
-
-      List<String> monthList = new ArrayList();
-      splitMonths.add(monthList);
-    }
-
-    int j = 0;
-    for (Double lista1 : lista) {
-
-      Double minimalSum = -1.0;
-      int position = 0;
-      for (int i = 0; i < parts.size(); i++) {
-        List<Double> part = parts.get(i);
-        if (minimalSum == -1) {
-          minimalSum = Suma(part);
-          position = i;
-        } else if (Suma(part) < minimalSum) {
-          minimalSum = Suma(part);
-          position = i;
-        }
-      }
-
-      List<Double> part = parts.get(position);
-      part.add(lista1);
-      parts.set(position, part);
-
-      List<String> month = splitMonths.get(position);
-      month.add(months.get(j));
-      splitMonths.set(position, month);
-      j++;
-    }
-
-    /*  for(int i=0; i<splitMonths.size(); i++){
-        System.out.println("group:" + i);
-        printStrList(splitMonths.get(i));
-      }
-      
-      for(int i=0; i<parts.size(); i++){
-        print(parts.get(i));
-      }*/
-
-    Map<String, Integer> LabelGroups = new HashMap<String, Integer>();
-    for (int i = 0; i < splitMonths.size(); i++) {
-      List<String> list = splitMonths.get(i);
-      for (int m = 0; m < list.size(); m++) {
-        LabelGroups.put(list.get(m), i);
-      }
-    }
-
-    return LabelGroups;
-  }
-
-  public LinkedHashMap<String, Double> sortMapByValue(Map passedMap) {
-    List mapKeys = new ArrayList(passedMap.keySet());
-    List mapValues = new ArrayList(passedMap.values());
-    Collections.sort(mapValues, Collections.reverseOrder());
-    Collections.sort(mapKeys, Collections.reverseOrder());
-
-    LinkedHashMap sortedMap = new LinkedHashMap();
-
-    Iterator valueIt = mapValues.iterator();
-    while (valueIt.hasNext()) {
-      Object val = valueIt.next();
-      Iterator keyIt = mapKeys.iterator();
-
-      while (keyIt.hasNext()) {
-        Object key = keyIt.next();
-        String comp1 = passedMap.get(key).toString();
-        String comp2 = val.toString();
-
-        if (comp1.equals(comp2)) {
-          passedMap.remove(key);
-          mapKeys.remove(key);
-          sortedMap.put((String) key, (Double) val);
-          break;
-        }
-
-      }
-
-    }
-    return sortedMap;
-  }
-
-  private Double Suma(List<Double> part) {
-    Double ret = 0.0;
-    for (int i = 0; i < part.size(); i++) {
-      ret += part.get(i);
-    }
-    return ret;
-  }
-
-  private void print(List<Double> list) {
-    /*for (int i = 0; i < list.size(); i++) {
-        System.out.print(list.get(i)+",");
-    }*/
-    System.out.print("sum is:" + Suma(list));
-    System.out.println();
-  }
-
-  private void printStrList(List<String> list) {
-    for (int i = 0; i < list.size(); i++) {
-      System.out.print(list.get(i) + ",");
-    }
-    System.out.println();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
deleted file mode 100644
index 11aaed3..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package gov.nasa.jpl.mudrod.weblog.partition;
-
-import java.util.Map;
-
-public interface ThePartitionProblemSolver {
-
-  public Map<String, Integer> solve(Map<String, Double> labelNums, int k);
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
deleted file mode 100644
index 4c299dd..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package gov.nasa.jpl.mudrod.weblog.partition;
-
-import org.apache.spark.Partitioner;
-
-import java.util.Map;
-
-public class logPartitioner extends Partitioner {
-
-  int num;
-  Map<String, Integer> UserGroups;
-
-  public logPartitioner(int num) {
-    this.num = num;
-  }
-
-  public logPartitioner(Map<String, Integer> UserGroups, int num) {
-    this.UserGroups = UserGroups;
-    this.num = num;
-  }
-
-  @Override
-  public int getPartition(Object arg0) {
-    // TODO Auto-generated method stub
-    String user = (String) arg0;
-    return UserGroups.get(user);
-  }
-
-  @Override
-  public int numPartitions() {
-    // TODO Auto-generated method stub
-    return num;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
deleted file mode 100644
index 34323df..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
-import gov.nasa.jpl.mudrod.utils.MatrixUtil;
-import gov.nasa.jpl.mudrod.weblog.structure.ClickStream;
-import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Supports ability to extract click stream data based on session processing 
results
- */
-public class ClickStreamGenerator extends DiscoveryStepAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(ClickStreamGenerator.class);
-
-  public ClickStreamGenerator(Properties props, ESDriver es, SparkDriver 
spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting ClickStreamGenerator...");
-    startTime = System.currentTimeMillis();
-
-    String clickstremMatrixFile = props.getProperty("clickstreamMatrix");
-    try {
-      SessionExtractor extractor = new SessionExtractor();
-      JavaRDD<ClickStream> clickstreamRDD = 
extractor.extractClickStreamFromES(this.props, this.es, this.spark);
-      int weight = Integer.parseInt(props.getProperty("downloadWeight"));
-      JavaPairRDD<String, List<String>> metaddataQueryRDD = 
extractor.bulidDataQueryRDD(clickstreamRDD, weight);
-      LabeledRowMatrix wordDocMatrix = 
MatrixUtil.createWordDocMatrix(metaddataQueryRDD);
-
-      MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, 
wordDocMatrix.colkeys, clickstremMatrixFile);
-    } catch (Exception e) {
-      LOG.error("Encountered error within ClickStreamGenerator: {}", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    LOG.info("ClickStreamGenerator complete. Time elapsed {} seconds.", 
(endTime - startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
deleted file mode 100644
index 80bf33b..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import 
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
-import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
-import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.joda.time.DateTime;
-import org.joda.time.Seconds;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * An {@link DiscoveryStepAbstract}
- * implementation which detects a known list of Web crawlers which may may be
- * present within, and pollute various logs acting as input to Mudrod.
- */
-public class CrawlerDetection extends LogAbstract {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(CrawlerDetection.class);
-
-  public static final String CRAWLER = "crawler";
-  public static final String GOOGLE_BOT = "googlebot";
-  public static final String BING_BOT = "bingbot";
-  public static final String YAHOO_BOT = "slurp";
-  public static final String YACY_BOT = "yacybot";
-  public static final String ROGER_BOT = "rogerbot";
-  public static final String YANDEX_BOT = "yandexbot";
-
-  public static final String NO_AGENT_BOT = "-";
-  public static final String PERL_BOT = "libwww-perl/";
-  public static final String APACHE_HHTP = "apache-httpclient/";
-  public static final String JAVA_CLIENT = "java/";
-  public static final String CURL = "curl/";
-
-  /**
-   * Paramterized constructor to instantiate a configured instance of
-   * {@link CrawlerDetection}
-   *
-   * @param props populated {@link java.util.Properties} object
-   * @param es    {@link ESDriver} object to use in
-   *              crawler detection preprocessing.
-   * @param spark {@link SparkDriver} object to use in
-   *              crawler detection preprocessing.
-   */
-  public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  public CrawlerDetection() {
-    super(null, null, null);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting Crawler detection {}.", httpType);
-    startTime = System.currentTimeMillis();
-    try {
-      checkByRate();
-    } catch (InterruptedException | IOException e) {
-      LOG.error("Encountered an error whilst detecting Web crawlers.", e);
-    }
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("Crawler detection complete. Time elapsed {} seconds", (endTime - 
startTime) / 1000);
-    return null;
-  }
-
-  /**
-   * Check known crawler through crawler agent name list
-   *
-   * @param agent name of a log line
-   * @return 1 if the log is initiated by crawler, 0 otherwise
-   */
-  public boolean checkKnownCrawler(String agent) {
-    agent = agent.toLowerCase();
-    if (agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || 
agent.contains(BING_BOT) || agent.contains(APACHE_HHTP) || 
agent.contains(PERL_BOT) || agent.contains(YAHOO_BOT) || agent
-        .contains(YANDEX_BOT) || agent.contains(NO_AGENT_BOT) || 
agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || 
agent.contains(JAVA_CLIENT) || agent.contains(CURL)) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public void checkByRate() throws InterruptedException, IOException {
-    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
-    if (processingType.equals("sequential")) {
-      checkByRateInSequential();
-    } else if (processingType.equals("parallel")) {
-      checkByRateInParallel();
-    }
-  }
-
-  /**
-   * Check crawler by request sending rate, which is read from configruation
-   * file
-   *
-   * @throws InterruptedException InterruptedException
-   * @throws IOException          IOException
-   */
-  public void checkByRateInSequential() throws InterruptedException, 
IOException {
-    es.createBulkProcessor();
-
-    int rate = Integer.parseInt(props.getProperty("sendingrate"));
-
-    Terms users = this.getUserTerms(this.httpType);
-    LOG.info("Original User count: {}", 
Integer.toString(users.getBuckets().size()));
-
-    int userCount = 0;
-    for (Terms.Bucket entry : users.getBuckets()) {
-      String user = entry.getKey().toString();
-      int count = checkByRate(es, user);
-      userCount += count;
-    }
-    es.destroyBulkProcessor();
-    LOG.info("User count: {}", Integer.toString(userCount));
-  }
-
-  void checkByRateInParallel() throws InterruptedException, IOException {
-
-    JavaRDD<String> userRDD = getUserRDD(this.httpType);
-    LOG.info("Original User count: {}", userRDD.count());
-
-    int userCount = 0;
-    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, 
Integer>) iterator -> {
-      ESDriver tmpES = new ESDriver(props);
-      tmpES.createBulkProcessor();
-      List<Integer> realUserNums = new ArrayList<>();
-      while (iterator.hasNext()) {
-        String s = iterator.next();
-        Integer realUser = checkByRate(tmpES, s);
-        realUserNums.add(realUser);
-      }
-      tmpES.destroyBulkProcessor();
-      tmpES.close();
-      return realUserNums.iterator();
-    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
-
-    LOG.info("User count: {}", Integer.toString(userCount));
-  }
-
-  private int checkByRate(ESDriver es, String user) {
-
-    int rate = Integer.parseInt(props.getProperty("sendingrate"));
-    Pattern pattern = Pattern.compile("get (.*?) http/*");
-    Matcher matcher;
-
-    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
-    filterSearch.must(QueryBuilders.termQuery("IP", user));
-
-    AggregationBuilder aggregation = 
AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC);
-    SearchResponse checkRobot = 
es.getClient().prepareSearch(logIndex).setTypes(httpType, 
ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet();
-
-    Histogram agg = checkRobot.getAggregations().get("by_minute");
-
-    List<? extends Histogram.Bucket> botList = agg.getBuckets();
-    long maxCount = botList.get(0).getDocCount();
-    if (maxCount >= rate) {
-      return 0;
-    } else {
-      DateTime dt1 = null;
-      int toLast = 0;
-      SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(httpType, 
ftpType).setScroll(new 
TimeValue(60000)).setQuery(filterSearch).setSize(100).execute().actionGet();
-      while (true) {
-        for (SearchHit hit : scrollResp.getHits().getHits()) {
-          Map<String, Object> result = hit.getSource();
-          String logtype = (String) result.get("LogType");
-          if (logtype.equals("PO.DAAC")) {
-            String request = (String) result.get("Request");
-            matcher = pattern.matcher(request.trim().toLowerCase());
-            boolean find = false;
-            while (matcher.find()) {
-              request = matcher.group(1);
-              result.put("RequestUrl", "http://podaac.jpl.nasa.gov"; + request);
-              find = true;
-            }
-            if (!find) {
-              result.put("RequestUrl", request);
-            }
-          } else {
-            result.put("RequestUrl", result.get("Request"));
-          }
-
-          DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
-          DateTime dt2 = fmt.parseDateTime((String) result.get("Time"));
-
-          if (dt1 == null) {
-            toLast = 0;
-          } else {
-            toLast = Math.abs(Seconds.secondsBetween(dt1, dt2).getSeconds());
-          }
-          result.put("ToLast", toLast);
-          IndexRequest ir = new IndexRequest(logIndex, 
cleanupType).source(result);
-
-          es.getBulkProcessor().add(ir);
-          dt1 = dt2;
-        }
-
-        scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
-        if (scrollResp.getHits().getHits().length == 0) {
-          break;
-        }
-      }
-
-    }
-
-    return 1;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
deleted file mode 100644
index d5dc102..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Supports ability to generate search history (queries) for each individual
- * user (IP)
- */
-public class HistoryGenerator extends LogAbstract {
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HistoryGenerator.class);
-
-  public HistoryGenerator(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting HistoryGenerator...");
-    startTime = System.currentTimeMillis();
-
-    generateBinaryMatrix();
-
-    endTime = System.currentTimeMillis();
-    LOG.info("HistoryGenerator complete. Time elapsed {} seconds", (endTime - 
startTime) / 1000);
-    return null;
-  }
-
-  /**
-   * Method to generate a binary user*query matrix (stored in temporary .csv
-   * file)
-   */
-  public void generateBinaryMatrix() {
-    try {
-
-      System.out.println(props.getProperty("userHistoryMatrix"));
-      File file = new File(props.getProperty("userHistoryMatrix"));
-      if (file.exists()) {
-        file.delete();
-      }
-
-      file.createNewFile();
-
-      FileWriter fw = new FileWriter(file.getAbsoluteFile());
-      BufferedWriter bw = new BufferedWriter(fw);
-      bw.write("Num" + ",");
-
-      // step 1: write first row of csv
-      List<String> logIndexList = 
es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
-
-      String[] logIndices = logIndexList.toArray(new String[0]);
-      String[] statictypeArray = new String[] { this.sessionStats };
-      int docCount = es.getDocCount(logIndices, statictypeArray);
-
-      SearchResponse sr = 
es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-          
.addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
-      Terms ips = sr.getAggregations().get("IPs");
-      List<String> ipList = new ArrayList<>();
-      for (Terms.Bucket entry : ips.getBuckets()) {
-        if (entry.getDocCount() > 
Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { // 
filter
-          // out
-          // less
-          // active users/ips
-          ipList.add(entry.getKey().toString());
-        }
-      }
-      bw.write(String.join(",", ipList) + "\n");
-
-      // step 2: step the rest rows of csv
-      SearchRequestBuilder sr2Builder = 
es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-          
.addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
-
-      SearchResponse sr2 = sr2Builder.execute().actionGet();
-      Terms keywords = sr2.getAggregations().get("KeywordAgg");
-
-      for (Terms.Bucket keyword : keywords.getBuckets()) {
-
-        Map<String, Integer> ipMap = new HashMap<>();
-        Terms ipAgg = keyword.getAggregations().get("IPAgg");
-
-        int distinctUser = ipAgg.getBuckets().size();
-        if (distinctUser > 
Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
-          bw.write(keyword.getKey() + ",");
-          for (Terms.Bucket IP : ipAgg.getBuckets()) {
-
-            ipMap.put(IP.getKey().toString(), 1);
-          }
-          for (int i = 0; i < ipList.size(); i++) {
-            if (ipMap.containsKey(ipList.get(i))) {
-              bw.write(ipMap.get(ipList.get(i)) + ",");
-            } else {
-              bw.write("0,");
-            }
-          }
-          bw.write("\n");
-        }
-      }
-
-      bw.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java
deleted file mode 100644
index c55082a..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ImportLogFile.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.weblog.structure.ApacheAccessLog;
-import gov.nasa.jpl.mudrod.weblog.structure.FtpLog;
-import org.apache.spark.api.java.JavaRDD;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * Supports ability to parse and process FTP and HTTP log files
- */
-public class ImportLogFile extends LogAbstract {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ImportLogFile.class);
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-  String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) 
\\[([\\w:/]+\\s[+\\-]\\d{4})\\] " + "\"(.+?)\" (\\d{3}) (\\d+|-) 
\"((?:[^\"]|\")+)\" \"([^\"]+)\"";
-
-  public static final int NUM_FIELDS = 9;
-  Pattern p = Pattern.compile(logEntryPattern);
-  transient Matcher matcher;
-
-  /**
-   * Constructor supporting a number of parameters documented below.
-   *
-   * @param props a {@link java.util.Map} containing K,V of type String, String
-   *              respectively.
-   * @param es    the {@link ESDriver} used to persist log
-   *              files.
-   * @param spark the {@link SparkDriver} used to process
-   *              input log files.
-   */
-  public ImportLogFile(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting Log Import {}", 
props.getProperty(MudrodConstants.TIME_SUFFIX));
-    startTime = System.currentTimeMillis();
-    readFile();
-    endTime = System.currentTimeMillis();
-    LOG.info("Log Import complete. Time elapsed {} seconds", (endTime - 
startTime) / 1000);
-    es.refreshIndex();
-    return null;
-  }
-
-  /**
-   * Utility function to aid String to Number formatting such that three letter
-   * months such as 'Jan' are converted to the Gregorian integer equivalent.
-   *
-   * @param time the input {@link java.lang.String} to convert to int.
-   * @return the converted Month as an int.
-   */
-  public String switchtoNum(String time) {
-    String newTime = time;
-    if (newTime.contains("Jan")) {
-      newTime = newTime.replace("Jan", "1");
-    } else if (newTime.contains("Feb")) {
-      newTime = newTime.replace("Feb", "2");
-    } else if (newTime.contains("Mar")) {
-      newTime = newTime.replace("Mar", "3");
-    } else if (newTime.contains("Apr")) {
-      newTime = newTime.replace("Apr", "4");
-    } else if (newTime.contains("May")) {
-      newTime = newTime.replace("May", "5");
-    } else if (newTime.contains("Jun")) {
-      newTime = newTime.replace("Jun", "6");
-    } else if (newTime.contains("Jul")) {
-      newTime = newTime.replace("Jul", "7");
-    } else if (newTime.contains("Aug")) {
-      newTime = newTime.replace("Aug", "8");
-    } else if (newTime.contains("Sep")) {
-      newTime = newTime.replace("Sep", "9");
-    } else if (newTime.contains("Oct")) {
-      newTime = newTime.replace("Oct", "10");
-    } else if (newTime.contains("Nov")) {
-      newTime = newTime.replace("Nov", "11");
-    } else if (newTime.contains("Dec")) {
-      newTime = newTime.replace("Dec", "12");
-    }
-    return newTime;
-  }
-
-  public void readFile() {
-
-    String httplogpath = null;
-    String ftplogpath = null;
-    
-    File directory = new File(props.getProperty(MudrodConstants.DATA_DIR));
-    File[] fList = directory.listFiles();
-    for (File file : fList) {
-      if (file.isFile() && 
file.getName().contains(props.getProperty(MudrodConstants.TIME_SUFFIX))) 
-      {
-        if 
(file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) 
-        {
-          httplogpath = file.getAbsolutePath();
-        }
-        
-        if 
(file.getName().contains(props.getProperty(MudrodConstants.FTP_PREFIX))) 
-        {
-          ftplogpath = file.getAbsolutePath();
-        }
-      }
-    }
-    
-    if(httplogpath == null || ftplogpath == null)
-    {
-      LOG.error("WWW file or FTP logs cannot be found, please check your data 
directory.");
-      return;
-    }
-
-    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE, 
"parallel");
-    if (processingType.equals("sequential")) {
-      readFileInSequential(httplogpath, ftplogpath);
-    } else if (processingType.equals("parallel")) {
-      readFileInParallel(httplogpath, ftplogpath);
-    }
-  }
-
-  /**
-   * Read the FTP or HTTP log path with the intention of processing lines from
-   * log files.
-   *
-   * @param httplogpath path to the parent directory containing http logs
-   * @param ftplogpath  path to the parent directory containing ftp logs
-   */
-  public void readFileInSequential(String httplogpath, String ftplogpath) {
-    es.createBulkProcessor();
-    try {
-      readLogFile(httplogpath, "http", logIndex, httpType);
-      readLogFile(ftplogpath, "FTP", logIndex, ftpType);
-
-    } catch (IOException e) {
-      LOG.error("Error whilst reading log file.", e);
-    }
-    es.destroyBulkProcessor();
-  }
-
-  /**
-   * Read the FTP or HTTP log path with the intention of processing lines from
-   * log files.
-   *
-   * @param httplogpath path to the parent directory containing http logs
-   * @param ftplogpath  path to the parent directory containing ftp logs
-   */
-  public void readFileInParallel(String httplogpath, String ftplogpath) {
-
-    importHttpfile(httplogpath);
-    importFtpfile(ftplogpath);
-  }
-
-  public void importHttpfile(String httplogpath) {
-    // import http logs
-    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, 
this.partition).map(s -> 
ApacheAccessLog.parseFromLogLine(s)).filter(ApacheAccessLog::checknull);
-
-    JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType);
-  }
-
-  public void importFtpfile(String ftplogpath) {
-    // import ftp logs
-    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, 
this.partition).map(s -> FtpLog.parseFromLogLine(s)).filter(FtpLog::checknull);
-
-    JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType);
-  }
-
-  /**
-   * Process a log path on local file system which contains the relevant
-   * parameters as below.
-   *
-   * @param fileName the {@link java.lang.String} path to the log directory on 
file
-   *                 system
-   * @param protocol whether to process 'http' or 'FTP'
-   * @param index    the index name to write logs to
-   * @param type     one of the available protocols from which Mudrod logs are 
obtained.
-   * @throws IOException if there is an error reading anything from the 
fileName provided.
-   */
-  public void readLogFile(String fileName, String protocol, String index, 
String type) throws IOException {
-    BufferedReader br = new BufferedReader(new FileReader(fileName));
-    int count = 0;
-    try {
-      String line = br.readLine();
-      while (line != null) {
-        if ("FTP".equals(protocol)) {
-          parseSingleLineFTP(line, index, type);
-        } else {
-          parseSingleLineHTTP(line, index, type);
-        }
-        line = br.readLine();
-        count++;
-      }
-    } catch (FileNotFoundException e) {
-      LOG.error("File not found.", e);
-    } catch (IOException e) {
-      LOG.error("Error reading input directory.", e);
-    } finally {
-      br.close();
-      LOG.info("Num of {} entries:\t{}", protocol, count);
-    }
-  }
-
-  /**
-   * Parse a single FTP log entry
-   *
-   * @param log   a single log line
-   * @param index the index name we wish to persist the log line to
-   * @param type  one of the available protocols from which Mudrod logs are 
obtained.
-   */
-  public void parseSingleLineFTP(String log, String index, String type) {
-    String ip = log.split(" +")[6];
-
-    String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + 
log.split(" +")[3] + ":" + log.split(" +")[4];
-
-    time = switchtoNum(time);
-    SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy");
-    Date date = null;
-    try {
-      date = formatter.parse(time);
-    } catch (ParseException e) {
-      LOG.error("Error whilst parsing the date.", e);
-    }
-    String bytes = log.split(" +")[7];
-
-    String request = log.split(" +")[8].toLowerCase();
-
-    if (!request.contains("/misc/") && !request.contains("readme")) {
-      IndexRequest ir;
-      try {
-        ir = new IndexRequest(index, type)
-            .source(jsonBuilder().startObject().field("LogType", 
"ftp").field("IP", ip).field("Time", date).field("Request", 
request).field("Bytes", Long.parseLong(bytes)).endObject());
-        es.getBulkProcessor().add(ir);
-      } catch (NumberFormatException e) {
-        LOG.error("Error whilst processing numbers", e);
-      } catch (IOException e) {
-        LOG.error("IOError whilst adding to the bulk processor.", e);
-      }
-    }
-
-  }
-
-  /**
-   * Parse a single HTTP log entry
-   *
-   * @param log   a single log line
-   * @param index the index name we wish to persist the log line to
-   * @param type  one of the available protocols from which Mudrod logs are 
obtained.
-   */
-  public void parseSingleLineHTTP(String log, String index, String type) {
-    matcher = p.matcher(log);
-    if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
-      return;
-    }
-    String time = matcher.group(4);
-    time = switchtoNum(time);
-    SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
-    Date date = null;
-    try {
-      date = formatter.parse(time);
-    } catch (ParseException e) {
-      LOG.error("Error whilst attempting to parse date.", e);
-    }
-
-    String bytes = matcher.group(7);
-    if ("-".equals(bytes)) {
-      bytes = "0";
-    }
-
-    String request = matcher.group(5).toLowerCase();
-    String agent = matcher.group(9);
-    CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, 
this.spark);
-    if (!crawlerDe.checkKnownCrawler(agent)) {
-      boolean tag = false;
-      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", 
"image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / 
http/1.1", ".jpeg", "/ws/" };
-      for (int i = 0; i < mimeTypes.length; i++) {
-        if (request.contains(mimeTypes[i])) {
-          tag = true;
-          break;
-        }
-      }
-
-      if (!tag) {
-        IndexRequest ir = null;
-        executeBulkRequest(ir, index, type, matcher, date, bytes);
-      }
-    }
-  }
-
-  private void executeBulkRequest(IndexRequest ir, String index, String type, 
Matcher matcher, Date date, String bytes) {
-    IndexRequest newIr = ir;
-    try {
-      newIr = new IndexRequest(index, type).source(
-          jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", 
matcher.group(1)).field("Time", date).field("Request", 
matcher.group(5)).field("Response", matcher.group(6))
-              .field("Bytes", Integer.parseInt(bytes)).field("Referer", 
matcher.group(8)).field("Browser", matcher.group(9)).endObject());
-
-      es.getBulkProcessor().add(newIr);
-    } catch (NumberFormatException e) {
-      LOG.error("Error whilst processing numbers", e);
-    } catch (IOException e) {
-      LOG.error("IOError whilst adding to the bulk processor.", e);
-    }
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java
deleted file mode 100644
index 5b8ed9b..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/LogAbstract.java
+++ /dev/null
@@ -1,228 +0,0 @@
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.weblog.partition.KGreedyPartitionSolver;
-import gov.nasa.jpl.mudrod.weblog.partition.ThePartitionProblemSolver;
-import gov.nasa.jpl.mudrod.weblog.partition.logPartitioner;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.Partition;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import 
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
-import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
-import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-public class LogAbstract extends DiscoveryStepAbstract {
-
-  /**
-   * 
-   */
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(LogAbstract.class);
-
-  public String logIndex = null;
-  public String httpType = null;
-  public String ftpType = null;
-  public String cleanupType = null;
-  public String sessionStats = null;
-  public int partition = 96;
-
-  public LogAbstract(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-    if (props != null) {
-      initLogIndex();
-    }
-  }
-
-  protected void initLogIndex() {
-    logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + 
props.getProperty(MudrodConstants.TIME_SUFFIX);
-    httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX);
-    ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX);
-    cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
-    sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
-
-    InputStream settingsStream = 
getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
-    InputStream mappingsStream = 
getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
-    JSONObject settingsJSON = null;
-    JSONObject mappingJSON = null;
-
-    try {
-      settingsJSON = new JSONObject(IOUtils.toString(settingsStream));
-    } catch (JSONException | IOException e1) {
-      LOG.error("Error reading Elasticsearch settings!", e1);
-    }
-
-    try {
-      mappingJSON = new JSONObject(IOUtils.toString(mappingsStream));
-    } catch (JSONException | IOException e1) {
-      LOG.error("Error reading Elasticsearch mappings!", e1);
-    }
-
-    try {
-      if (settingsJSON != null && mappingJSON != null) {
-        this.es.putMapping(logIndex, settingsJSON.toString(), 
mappingJSON.toString());
-      }
-    } catch (IOException e) {
-      LOG.error("Error entering Elasticsearch Mappings!", e);
-    }
-  }
-
-  @Override
-  public Object execute() {
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  public JavaRDD<String> getUserRDD(String... type) {
-    Map<String, Double> userDocs = getUserDocs(type);
-    return parallizeUsers(userDocs);
-  }
-
-  public List<String> getUsers(String type) {
-
-    Terms users = this.getUserTerms(type);
-    List<String> userList = new ArrayList<>();
-    for (Terms.Bucket entry : users.getBuckets()) {
-      String ip = (String) entry.getKey();
-      userList.add(ip);
-    }
-
-    return userList;
-  }
-
-  public Terms getUserTerms(String... type) {
-
-    int docCount = es.getDocCount(logIndex, type);
-
-    SearchResponse sr = 
es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-        
.addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
-    return sr.getAggregations().get("Users");
-  }
-
-  public Map<String, Double> getUserDocs(String... type) {
-
-    Terms users = this.getUserTerms(type);
-    Map<String, Double> userList = new HashMap<>();
-    for (Terms.Bucket entry : users.getBuckets()) {
-      String ip = (String) entry.getKey();
-      Long count = entry.getDocCount();
-      userList.put(ip, Double.valueOf(count));
-    }
-
-    return userList;
-  }
-
-  public Map<String, Long> getUserDailyDocs() {
-
-    int docCount = es.getDocCount(logIndex, httpType);
-
-    AggregationBuilder dailyAgg = 
AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);
-
-    SearchResponse sr = 
es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
-        
.addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
-    Terms users = sr.getAggregations().get("Users");
-    Map<String, Long> userList = new HashMap<>();
-    for (Terms.Bucket user : users.getBuckets()) {
-      String ip = (String) user.getKey();
-
-      System.out.println(ip);
-
-      Histogram agg = user.getAggregations().get("by_day");
-      List<? extends Histogram.Bucket> dateList = agg.getBuckets();
-      int size = dateList.size();
-      for (int i = 0; i < size; i++) {
-        Long count = dateList.get(i).getDocCount();
-        String date = dateList.get(i).getKey().toString();
-
-        System.out.println(date);
-        System.out.println(count);
-      }
-    }
-
-    return userList;
-  }
-
-  protected void checkUserPartition(JavaRDD<String> userRDD) {
-    System.out.println("hhhhh");
-    List<Partition> partitios = userRDD.partitions();
-    System.out.println(partitios.size());
-    int[] partitionIds = new int[partitios.size()];
-    for (int i = 0; i < partitios.size(); i++) {
-      int index = partitios.get(i).index();
-      partitionIds[i] = index;
-    }
-
-    List<String>[] userIPs = userRDD.collectPartitions(partitionIds);
-    for (int i = 0; i < userIPs.length; i++) {
-      List<String> iuser = userIPs[i];
-      System.out.println(i + " partition");
-      System.out.println(iuser.toString());
-    }
-  }
-
-  public JavaRDD<String> parallizeUsers(Map<String, Double> userDocs) {
-
-    // prepare list for parallize
-    List<Tuple2<String, Double>> list = new ArrayList<>();
-    for (String user : userDocs.keySet()) {
-      list.add(new Tuple2<String, Double>(user, userDocs.get(user)));
-    }
-
-    // group users
-    ThePartitionProblemSolver solution = new KGreedyPartitionSolver();
-    Map<String, Integer> userGroups = solution.solve(userDocs, this.partition);
-
-    JavaPairRDD<String, Double> pairRdd = spark.sc.parallelizePairs(list);
-    JavaPairRDD<String, Double> userPairRDD = pairRdd.partitionBy(new 
logPartitioner(userGroups, this.partition));
-
-    // repartitioned user RDD
-    return userPairRDD.keys();
-  }
-
-  public Terms getSessionTerms() {
-
-    int docCount = es.getDocCount(this.logIndex, this.cleanupType);
-
-    SearchResponse sr = 
es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
-        
.addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();
-
-    Terms Sessions = sr.getAggregations().get("Sessions");
-    return Sessions;
-  }
-
-  public List<String> getSessions() {
-
-    Terms sessions = this.getSessionTerms();
-    List<String> sessionList = new ArrayList<>();
-    for (Terms.Bucket entry : sessions.getBuckets()) {
-      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
-        String session = (String) entry.getKey();
-        sessionList.add(session);
-      }
-    }
-
-    return sessionList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java
 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java
deleted file mode 100644
index 12d7386..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RankingTrainDataGenerator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.weblog.structure.RankingTrainData;
-import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor;
-import org.apache.spark.api.java.JavaRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class RankingTrainDataGenerator extends DiscoveryStepAbstract {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(RankingTrainDataGenerator.class);
-
-  public RankingTrainDataGenerator(Properties props, ESDriver es, SparkDriver 
spark) {
-    super(props, es, spark);
-    // TODO Auto-generated constructor stub
-  }
-
-  @Override
-  public Object execute() {
-    // TODO Auto-generated method stub
-    LOG.info("Starting generate ranking train data.");
-    startTime = System.currentTimeMillis();
-
-    String rankingTrainFile = 
"E:\\Mudrod_input_data\\Testing_Data_4_1monthLog+Meta+Onto\\traing.txt";
-    try {
-      SessionExtractor extractor = new SessionExtractor();
-      JavaRDD<RankingTrainData> rankingTrainDataRDD = 
extractor.extractRankingTrainData(this.props, this.es, this.spark);
-
-      JavaRDD<String> rankingTrainData_JsonRDD = rankingTrainDataRDD.map(f -> 
f.toJson());
-
-      rankingTrainData_JsonRDD.coalesce(1, 
true).saveAsTextFile(rankingTrainFile);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-    endTime = System.currentTimeMillis();
-    LOG.info("Ranking train data generation complete. Time elapsed {} 
seconds.", (endTime - startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java
deleted file mode 100644
index 01b57c0..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/RemoveRawLog.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * Supports ability to remove raw logs after processing is finished
- */
-public class RemoveRawLog extends LogAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(RemoveRawLog.class);
-
-  public RemoveRawLog(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting raw log removal.");
-    startTime = System.currentTimeMillis();
-    es.deleteAllByQuery(logIndex, httpType, QueryBuilders.matchAllQuery());
-    es.deleteAllByQuery(logIndex, ftpType, QueryBuilders.matchAllQuery());
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("Raw log removal complete. Time elapsed {} seconds.", (endTime - 
startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java
deleted file mode 100644
index b95e30b..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionGenerator.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.weblog.structure.Session;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.metrics.stats.Stats;
-import 
org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
-import org.elasticsearch.search.sort.SortOrder;
-import org.joda.time.DateTime;
-import org.joda.time.Seconds;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * Supports ability to generate user session by time threshold and referrer
- */
-public class SessionGenerator extends LogAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(SessionGenerator.class);
-
-  public SessionGenerator(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting Session Generation.");
-    startTime = System.currentTimeMillis();
-    generateSession();
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("Session generating complete. Time elapsed {} seconds.", (endTime 
- startTime) / 1000);
-    return null;
-  }
-
-  public void generateSession() {
-    try {
-      es.createBulkProcessor();
-      genSessionByReferer(Integer.parseInt(props.getProperty("timegap")));
-      es.destroyBulkProcessor();
-
-      es.createBulkProcessor();
-      combineShortSessions(Integer.parseInt(props.getProperty("timegap")));
-      es.destroyBulkProcessor();
-    } catch (ElasticsearchException e) {
-      LOG.error("Error whilst executing bulk processor.", e);
-    } catch (IOException e) {
-      LOG.error("Error whilst reading configuration.", e);
-    } catch (NumberFormatException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  public void genSessionByReferer(int timeThres) throws InterruptedException, 
IOException {
-    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
-    if (processingType.equals("sequential")) {
-      genSessionByRefererInSequential(timeThres);
-    } else if (processingType.equals("parallel")) {
-      genSessionByRefererInParallel(timeThres);
-    }
-  }
-
-  public void combineShortSessions(int timeThres) throws InterruptedException, 
IOException {
-    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
-    if (processingType.equals("sequential")) {
-      combineShortSessionsInSequential(timeThres);
-    } else if (processingType.equals("parallel")) {
-      combineShortSessionsInParallel(timeThres);
-    }
-  }
-
-  /**
-   * Method to generate session by time threshold and referrer
-   *
-   * @param timeThres value of time threshold (s)
-   * @throws ElasticsearchException ElasticsearchException
-   * @throws IOException            IOException
-   */
-  public void genSessionByRefererInSequential(int timeThres) throws 
ElasticsearchException, IOException {
-
-    Terms users = this.getUserTerms(this.cleanupType);
-
-    int sessionCount = 0;
-    for (Terms.Bucket entry : users.getBuckets()) {
-
-      String user = (String) entry.getKey();
-      Integer sessionNum = genSessionByReferer(es, user, timeThres);
-      sessionCount += sessionNum;
-    }
-
-    LOG.info("Initial session count: {}", Integer.toString(sessionCount));
-  }
-
-  public void combineShortSessionsInSequential(int timeThres) throws 
ElasticsearchException, IOException {
-
-    Terms users = this.getUserTerms(this.cleanupType);
-    for (Terms.Bucket entry : users.getBuckets()) {
-      String user = entry.getKey().toString();
-      combineShortSessions(es, user, timeThres);
-    }
-  }
-
-  /**
-   * Method to remove invalid logs through IP address
-   *
-   * @param es an instantiated es driver
-   * @param ip invalid IP address
-   * @throws ElasticsearchException ElasticsearchException
-   * @throws IOException            IOException
-   */
-  public void deleteInvalid(ESDriver es, String ip) throws IOException {
-
-    BoolQueryBuilder filterAll = new BoolQueryBuilder();
-    filterAll.must(QueryBuilders.termQuery("IP", ip));
-
-    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet();
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid");
-      }
-
-      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Method to update a Elasticsearch record/document by id, field, and value
-   *
-   * @param es
-   * @param index  index name is Elasticsearch
-   * @param type   type name
-   * @param id     ID of the document that needs to be updated
-   * @param field1 field of the document that needs to be updated
-   * @param value1 value of the document that needs to be changed to
-   * @throws ElasticsearchException
-   * @throws IOException
-   */
-  private void update(ESDriver es, String index, String type, String id, 
String field1, Object value1) throws IOException {
-    UpdateRequest ur = new UpdateRequest(index, type, 
id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
-    es.getBulkProcessor().add(ur);
-  }
-
-  public void genSessionByRefererInParallel(int timeThres) throws 
InterruptedException, IOException {
-
-    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
-
-    int sessionCount = 0;
-    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, 
Integer>() {
-      /**
-       *
-       */
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
-        ESDriver tmpES = new ESDriver(props);
-        tmpES.createBulkProcessor();
-        List<Integer> sessionNums = new ArrayList<>();
-        while (arg0.hasNext()) {
-          String s = arg0.next();
-          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
-          sessionNums.add(sessionNum);
-        }
-        tmpES.destroyBulkProcessor();
-        tmpES.close();
-        return sessionNums.iterator();
-      }
-    }).reduce(new Function2<Integer, Integer, Integer>() {
-      /**
-       *
-       */
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
-
-    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
-  }
-
-  public int genSessionByReferer(ESDriver es, String user, int timeThres) 
throws ElasticsearchException, IOException {
-
-    String startTime = null;
-    int sessionCountIn = 0;
-
-    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
-    filterSearch.must(QueryBuilders.termQuery("IP", user));
-
-    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterSearch).addSort("Time", 
SortOrder.ASC).setSize(100)
-        .execute().actionGet();
-
-    Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>();
-    String request = "";
-    String referer = "";
-    String logType = "";
-    String id = "";
-    String ip = user;
-    String indexUrl = "http://podaac.jpl.nasa.gov/";;
-    DateTime time = null;
-    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
-
-    while (scrollResp.getHits().getHits().length != 0) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        Map<String, Object> result = hit.getSource();
-        request = (String) result.get("RequestUrl");
-        referer = (String) result.get("Referer");
-        logType = (String) result.get("LogType");
-        time = fmt.parseDateTime((String) result.get("Time"));
-        id = hit.getId();
-
-        if ("PO.DAAC".equals(logType)) {
-          if ("-".equals(referer) || referer.equals(indexUrl) || 
!referer.contains(indexUrl)) {
-            sessionCountIn++;
-            sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
-            sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
-
-            update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + 
sessionCountIn);
-
-          } else {
-            int count = sessionCountIn;
-            int rollbackNum = 0;
-            while (true) {
-              Map<String, DateTime> requests = sessionReqs.get(ip + "@" + 
count);
-              if (requests == null) {
-                sessionReqs.put(ip + "@" + count, new HashMap<String, 
DateTime>());
-                sessionReqs.get(ip + "@" + count).put(request, time);
-                update(es, logIndex, this.cleanupType, id, "SessionID", ip + 
"@" + count);
-
-                break;
-              }
-              ArrayList<String> keys = new ArrayList<>(requests.keySet());
-              boolean bFindRefer = false;
-
-              for (int i = keys.size() - 1; i >= 0; i--) {
-                rollbackNum++;
-                if (keys.get(i).equalsIgnoreCase(referer)) {
-                  bFindRefer = true;
-                  // threshold,if time interval > 10*
-                  // click num, start a new session
-                  if 
(Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) 
< timeThres * rollbackNum) {
-                    sessionReqs.get(ip + "@" + count).put(request, time);
-                    update(es, logIndex, this.cleanupType, id, "SessionID", ip 
+ "@" + count);
-                  } else {
-                    sessionCountIn++;
-                    sessionReqs.put(ip + "@" + sessionCountIn, new 
HashMap<String, DateTime>());
-                    sessionReqs.get(ip + "@" + sessionCountIn).put(request, 
time);
-                    update(es, logIndex, this.cleanupType, id, "SessionID", ip 
+ "@" + sessionCountIn);
-                  }
-
-                  break;
-                }
-              }
-
-              if (bFindRefer) {
-                break;
-              }
-
-              count--;
-              if (count < 0) {
-                sessionCountIn++;
-
-                sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
-                sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
-                update(es, props.getProperty(MudrodConstants.ES_INDEX_NAME), 
this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn);
-
-                break;
-              }
-            }
-          }
-        } else if ("ftp".equals(logType)) {
-
-          // may affect computation efficiency
-          Map<String, DateTime> requests = sessionReqs.get(ip + "@" + 
sessionCountIn);
-          if (requests == null) {
-            sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
-          } else {
-            ArrayList<String> keys = new ArrayList<>(requests.keySet());
-            int size = keys.size();
-            if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(size - 
1)), time).getSeconds()) > timeThres) {
-              sessionCountIn += 1;
-              sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
-            }
-          }
-          sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
-          update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + 
sessionCountIn);
-        }
-      }
-
-      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
-    }
-
-    return sessionCountIn;
-  }
-
-  public void combineShortSessionsInParallel(int timeThres) throws 
InterruptedException, IOException {
-
-    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
-
-    userRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
-      /**
-       *
-       */
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public void call(Iterator<String> arg0) throws Exception {
-        ESDriver tmpES = new ESDriver(props);
-        tmpES.createBulkProcessor();
-        while (arg0.hasNext()) {
-          String s = arg0.next();
-          combineShortSessions(tmpES, s, timeThres);
-        }
-        tmpES.destroyBulkProcessor();
-        tmpES.close();
-      }
-    });
-  }
-
-  public void combineShortSessions(ESDriver es, String user, int timeThres) 
throws ElasticsearchException, IOException {
-
-    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
-    filterSearch.must(QueryBuilders.termQuery("IP", user));
-
-    String[] indexArr = new String[] { logIndex };
-    String[] typeArr = new String[] { cleanupType };
-    int docCount = es.getDocCount(indexArr, typeArr, filterSearch);
-
-    if (docCount < 3) {
-      deleteInvalid(es, user);
-      return;
-    }
-
-    BoolQueryBuilder filterCheck = new BoolQueryBuilder();
-    filterCheck.must(QueryBuilders.termQuery("IP", 
user)).must(QueryBuilders.termQuery("Referer", "-"));
-    SearchResponse checkReferer = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet();
-
-    long numInvalid = checkReferer.getHits().getTotalHits();
-    double invalidRate = numInvalid / docCount;
-
-    if (invalidRate >= 0.8) {
-      deleteInvalid(es, user);
-      return;
-    }
-
-    StatsAggregationBuilder statsAgg = 
AggregationBuilders.stats("Stats").field("Time");
-    SearchResponse srSession = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterSearch)
-        
.addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet();
-
-    Terms sessions = srSession.getAggregations().get("Sessions");
-
-    List<Session> sessionList = new ArrayList<>();
-    for (Terms.Bucket session : sessions.getBuckets()) {
-      Stats agg = session.getAggregations().get("Stats");
-      Session sess = new Session(props, es, agg.getMinAsString(), 
agg.getMaxAsString(), session.getKey().toString());
-      sessionList.add(sess);
-    }
-
-    Collections.sort(sessionList);
-    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
-    String last = null;
-    String lastnewID = null;
-    String lastoldID = null;
-    String current = null;
-    for (Session s : sessionList) {
-      current = s.getEndTime();
-      if (last != null) {
-        if (Seconds.secondsBetween(fmt.parseDateTime(last), 
fmt.parseDateTime(current)).getSeconds() < timeThres) {
-          if (lastnewID == null) {
-            s.setNewID(lastoldID);
-          } else {
-            s.setNewID(lastnewID);
-          }
-
-          QueryBuilder fs = 
QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", 
s.getID()));
-
-          SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet();
-          while (true) {
-            for (SearchHit hit : scrollResp.getHits().getHits()) {
-              if (lastnewID == null) {
-                update(es, logIndex, this.cleanupType, hit.getId(), 
"SessionID", lastoldID);
-              } else {
-                update(es, logIndex, this.cleanupType, hit.getId(), 
"SessionID", lastnewID);
-              }
-            }
-
-            scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
-            if (scrollResp.getHits().getHits().length == 0) {
-              break;
-            }
-          }
-        }
-        ;
-      }
-      lastoldID = s.getID();
-      lastnewID = s.getNewID();
-      last = current;
-    }
-
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java
deleted file mode 100644
index 7e7640f..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/SessionStatistic.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
-
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.weblog.structure.RequestUrl;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.metrics.stats.Stats;
-import 
org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
-import org.joda.time.DateTime;
-import org.joda.time.Seconds;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * Supports ability to post-process session, including summarizing statistics
- * and filtering
- */
-public class SessionStatistic extends LogAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(SessionStatistic.class);
-
-  public SessionStatistic(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting Session Summarization.");
-    startTime = System.currentTimeMillis();
-    try {
-      processSession();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      e.printStackTrace();
-    }
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("Session Summarization complete. Time elapsed {} seconds.", 
(endTime - startTime) / 1000);
-    return null;
-  }
-
-  public void processSession() throws InterruptedException, IOException, 
ExecutionException {
-    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
-    if (processingType.equals("sequential")) {
-      processSessionInSequential();
-    } else if (processingType.equals("parallel")) {
-      processSessionInParallel();
-    }
-  }
-
-  public void processSessionInSequential() throws IOException, 
InterruptedException, ExecutionException {
-    es.createBulkProcessor();
-    Terms Sessions = this.getSessionTerms();
-    int session_count = 0;
-    for (Terms.Bucket entry : Sessions.getBuckets()) {
-      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
-        String sessionid = entry.getKey().toString();
-        int sessionNum = processSession(es, sessionid);
-        session_count += sessionNum;
-      }
-    }
-    LOG.info("Final Session count: {}", Integer.toString(session_count));
-    es.destroyBulkProcessor();
-  }
-
-  /**
-   * Extract the dataset ID from a long request
-   *
-   * @param request raw log request
-   * @return dataset ID
-   */
-  public String findDataset(String request) {
-    String pattern1 = "/dataset/";
-    String pattern2;
-    if (request.contains("?")) {
-      pattern2 = "?";
-    } else {
-      pattern2 = " ";
-    }
-
-    Pattern p = Pattern.compile(Pattern.quote(pattern1) + "(.*?)" + 
Pattern.quote(pattern2));
-    Matcher m = p.matcher(request);
-    if (m.find()) {
-      return m.group(1);
-    }
-    return null;
-  }
-
-  public void processSessionInParallel() throws InterruptedException, 
IOException {
-
-    List<String> sessions = this.getSessions();
-    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);
-
-    int sessionCount = 0;
-    sessionCount = sessionRDD.mapPartitions(new 
FlatMapFunction<Iterator<String>, Integer>() {
-      @Override
-      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
-        ESDriver tmpES = new ESDriver(props);
-        tmpES.createBulkProcessor();
-        List<Integer> sessionNums = new ArrayList<Integer>();
-        sessionNums.add(0);
-        while (arg0.hasNext()) {
-          String s = arg0.next();
-          Integer sessionNum = processSession(tmpES, s);
-          sessionNums.add(sessionNum);
-        }
-        tmpES.destroyBulkProcessor();
-        tmpES.close();
-        return sessionNums.iterator();
-      }
-    }).reduce(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
-
-    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
-  }
-
-  public int processSession(ESDriver es, String sessionId) throws IOException, 
InterruptedException, ExecutionException {
-
-    String inputType = cleanupType;
-    String outputType = sessionStats;
-
-    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
-    String min = null;
-    String max = null;
-    DateTime start = null;
-    DateTime end = null;
-    int duration = 0;
-    float request_rate = 0;
-
-    int session_count = 0;
-    Pattern pattern = Pattern.compile("get (.*?) http/*");
-
-    StatsAggregationBuilder statsAgg = 
AggregationBuilders.stats("Stats").field("Time");
-
-    BoolQueryBuilder filter_search = new BoolQueryBuilder();
-    filter_search.must(QueryBuilders.termQuery("SessionID", sessionId));
-
-    SearchResponse sr = 
es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filter_search).addAggregation(statsAgg).execute().actionGet();
-
-    Stats agg = sr.getAggregations().get("Stats");
-    min = agg.getMinAsString();
-    max = agg.getMaxAsString();
-    start = fmt.parseDateTime(min);
-    end = fmt.parseDateTime(max);
-
-    duration = Seconds.secondsBetween(start, end).getSeconds();
-
-    int searchDataListRequest_count = 0;
-    int searchDataRequest_count = 0;
-    int searchDataListRequest_byKeywords_count = 0;
-    int ftpRequest_count = 0;
-    int keywords_num = 0;
-
-    String IP = null;
-    String keywords = "";
-    String views = "";
-    String downloads = "";
-
-    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(inputType).setScroll(new 
TimeValue(60000)).setQuery(filter_search).setSize(100).execute().actionGet();
-
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        Map<String, Object> result = hit.getSource();
-
-        String request = (String) result.get("Request");
-        String logType = (String) result.get("LogType");
-        IP = (String) result.get("IP");
-        Matcher matcher = pattern.matcher(request.trim().toLowerCase());
-        while (matcher.find()) {
-          request = matcher.group(1);
-        }
-
-        String datasetlist = "/datasetlist?";
-        String dataset = "/dataset/";
-        if (request.contains(datasetlist)) {
-          searchDataListRequest_count++;
-
-          RequestUrl requestURL = new RequestUrl();
-          String infoStr = requestURL.getSearchInfo(request) + ",";
-          String info = es.customAnalyzing(props.getProperty("indexName"), 
infoStr);
-
-          if (!info.equals(",")) {
-            if (keywords.equals("")) {
-              keywords = keywords + info;
-            } else {
-              String[] items = info.split(",");
-              String[] keywordList = keywords.split(",");
-              for (int m = 0; m < items.length; m++) {
-                if (!Arrays.asList(keywordList).contains(items[m])) {
-                  keywords = keywords + items[m] + ",";
-                }
-              }
-            }
-          }
-
-        }
-        if (request.startsWith(dataset)) {
-          searchDataRequest_count++;
-          if (findDataset(request) != null) {
-            String view = findDataset(request);
-
-            if ("".equals(views)) {
-              views = view;
-            } else {
-              if (views.contains(view)) {
-
-              } else {
-                views = views + "," + view;
-              }
-            }
-          }
-        }
-        if ("ftp".equals(logType)) {
-          ftpRequest_count++;
-          String download = "";
-          String requestLowercase = request.toLowerCase();
-          if (requestLowercase.endsWith(".jpg") == false && 
requestLowercase.endsWith(".pdf") == false && requestLowercase.endsWith(".txt") 
== false && requestLowercase.endsWith(".gif") == false) {
-            download = request;
-          }
-
-          if ("".equals(downloads)) {
-            downloads = download;
-          } else {
-            if (downloads.contains(download)) {
-
-            } else {
-              downloads = downloads + "," + download;
-            }
-          }
-        }
-
-      }
-
-      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
-      // Break condition: No hits are returned
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-    }
-
-    if (!keywords.equals("")) {
-      keywords_num = keywords.split(",").length;
-    }
-
-    if (searchDataListRequest_count != 0 && searchDataListRequest_count <= 
Integer.parseInt(props.getProperty("searchf")) && searchDataRequest_count != 0 
&& searchDataRequest_count <= Integer
-        .parseInt(props.getProperty("viewf")) && ftpRequest_count <= 
Integer.parseInt(props.getProperty("downloadf"))) {
-      String sessionURL = props.getProperty("SessionPort") + 
props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + 
outputType + "&requestType=" + inputType;
-      session_count = 1;
-
-      IndexRequest ir = new IndexRequest(logIndex, outputType).source(
-          jsonBuilder().startObject().field("SessionID", 
sessionId).field("SessionURL", sessionURL).field("Duration", 
duration).field("Number of Keywords", keywords_num).field("Time", min)
-              .field("End_time", max).field("searchDataListRequest_count", 
searchDataListRequest_count).field("searchDataListRequest_byKeywords_count", 
searchDataListRequest_byKeywords_count)
-              .field("searchDataRequest_count", 
searchDataRequest_count).field("keywords", es.customAnalyzing(logIndex, 
keywords)).field("views", views).field("downloads", downloads)
-              .field("request_rate", request_rate).field("Comments", 
"").field("Validation", 0).field("Produceby", 0).field("Correlation", 
0).field("IP", IP).endObject());
-
-      es.getBulkProcessor().add(ir);
-    }
-
-    return session_count;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java
deleted file mode 100644
index 5a3a1c3..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes Preprocessing for all functionality required by the
- * {@link gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine}
- */
-package gov.nasa.jpl.mudrod.weblog.pre;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java
 
b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java
deleted file mode 100644
index 2b8f45f..0000000
--- 
a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/process/ClickStreamAnalyzer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.weblog.process;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer;
-import gov.nasa.jpl.mudrod.ssearch.ClickstreamImporter;
-import gov.nasa.jpl.mudrod.utils.LinkageTriple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Supports ability to calculate term similarity based on click stream
- */
-public class ClickStreamAnalyzer extends DiscoveryStepAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ClickStreamAnalyzer.class);
-
-  public ClickStreamAnalyzer(Properties props, ESDriver es, SparkDriver spark) 
{
-    super(props, es, spark);
-  }
-
-  /**
-   * Method of executing click stream analyzer
-   */
-  @Override
-  public Object execute() {
-    LOG.info("Starting ClickStreamAnalyzer...");
-    startTime = System.currentTimeMillis();
-    try {
-      String clickstream_matrixFile = props.getProperty("clickstreamMatrix");
-      File f = new File(clickstream_matrixFile);
-      if (f.exists()) {
-        SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
-        svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), 
Integer.parseInt(props.getProperty("clickstreamSVDDimension")), 
props.getProperty("clickstreamSVDMatrix_tmp"));
-        List<LinkageTriple> tripleList = 
svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp"));
-        svd.saveToES(tripleList, props.getProperty("indexName"), 
props.getProperty("clickStreamLinkageType"));
-      
-        // Store click stream in ES for the ranking use
-        ClickstreamImporter cs = new ClickstreamImporter(props, es, spark);
-        cs.importfromCSVtoES();
-      }
-    } catch (Exception e) {
-      LOG.error("Encountered an error during execution of 
ClickStreamAnalyzer.", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("ClickStreamAnalyzer complete. Time elapsed: {}s", (endTime - 
startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-}

Reply via email to