http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java
new file mode 100644
index 0000000..22aa09c
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RemoveRawLog.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Supports ability to remove raw logs after processing is finished
+ */
+public class RemoveRawLog extends LogAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoveRawLog.class);
+
+  public RemoveRawLog(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting raw log removal.");
+    startTime = System.currentTimeMillis();
+    es.deleteAllByQuery(logIndex, httpType, QueryBuilders.matchAllQuery());
+    es.deleteAllByQuery(logIndex, ftpType, QueryBuilders.matchAllQuery());
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("Raw log removal complete. Time elapsed {} seconds.", (endTime - 
startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
new file mode 100644
index 0000000..b1153bf
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.structure.Session;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.stats.Stats;
+import 
org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Supports ability to generate user session by time threshold and referrer
+ */
+public class SessionGenerator extends LogAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(SessionGenerator.class);
+
+  public SessionGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Session Generation.");
+    startTime = System.currentTimeMillis();
+    generateSession();
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("Session generating complete. Time elapsed {} seconds.", (endTime 
- startTime) / 1000);
+    return null;
+  }
+
+  public void generateSession() {
+    try {
+      es.createBulkProcessor();
+      genSessionByReferer(Integer.parseInt(props.getProperty("timegap")));
+      es.destroyBulkProcessor();
+
+      es.createBulkProcessor();
+      combineShortSessions(Integer.parseInt(props.getProperty("timegap")));
+      es.destroyBulkProcessor();
+    } catch (ElasticsearchException e) {
+      LOG.error("Error whilst executing bulk processor.", e);
+    } catch (IOException e) {
+      LOG.error("Error whilst reading configuration.", e);
+    } catch (NumberFormatException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void genSessionByReferer(int timeThres) throws InterruptedException, 
IOException {
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
+    if (processingType.equals("sequential")) {
+      genSessionByRefererInSequential(timeThres);
+    } else if (processingType.equals("parallel")) {
+      genSessionByRefererInParallel(timeThres);
+    }
+  }
+
+  public void combineShortSessions(int timeThres) throws InterruptedException, 
IOException {
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
+    if (processingType.equals("sequential")) {
+      combineShortSessionsInSequential(timeThres);
+    } else if (processingType.equals("parallel")) {
+      combineShortSessionsInParallel(timeThres);
+    }
+  }
+
+  /**
+   * Method to generate session by time threshold and referrer
+   *
+   * @param timeThres value of time threshold (s)
+   * @throws ElasticsearchException ElasticsearchException
+   * @throws IOException            IOException
+   */
+  public void genSessionByRefererInSequential(int timeThres) throws 
ElasticsearchException, IOException {
+
+    Terms users = this.getUserTerms(this.cleanupType);
+
+    int sessionCount = 0;
+    for (Terms.Bucket entry : users.getBuckets()) {
+
+      String user = (String) entry.getKey();
+      Integer sessionNum = genSessionByReferer(es, user, timeThres);
+      sessionCount += sessionNum;
+    }
+
+    LOG.info("Initial session count: {}", Integer.toString(sessionCount));
+  }
+
+  public void combineShortSessionsInSequential(int timeThres) throws 
ElasticsearchException, IOException {
+
+    Terms users = this.getUserTerms(this.cleanupType);
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String user = entry.getKey().toString();
+      combineShortSessions(es, user, timeThres);
+    }
+  }
+
+  /**
+   * Method to remove invalid logs through IP address
+   *
+   * @param es an instantiated es driver
+   * @param ip invalid IP address
+   * @throws ElasticsearchException ElasticsearchException
+   * @throws IOException            IOException
+   */
+  public void deleteInvalid(ESDriver es, String ip) throws IOException {
+
+    BoolQueryBuilder filterAll = new BoolQueryBuilder();
+    filterAll.must(QueryBuilders.termQuery("IP", ip));
+
+    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterAll).setSize(100).execute().actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        update(es, logIndex, cleanupType, hit.getId(), "SessionID", "invalid");
+      }
+
+      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Method to update a Elasticsearch record/document by id, field, and value
+   *
+   * @param es
+   * @param index  index name is Elasticsearch
+   * @param type   type name
+   * @param id     ID of the document that needs to be updated
+   * @param field1 field of the document that needs to be updated
+   * @param value1 value of the document that needs to be changed to
+   * @throws ElasticsearchException
+   * @throws IOException
+   */
+  private void update(ESDriver es, String index, String type, String id, 
String field1, Object value1) throws IOException {
+    UpdateRequest ur = new UpdateRequest(index, type, 
id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
+    es.getBulkProcessor().add(ur);
+  }
+
+  public void genSessionByRefererInParallel(int timeThres) throws 
InterruptedException, IOException {
+
+    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
+
+    int sessionCount = 0;
+    sessionCount = userRDD.mapPartitions(new FlatMapFunction<Iterator<String>, 
Integer>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
+        ESDriver tmpES = new ESDriver(props);
+        tmpES.createBulkProcessor();
+        List<Integer> sessionNums = new ArrayList<>();
+        while (arg0.hasNext()) {
+          String s = arg0.next();
+          Integer sessionNum = genSessionByReferer(tmpES, s, timeThres);
+          sessionNums.add(sessionNum);
+        }
+        tmpES.destroyBulkProcessor();
+        tmpES.close();
+        return sessionNums.iterator();
+      }
+    }).reduce(new Function2<Integer, Integer, Integer>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Integer call(Integer a, Integer b) {
+        return a + b;
+      }
+    });
+
+    LOG.info("Initial Session count: {}", Integer.toString(sessionCount));
+  }
+
+  public int genSessionByReferer(ESDriver es, String user, int timeThres) 
throws ElasticsearchException, IOException {
+
+    String startTime = null;
+    int sessionCountIn = 0;
+
+    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
+    filterSearch.must(QueryBuilders.termQuery("IP", user));
+
+    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterSearch).addSort("Time", 
SortOrder.ASC).setSize(100)
+        .execute().actionGet();
+
+    Map<String, Map<String, DateTime>> sessionReqs = new HashMap<>();
+    String request = "";
+    String referer = "";
+    String logType = "";
+    String id = "";
+    String ip = user;
+    String indexUrl = "http://podaac.jpl.nasa.gov/";;
+    DateTime time = null;
+    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+
+    while (scrollResp.getHits().getHits().length != 0) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> result = hit.getSource();
+        request = (String) result.get("RequestUrl");
+        referer = (String) result.get("Referer");
+        logType = (String) result.get("LogType");
+        time = fmt.parseDateTime((String) result.get("Time"));
+        id = hit.getId();
+
+        if ("PO.DAAC".equals(logType)) {
+          if ("-".equals(referer) || referer.equals(indexUrl) || 
!referer.contains(indexUrl)) {
+            sessionCountIn++;
+            sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
+            sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
+
+            update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + 
sessionCountIn);
+
+          } else {
+            int count = sessionCountIn;
+            int rollbackNum = 0;
+            while (true) {
+              Map<String, DateTime> requests = sessionReqs.get(ip + "@" + 
count);
+              if (requests == null) {
+                sessionReqs.put(ip + "@" + count, new HashMap<String, 
DateTime>());
+                sessionReqs.get(ip + "@" + count).put(request, time);
+                update(es, logIndex, this.cleanupType, id, "SessionID", ip + 
"@" + count);
+
+                break;
+              }
+              ArrayList<String> keys = new ArrayList<>(requests.keySet());
+              boolean bFindRefer = false;
+
+              for (int i = keys.size() - 1; i >= 0; i--) {
+                rollbackNum++;
+                if (keys.get(i).equalsIgnoreCase(referer)) {
+                  bFindRefer = true;
+                  // threshold,if time interval > 10*
+                  // click num, start a new session
+                  if 
(Math.abs(Seconds.secondsBetween(requests.get(keys.get(i)), time).getSeconds()) 
< timeThres * rollbackNum) {
+                    sessionReqs.get(ip + "@" + count).put(request, time);
+                    update(es, logIndex, this.cleanupType, id, "SessionID", ip 
+ "@" + count);
+                  } else {
+                    sessionCountIn++;
+                    sessionReqs.put(ip + "@" + sessionCountIn, new 
HashMap<String, DateTime>());
+                    sessionReqs.get(ip + "@" + sessionCountIn).put(request, 
time);
+                    update(es, logIndex, this.cleanupType, id, "SessionID", ip 
+ "@" + sessionCountIn);
+                  }
+
+                  break;
+                }
+              }
+
+              if (bFindRefer) {
+                break;
+              }
+
+              count--;
+              if (count < 0) {
+                sessionCountIn++;
+
+                sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
+                sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
+                update(es, props.getProperty(MudrodConstants.ES_INDEX_NAME), 
this.cleanupType, id, "SessionID", ip + "@" + sessionCountIn);
+
+                break;
+              }
+            }
+          }
+        } else if ("ftp".equals(logType)) {
+
+          // may affect computation efficiency
+          Map<String, DateTime> requests = sessionReqs.get(ip + "@" + 
sessionCountIn);
+          if (requests == null) {
+            sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
+          } else {
+            ArrayList<String> keys = new ArrayList<>(requests.keySet());
+            int size = keys.size();
+            if (Math.abs(Seconds.secondsBetween(requests.get(keys.get(size - 
1)), time).getSeconds()) > timeThres) {
+              sessionCountIn += 1;
+              sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, 
DateTime>());
+            }
+          }
+          sessionReqs.get(ip + "@" + sessionCountIn).put(request, time);
+          update(es, logIndex, this.cleanupType, id, "SessionID", ip + "@" + 
sessionCountIn);
+        }
+      }
+
+      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
+    }
+
+    return sessionCountIn;
+  }
+
+  public void combineShortSessionsInParallel(int timeThres) throws 
InterruptedException, IOException {
+
+    JavaRDD<String> userRDD = getUserRDD(this.cleanupType);
+
+    userRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public void call(Iterator<String> arg0) throws Exception {
+        ESDriver tmpES = new ESDriver(props);
+        tmpES.createBulkProcessor();
+        while (arg0.hasNext()) {
+          String s = arg0.next();
+          combineShortSessions(tmpES, s, timeThres);
+        }
+        tmpES.destroyBulkProcessor();
+        tmpES.close();
+      }
+    });
+  }
+
+  public void combineShortSessions(ESDriver es, String user, int timeThres) 
throws ElasticsearchException, IOException {
+
+    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
+    filterSearch.must(QueryBuilders.termQuery("IP", user));
+
+    String[] indexArr = new String[] { logIndex };
+    String[] typeArr = new String[] { cleanupType };
+    int docCount = es.getDocCount(indexArr, typeArr, filterSearch);
+
+    if (docCount < 3) {
+      deleteInvalid(es, user);
+      return;
+    }
+
+    BoolQueryBuilder filterCheck = new BoolQueryBuilder();
+    filterCheck.must(QueryBuilders.termQuery("IP", 
user)).must(QueryBuilders.termQuery("Referer", "-"));
+    SearchResponse checkReferer = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterCheck).setSize(0).execute().actionGet();
+
+    long numInvalid = checkReferer.getHits().getTotalHits();
+    double invalidRate = numInvalid / docCount;
+
+    if (invalidRate >= 0.8) {
+      deleteInvalid(es, user);
+      return;
+    }
+
+    StatsAggregationBuilder statsAgg = 
AggregationBuilders.stats("Stats").field("Time");
+    SearchResponse srSession = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(filterSearch)
+        
.addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount).subAggregation(statsAgg)).execute().actionGet();
+
+    Terms sessions = srSession.getAggregations().get("Sessions");
+
+    List<Session> sessionList = new ArrayList<>();
+    for (Terms.Bucket session : sessions.getBuckets()) {
+      Stats agg = session.getAggregations().get("Stats");
+      Session sess = new Session(props, es, agg.getMinAsString(), 
agg.getMaxAsString(), session.getKey().toString());
+      sessionList.add(sess);
+    }
+
+    Collections.sort(sessionList);
+    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+    String last = null;
+    String lastnewID = null;
+    String lastoldID = null;
+    String current = null;
+    for (Session s : sessionList) {
+      current = s.getEndTime();
+      if (last != null) {
+        if (Seconds.secondsBetween(fmt.parseDateTime(last), 
fmt.parseDateTime(current)).getSeconds() < timeThres) {
+          if (lastnewID == null) {
+            s.setNewID(lastoldID);
+          } else {
+            s.setNewID(lastnewID);
+          }
+
+          QueryBuilder fs = 
QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("SessionID", 
s.getID()));
+
+          SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(this.cleanupType).setScroll(new 
TimeValue(60000)).setQuery(fs).setSize(100).execute().actionGet();
+          while (true) {
+            for (SearchHit hit : scrollResp.getHits().getHits()) {
+              if (lastnewID == null) {
+                update(es, logIndex, this.cleanupType, hit.getId(), 
"SessionID", lastoldID);
+              } else {
+                update(es, logIndex, this.cleanupType, hit.getId(), 
"SessionID", lastnewID);
+              }
+            }
+
+            scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
+            if (scrollResp.getHits().getHits().length == 0) {
+              break;
+            }
+          }
+        }
+        ;
+      }
+      lastoldID = s.getID();
+      lastnewID = s.getNewID();
+      last = current;
+    }
+
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
new file mode 100644
index 0000000..6f5c5f7
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.structure.RequestUrl;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.stats.Stats;
+import 
org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Supports ability to post-process session, including summarizing statistics
+ * and filtering
+ */
+public class SessionStatistic extends LogAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(SessionStatistic.class);
+
+  public SessionStatistic(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Session Summarization.");
+    startTime = System.currentTimeMillis();
+    try {
+      processSession();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("Session Summarization complete. Time elapsed {} seconds.", 
(endTime - startTime) / 1000);
+    return null;
+  }
+
+  public void processSession() throws InterruptedException, IOException, 
ExecutionException {
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
+    if (processingType.equals("sequential")) {
+      processSessionInSequential();
+    } else if (processingType.equals("parallel")) {
+      processSessionInParallel();
+    }
+  }
+
+  public void processSessionInSequential() throws IOException, 
InterruptedException, ExecutionException {
+    es.createBulkProcessor();
+    Terms Sessions = this.getSessionTerms();
+    int session_count = 0;
+    for (Terms.Bucket entry : Sessions.getBuckets()) {
+      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
+        String sessionid = entry.getKey().toString();
+        int sessionNum = processSession(es, sessionid);
+        session_count += sessionNum;
+      }
+    }
+    LOG.info("Final Session count: {}", Integer.toString(session_count));
+    es.destroyBulkProcessor();
+  }
+
+  /**
+   * Extract the dataset ID from a long request
+   *
+   * @param request raw log request
+   * @return dataset ID
+   */
+  public String findDataset(String request) {
+    String pattern1 = "/dataset/";
+    String pattern2;
+    if (request.contains("?")) {
+      pattern2 = "?";
+    } else {
+      pattern2 = " ";
+    }
+
+    Pattern p = Pattern.compile(Pattern.quote(pattern1) + "(.*?)" + 
Pattern.quote(pattern2));
+    Matcher m = p.matcher(request);
+    if (m.find()) {
+      return m.group(1);
+    }
+    return null;
+  }
+
+  public void processSessionInParallel() throws InterruptedException, 
IOException {
+
+    List<String> sessions = this.getSessions();
+    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessions, partition);
+
+    int sessionCount = 0;
+    sessionCount = sessionRDD.mapPartitions(new 
FlatMapFunction<Iterator<String>, Integer>() {
+      @Override
+      public Iterator<Integer> call(Iterator<String> arg0) throws Exception {
+        ESDriver tmpES = new ESDriver(props);
+        tmpES.createBulkProcessor();
+        List<Integer> sessionNums = new ArrayList<Integer>();
+        sessionNums.add(0);
+        while (arg0.hasNext()) {
+          String s = arg0.next();
+          Integer sessionNum = processSession(tmpES, s);
+          sessionNums.add(sessionNum);
+        }
+        tmpES.destroyBulkProcessor();
+        tmpES.close();
+        return sessionNums.iterator();
+      }
+    }).reduce(new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer a, Integer b) {
+        return a + b;
+      }
+    });
+
+    LOG.info("Final Session count: {}", Integer.toString(sessionCount));
+  }
+
+  public int processSession(ESDriver es, String sessionId) throws IOException, 
InterruptedException, ExecutionException {
+
+    String inputType = cleanupType;
+    String outputType = sessionStats;
+
+    DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+    String min = null;
+    String max = null;
+    DateTime start = null;
+    DateTime end = null;
+    int duration = 0;
+    float request_rate = 0;
+
+    int session_count = 0;
+    Pattern pattern = Pattern.compile("get (.*?) http/*");
+
+    StatsAggregationBuilder statsAgg = 
AggregationBuilders.stats("Stats").field("Time");
+
+    BoolQueryBuilder filter_search = new BoolQueryBuilder();
+    filter_search.must(QueryBuilders.termQuery("SessionID", sessionId));
+
+    SearchResponse sr = 
es.getClient().prepareSearch(logIndex).setTypes(inputType).setQuery(filter_search).addAggregation(statsAgg).execute().actionGet();
+
+    Stats agg = sr.getAggregations().get("Stats");
+    min = agg.getMinAsString();
+    max = agg.getMaxAsString();
+    start = fmt.parseDateTime(min);
+    end = fmt.parseDateTime(max);
+
+    duration = Seconds.secondsBetween(start, end).getSeconds();
+
+    int searchDataListRequest_count = 0;
+    int searchDataRequest_count = 0;
+    int searchDataListRequest_byKeywords_count = 0;
+    int ftpRequest_count = 0;
+    int keywords_num = 0;
+
+    String IP = null;
+    String keywords = "";
+    String views = "";
+    String downloads = "";
+
+    SearchResponse scrollResp = 
es.getClient().prepareSearch(logIndex).setTypes(inputType).setScroll(new 
TimeValue(60000)).setQuery(filter_search).setSize(100).execute().actionGet();
+
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> result = hit.getSource();
+
+        String request = (String) result.get("Request");
+        String logType = (String) result.get("LogType");
+        IP = (String) result.get("IP");
+        Matcher matcher = pattern.matcher(request.trim().toLowerCase());
+        while (matcher.find()) {
+          request = matcher.group(1);
+        }
+
+        String datasetlist = "/datasetlist?";
+        String dataset = "/dataset/";
+        if (request.contains(datasetlist)) {
+          searchDataListRequest_count++;
+
+          RequestUrl requestURL = new RequestUrl();
+          String infoStr = requestURL.getSearchInfo(request) + ",";
+          String info = es.customAnalyzing(props.getProperty("indexName"), 
infoStr);
+
+          if (!info.equals(",")) {
+            if (keywords.equals("")) {
+              keywords = keywords + info;
+            } else {
+              String[] items = info.split(",");
+              String[] keywordList = keywords.split(",");
+              for (int m = 0; m < items.length; m++) {
+                if (!Arrays.asList(keywordList).contains(items[m])) {
+                  keywords = keywords + items[m] + ",";
+                }
+              }
+            }
+          }
+
+        }
+        if (request.startsWith(dataset)) {
+          searchDataRequest_count++;
+          if (findDataset(request) != null) {
+            String view = findDataset(request);
+
+            if ("".equals(views)) {
+              views = view;
+            } else {
+              if (views.contains(view)) {
+
+              } else {
+                views = views + "," + view;
+              }
+            }
+          }
+        }
+        if ("ftp".equals(logType)) {
+          ftpRequest_count++;
+          String download = "";
+          String requestLowercase = request.toLowerCase();
+          if (requestLowercase.endsWith(".jpg") == false && 
requestLowercase.endsWith(".pdf") == false && requestLowercase.endsWith(".txt") 
== false && requestLowercase.endsWith(".gif") == false) {
+            download = request;
+          }
+
+          if ("".equals(downloads)) {
+            downloads = download;
+          } else {
+            if (downloads.contains(download)) {
+
+            } else {
+              downloads = downloads + "," + download;
+            }
+          }
+        }
+
+      }
+
+      scrollResp = 
es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new 
TimeValue(600000)).execute().actionGet();
+      // Break condition: No hits are returned
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    if (!keywords.equals("")) {
+      keywords_num = keywords.split(",").length;
+    }
+
+    if (searchDataListRequest_count != 0 && searchDataListRequest_count <= 
Integer.parseInt(props.getProperty("searchf")) && searchDataRequest_count != 0 
&& searchDataRequest_count <= Integer
+        .parseInt(props.getProperty("viewf")) && ftpRequest_count <= 
Integer.parseInt(props.getProperty("downloadf"))) {
+      String sessionURL = props.getProperty("SessionPort") + 
props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + 
outputType + "&requestType=" + inputType;
+      session_count = 1;
+
+      IndexRequest ir = new IndexRequest(logIndex, outputType).source(
+          jsonBuilder().startObject().field("SessionID", 
sessionId).field("SessionURL", sessionURL).field("Duration", 
duration).field("Number of Keywords", keywords_num).field("Time", min)
+              .field("End_time", max).field("searchDataListRequest_count", 
searchDataListRequest_count).field("searchDataListRequest_byKeywords_count", 
searchDataListRequest_byKeywords_count)
+              .field("searchDataRequest_count", 
searchDataRequest_count).field("keywords", es.customAnalyzing(logIndex, 
keywords)).field("views", views).field("downloads", downloads)
+              .field("request_rate", request_rate).field("Comments", 
"").field("Validation", 0).field("Produceby", 0).field("Correlation", 
0).field("IP", IP).endObject());
+
+      es.getBulkProcessor().add(ir);
+    }
+
+    return session_count;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java
new file mode 100644
index 0000000..5bf7f27
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the pre-processing classes required by the
+ * {@link org.apache.sdap.mudrod.discoveryengine.WeblogDiscoveryEngine}
+ */
+package org.apache.sdap.mudrod.weblog.pre;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
new file mode 100644
index 0000000..68fad4d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.process;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
+import org.apache.sdap.mudrod.ssearch.ClickstreamImporter;
+import org.apache.sdap.mudrod.utils.LinkageTriple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports ability to calculate term similarity based on click stream
+ */
+public class ClickStreamAnalyzer extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClickStreamAnalyzer.class);
+
+  public ClickStreamAnalyzer(Properties props, ESDriver es, SparkDriver spark) 
{
+    super(props, es, spark);
+  }
+
+  /**
+   * Method of executing click stream analyzer
+   */
+  @Override
+  public Object execute() {
+    LOG.info("Starting ClickStreamAnalyzer...");
+    startTime = System.currentTimeMillis();
+    try {
+      String clickstream_matrixFile = props.getProperty("clickstreamMatrix");
+      File f = new File(clickstream_matrixFile);
+      if (f.exists()) {
+        SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
+        svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), 
Integer.parseInt(props.getProperty("clickstreamSVDDimension")), 
props.getProperty("clickstreamSVDMatrix_tmp"));
+        List<LinkageTriple> tripleList = 
svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp"));
+        svd.saveToES(tripleList, props.getProperty("indexName"), 
props.getProperty("clickStreamLinkageType"));
+      
+        // Store click stream in ES for the ranking use
+        ClickstreamImporter cs = new ClickstreamImporter(props, es, spark);
+        cs.importfromCSVtoES();
+      }
+    } catch (Exception e) {
+      LOG.error("Encountered an error during execution of 
ClickStreamAnalyzer.", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("ClickStreamAnalyzer complete. Time elapsed: {}s", (endTime - 
startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
new file mode 100644
index 0000000..d95475c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.process;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.semantics.SemanticAnalyzer;
+import org.apache.sdap.mudrod.utils.LinkageTriple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports ability to calculate term similarity based on user history
+ */
+public class UserHistoryAnalyzer extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(UserHistoryAnalyzer.class);
+
+  public UserHistoryAnalyzer(Properties props, ESDriver es, SparkDriver spark) 
{
+    super(props, es, spark);
+  }
+
+  /**
+   * Method of executing user history analyzer
+   */
+  @Override
+  public Object execute() {
+    LOG.info("Starting UserHistoryAnalyzer...");
+    startTime = System.currentTimeMillis();
+
+    SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark);
+    List<LinkageTriple> tripleList = 
sa.calTermSimfromMatrix(props.getProperty("userHistoryMatrix"));
+    sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), 
props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE));
+
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("UserHistoryAnalyzer complete. Time elapsed: {}s", (endTime - 
startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java
new file mode 100644
index 0000000..a6b55f4
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes web log processing classes.
+ */
+package org.apache.sdap.mudrod.weblog.process;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
new file mode 100644
index 0000000..0127e2d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.sdap.mudrod.weblog.pre.CrawlerDetection;
+
+/**
+ * This class represents an Apache access log line. See
+ * http://httpd.apache.org/docs/2.2/logs.html for more details.
+ */
+public class ApacheAccessLog extends WebLog implements Serializable {
+
+  // double Bytes;
+  String Response;
+  String Referer;
+  String Browser;
+
+  @Override
+  public double getBytes() {
+    return this.Bytes;
+  }
+
+  public String getBrowser() {
+    return this.Browser;
+  }
+
+  public String getResponse() {
+    return this.Response;
+  }
+
+  public String getReferer() {
+    return this.Referer;
+  }
+
+  public ApacheAccessLog() {
+
+  }
+
+  public static String parseFromLogLine(String log) throws IOException, 
ParseException {
+
+    String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) 
\\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" 
\"([^\"]+)\"";
+    final int NUM_FIELDS = 9;
+    Pattern p = Pattern.compile(logEntryPattern);
+    Matcher matcher;
+
+    String lineJson = "{}";
+    matcher = p.matcher(log);
+    if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
+      return lineJson;
+    }
+
+    String time = matcher.group(4);
+    time = SwithtoNum(time);
+    SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
+    Date date = formatter.parse(time);
+
+    String bytes = matcher.group(7);
+
+    if (bytes.equals("-")) {
+      bytes = "0";
+    }
+
+    String request = matcher.group(5).toLowerCase();
+    String agent = matcher.group(9);
+    CrawlerDetection crawlerDe = new CrawlerDetection();
+    if (crawlerDe.checkKnownCrawler(agent)) {
+      return lineJson;
+    } else {
+
+      boolean tag = false;
+      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", 
"image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / 
http/1.1", ".jpeg", "/ws/" };
+      for (int i = 0; i < mimeTypes.length; i++) {
+        if (request.contains(mimeTypes[i])) {
+          tag = true;
+          return lineJson;
+        }
+      }
+
+      if (tag == false) {
+        ApacheAccessLog accesslog = new ApacheAccessLog();
+        accesslog.LogType = "PO.DAAC";
+        accesslog.IP = matcher.group(1);
+        accesslog.Request = matcher.group(5);
+        accesslog.Response = matcher.group(6);
+        accesslog.Bytes = Double.parseDouble(bytes);
+        accesslog.Referer = matcher.group(8);
+        accesslog.Browser = matcher.group(9);
+        SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'");
+        accesslog.Time = df.format(date);
+
+        Gson gson = new Gson();
+        lineJson = gson.toJson(accesslog);
+
+        return lineJson;
+      }
+    }
+
+    lineJson = "{}";
+    return lineJson;
+  }
+
+  public static boolean checknull(WebLog s) {
+    if (s == null) {
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java
new file mode 100644
index 0000000..2f0c34d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ClickStream.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.Serializable;
+
+/**
+ * ClassName: ClickStream Function: user click stream data related operations.
+ */
+public class ClickStream implements Serializable {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // keywords: query words related to the click behaviour
+  private String keywords;
+  // viewDataset: the dataset name user viewed
+  private String viewDataset;
+  // downloadDataset: the dataset name user downloaded
+  private String downloadDataset;
+  // sessionID: session ID
+  private String sessionID;
+  // type: session type name
+  private String type;
+
+  /**
+   * Creates a new instance of ClickStream.
+   *
+   * @param keywords    the query user searched
+   * @param viewDataset the dataset name user viewed
+   * @param download:   if user download the data set after viewing it, this 
parameter is
+   *                    true, otherwise, it is false.
+   */
+  public ClickStream(String keywords, String viewDataset, boolean download) {
+    this.keywords = keywords;
+    this.viewDataset = viewDataset;
+    this.downloadDataset = "";
+    if (download) {
+      this.downloadDataset = viewDataset;
+    }
+  }
+
+  public ClickStream() {
+    //default constructor
+  }
+
+  public String getSessionID() {
+    return sessionID;
+  }
+
+  /**
+   * setKeyWords: Set the query user searched.
+   *
+   * @param query search words
+   */
+  public void setKeyWords(String query) {
+    this.keywords = query;
+  }
+
+  /**
+   * setViewDataset:Set the data set name user viewed
+   *
+   * @param dataset short name of data set
+   */
+  public void setViewDataset(String dataset) {
+    this.viewDataset = dataset;
+  }
+
+  /**
+   * setDownloadDataset: Set the data set name user downloaded
+   *
+   * @param dataset short name of data set
+   */
+  public void setDownloadDataset(String dataset) {
+    this.downloadDataset = dataset;
+  }
+
+  /**
+   * getKeyWords: Get the query user searched
+   *
+   * @return data set name
+   */
+  public String getKeyWords() {
+    return this.keywords;
+  }
+
+  /**
+   * getViewDataset: Get the data set user viewed
+   *
+   * @return data set name
+   */
+  public String getViewDataset() {
+    return this.viewDataset;
+  }
+
+  /**
+   * isDownload: Show whether the data is downloaded in the session.
+   *
+   * @return True or False
+   */
+  public Boolean isDownload() {
+    if ("".equals(this.downloadDataset)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * setSessionId: Set ID of session
+   *
+   * @param sessionID session id
+   */
+  public void setSessionId(String sessionID) {
+    this.sessionID = sessionID;
+  }
+
+  /**
+   * setType: Set session type name
+   *
+   * @param type session type name in elasticsearch
+   */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /**
+   * Output click stream info in string format
+   *
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return "Query: " + keywords + " || View Dataset: " + viewDataset + " || 
Download Dataset: " + downloadDataset;
+  }
+
+  /**
+   * toJson: Output click stream info in Json format
+   *
+   * @return session in string format
+   */
+  public String toJson() {
+    String jsonQuery = "{";
+    jsonQuery += "\"query\":\"" + this.keywords + "\",";
+    jsonQuery += "\"viewdataset\":\"" + this.viewDataset + "\",";
+    jsonQuery += "\"downloaddataset\":\"" + this.downloadDataset + "\",";
+    jsonQuery += "\"sessionId\":\"" + this.sessionID + "\",";
+    jsonQuery += "\"type\":\"" + this.type + "\"";
+    jsonQuery += "},";
+    return jsonQuery;
+  }
+
+  /**
+   * parseFromTextLine: Convert string to click stream data
+   *
+   * @param logline http log line
+   * @return {@link ClickStream}
+   */
+  public static ClickStream parseFromTextLine(String logline) {
+    JSONObject jsonData = null;
+    ClickStream data = null;
+    try {
+      jsonData = new JSONObject(logline);
+      data = new ClickStream();
+      data.setKeyWords(jsonData.getString("query"));
+      data.setViewDataset(jsonData.getString("viewdataset"));
+      data.setDownloadDataset(jsonData.getString("downloaddataset"));
+
+    } catch (JSONException e) {
+      e.printStackTrace();
+    }
+
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java
new file mode 100644
index 0000000..5e6fd07
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Coordinates.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
/**
 * Simple value holder for a geographic coordinate pair.
 */
public class Coordinates {
  // Combined "latitude,longitude" string, e.g. "34.05,-118.25"
  // (populated by GeoIp.toLocation as lat + "," + lon).
  public String latlon;
}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
new file mode 100644
index 0000000..488fe52
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+
+import org.apache.sdap.mudrod.weblog.pre.ImportLogFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * This class represents an FTP access log line.
+ */
+public class FtpLog extends WebLog implements Serializable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ImportLogFile.class);
+
+  public static String parseFromLogLine(String log) {
+
+    try {
+      String ip = log.split(" +")[6];
+
+      String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + 
log.split(" +")[3] + ":" + log.split(" +")[4];
+
+      time = SwithtoNum(time);
+      SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy");
+      Date date = formatter.parse(time);
+
+      String bytes = log.split(" +")[7];
+
+      String request = log.split(" +")[8].toLowerCase();
+
+      if (!request.contains("/misc/") && !request.contains("readme")) {
+        FtpLog ftplog = new FtpLog();
+        ftplog.LogType = "ftp";
+        ftplog.IP = ip;
+        ftplog.Request = request;
+        ftplog.Bytes = Double.parseDouble(bytes);
+
+        SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'");
+        ftplog.Time = df.format(date);
+
+        return new Gson().toJson(ftplog);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error parsing ftp log line [{}]. Skipping this line.", log, e);
+    }
+    return "{}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java
new file mode 100644
index 0000000..d3e94dc
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/GeoIp.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import org.apache.sdap.mudrod.utils.HttpRequest;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * ClassName: GeoIp Function: convert IP to geo location
+ */
+public class GeoIp {
+
+  /**
+   * toLocation: convert ip to location
+   *
+   * @param ip ip address
+   * @return coordinates
+   */
+  public Coordinates toLocation(String ip) {
+    String url = "http://getcitydetails.geobytes.com/GetCityDetails?fqcn="; + 
ip;
+    HttpRequest http = new HttpRequest();
+    String response = http.getRequest(url);
+    JsonParser parser = new JsonParser();
+    JsonElement jobSon = parser.parse(response);
+    JsonObject responseObject = jobSon.getAsJsonObject();
+
+    Coordinates co = new Coordinates();
+    String lon = 
responseObject.get("geobyteslongitude").toString().replace("\"", "");
+    String lat = 
responseObject.get("geobyteslatitude").toString().replace("\"", "");
+    co.latlon = lat + "," + lon;
+    return co;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java
 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java
new file mode 100644
index 0000000..cf4ec23
--- /dev/null
+++ 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RankingTrainData.java
@@ -0,0 +1,147 @@
+package org.apache.sdap.mudrod.weblog.structure;
+
+import java.io.Serializable;
+import java.util.Map;
+
/**
 * ClassName: RankingTrainData. Train data extracted from web logs for
 * training ranking weights.
 */
public class RankingTrainData implements Serializable {

  private static final long serialVersionUID = 1L;
  // sessionID: session ID
  private String sessionID;
  // index: Elasticsearch index name the session came from
  private String index;
  // query: query words related to the click
  private String query;
  // highRankDataset: the higher-ranked dataset of the pair
  private String highRankDataset;
  // lowRankDataset: the lower-ranked dataset of the pair
  private String lowRankDataset;
  // filter: optional extra attributes serialized into the JSON output
  private Map<String, String> filter;

  /**
   * Creates a new instance of RankingTrainData.
   *
   * @param query           the user query string
   * @param highRankDataset the dataset name for the highest ranked dataset
   * @param lowRankDataset  the dataset name for the lowest ranked dataset
   */
  public RankingTrainData(String query, String highRankDataset, String lowRankDataset) {
    this.query = query;
    this.highRankDataset = highRankDataset;
    this.lowRankDataset = lowRankDataset;
  }

  public RankingTrainData() {
    //default constructor
  }

  /**
   * @return the session ID this training pair was extracted from
   */
  public String getSessionID() {
    return sessionID;
  }

  /**
   * setQuery: Set the query user searched.
   *
   * @param query search words
   */
  public void setQuery(String query) {
    this.query = query;
  }

  /**
   * getQuery: Get the query user searched.
   *
   * @return search query
   */
  public String getQuery() {
    return this.query;
  }

  /**
   * setHighRankDataset: Set the higher-ranked data set name.
   *
   * @param dataset short name of data set
   */
  public void setHighRankDataset(String dataset) {
    this.highRankDataset = dataset;
  }

  /**
   * getHighRankDataset: Get the higher-ranked data set name.
   * (Added for symmetry with {@link #getLowRankDataset()}.)
   *
   * @return data set name
   */
  public String getHighRankDataset() {
    return this.highRankDataset;
  }

  /**
   * setLowRankDataset: Set the lower-ranked data set name.
   *
   * @param dataset short name of data set
   */
  public void setLowRankDataset(String dataset) {
    this.lowRankDataset = dataset;
  }

  /**
   * getLowRankDataset: Get the lower-ranked data set name.
   *
   * @return data set name
   */
  public String getLowRankDataset() {
    return this.lowRankDataset;
  }

  /**
   * setSessionId: Set ID of session.
   *
   * @param sessionID session id
   */
  public void setSessionId(String sessionID) {
    this.sessionID = sessionID;
  }

  /**
   * setIndex: Set index name.
   *
   * @param index session type name in elasticsearch
   */
  public void setIndex(String index) {
    this.index = index;
  }

  /**
   * setFilter: Set extra attributes included in the JSON output.
   *
   * @param filter attribute name to value map
   */
  public void setFilter(Map<String, String> filter) {
    this.filter = filter;
  }

  /**
   * Output ranking train data in string format.
   *
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    return "query:" + query + "|| highRankDataset:" + highRankDataset + "|| lowRankDataset:" + lowRankDataset;
  }

  /**
   * toJson: Output ranking train data in Json format.
   *
   * @return session in string format
   */
  public String toJson() {
    StringBuilder json = new StringBuilder("{");
    json.append("\"query\":\"").append(this.query).append("\",");
    json.append("\"highRankDataset\":\"").append(this.highRankDataset).append("\",");
    json.append("\"lowRankDataset\":\"").append(this.lowRankDataset).append("\",");

    if (this.filter != null) {
      // entrySet avoids the keySet+get double lookup of the original.
      for (Map.Entry<String, String> entry : filter.entrySet()) {
        json.append("\"").append(entry.getKey()).append("\":\"").append(entry.getValue()).append("\",");
      }
    }

    json.append("\"sessionId\":\"").append(this.sessionID).append("\",");
    json.append("\"index\":\"").append(this.index).append("\"");
    json.append("},");
    return json.toString();
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
new file mode 100644
index 0000000..f86438d
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClassName: RequestUrl Function: request url relate operations
+ */
+public class RequestUrl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RequestUrl.class);
+
+  /**
+   * Default Constructor
+   */
+  public RequestUrl() {
+    /* Default Constructor */
+  }
+
+  /**
+   * UrlPage: Get url page from url link
+   *
+   * @param strURL request url
+   * @return page name
+   */
+  public static String urlPage(String strURL) {
+    String strPage = null;
+    String[] arrSplit = null;
+
+    String newURL = strURL.trim().toLowerCase();
+
+    arrSplit = newURL.split("[?]");
+    if (newURL.length() > 0 && arrSplit.length > 1 && arrSplit[0] != null) {
+      strPage = arrSplit[0];
+    }
+
+    return strPage;
+  }
+
+  /**
+   * TruncateUrlPage: Get url params from url link
+   *
+   * @param strURL
+   * @return url params
+   */
+  private static String truncateUrlPage(String strURL) {
+    String strAllParam = null;
+    String[] arrSplit = null;
+
+    strURL = strURL.trim().toLowerCase(); // keep this in mind
+
+    arrSplit = strURL.split("[?]");
+    if (strURL.length() > 1) {
+      if (arrSplit.length > 1) {
+        if (arrSplit[1] != null) {
+          strAllParam = arrSplit[1];
+        }
+      }
+    }
+
+    return strAllParam;
+  }
+
+  /**
+   * URLRequest: Get url params from url link in a map format
+   *
+   * @param URL request url
+   * @return url params key value map
+   */
+  public static Map<String, String> uRLRequest(String URL) {
+    Map<String, String> mapRequest = new HashMap<String, String>();
+
+    String[] arrSplit = null;
+
+    String strUrlParam = truncateUrlPage(URL);
+    if (strUrlParam == null) {
+      return mapRequest;
+    }
+
+    arrSplit = strUrlParam.split("[&]");
+    for (String strSplit : arrSplit) {
+      String[] arrSplitEqual = null;
+      arrSplitEqual = strSplit.split("[=]");
+
+      if (arrSplitEqual.length > 1) {
+
+        mapRequest.put(arrSplitEqual[0], arrSplitEqual[1]);
+
+      } else {
+        if (arrSplitEqual[0] != "") {
+
+          mapRequest.put(arrSplitEqual[0], "");
+        }
+      }
+    }
+    return mapRequest;
+  }
+
+  /**
+   * GetSearchInfo: Get search information from url link
+   *
+   * @param URL request url
+   * @return search params
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  public String getSearchInfo(String URL) throws UnsupportedEncodingException {
+    List<String> info = new ArrayList<String>();
+    String keyword = "";
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(URL);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", 
"%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || 
keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " 
").replaceAll("\\s+", " ").trim();
+
+      } catch (UnsupportedEncodingException e) {
+        LOG.error(mapRequest.get("search"));
+        e.printStackTrace();
+      }
+      if (!"".equals(keyword)) {
+        info.add(keyword);
+      }
+
+    }
+
+    if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+      String id_raw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+      String value_raw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+      String[] ids = id_raw.split(":");
+      String[] values = value_raw.split(":");
+
+      int a = ids.length;
+      int b = values.length;
+      int l = a < b ? a : b;
+
+      for (int i = 0; i < l; i++) {
+        if (ids[i].equals("collections") || ids[i].equals("measurement") || 
ids[i].equals("sensor") || ids[i].equals("platform") || 
ids[i].equals("variable") || ids[i].equals("spatialcoverage")) {
+          try {
+            values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+            if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && 
!URLDecoder.decode(values[i], "UTF-8").equals("")) {
+              String item = URLDecoder.decode(values[i], "UTF-8").trim();
+              if (item.contains("%2b") || item.contains("%20") || 
item.contains("%25")) {
+                item = item.replace("%2b", " ");
+                item = item.replace("%20", " ");
+                item = item.replace("%25", " ");
+              }
+              item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " 
").replaceAll("\\s+", " ").trim();
+              info.add(item);
+            }
+          } catch (Exception e) {
+            LOG.error(values[i]);
+            e.printStackTrace();
+          }
+        }
+
+      }
+    }
+
+    return String.join(",", info);
+  }
+
+  /**
+   * GetSearchWord: Get search words from url link
+   *
+   * @param url request url
+   * @return query
+   */
+  public static String getSearchWord(String url) {
+    String keyword = "";
+
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", 
"%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || 
keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " 
").replaceAll("\\s+", " ").trim();
+      } catch (UnsupportedEncodingException e) {
+        LOG.error(mapRequest.get("search"));
+        e.printStackTrace();
+      }
+    }
+
+    return keyword;
+  }
+
+  /**
+   * GetFilterInfo: Get filter params from url link
+   *
+   * @param url request url
+   * @return filter facet key pair map
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  public static Map<String, String> getFilterInfo(String url) throws 
UnsupportedEncodingException {
+    List<String> info = new ArrayList<>();
+    Map<String, String> filterValues = new HashMap<>();
+
+    String keyword = "";
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", 
"%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || 
keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " 
").replaceAll("\\s+", " ").trim();
+
+      } catch (UnsupportedEncodingException e) {
+        LOG.error(mapRequest.get("search"));
+        e.printStackTrace();
+      }
+      if (!"".equals(keyword)) {
+        info.add(keyword);
+      }
+
+    }
+
+    if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+      String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+      String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+      String[] ids = idRaw.split(":");
+      String[] values = valueRaw.split(":");
+
+      int a = ids.length;
+      int b = values.length;
+      int l = a < b ? a : b;
+
+      for (int i = 0; i < l; i++) {
+        try {
+          values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+          if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && 
!URLDecoder.decode(values[i], "UTF-8").equals("")) {
+            String item = URLDecoder.decode(values[i], "UTF-8").trim();
+            if (item.contains("%2b") || item.contains("%20") || 
item.contains("%25")) {
+              item = item.replace("%2b", " ");
+              item = item.replace("%20", " ");
+              item = item.replace("%25", " ");
+            }
+            item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " 
").replaceAll("\\s+", " ").trim();
+            filterValues.put(ids[i], item);
+          }
+        } catch (Exception e) {
+          LOG.error(values[i]);
+          e.printStackTrace();
+        }
+      }
+    }
+
+    if (mapRequest.get("temporalsearch") != null) {
+      String temporalsearch = mapRequest.get("temporalsearch");
+      temporalsearch = 
URLDecoder.decode(temporalsearch.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), 
"UTF-8");
+
+      filterValues.put("temporalsearch", temporalsearch);
+    }
+
+    return filterValues;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java 
b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
new file mode 100644
index 0000000..15c3d81
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClassName: Session Function: Session operations.
+ */
+public class Session /*extends MudrodAbstract*/ implements Comparable<Session> 
{
+  private static final Logger LOG = LoggerFactory.getLogger(Session.class);
+  // start: start time of session
+  private String start;
+  // end: end time of session
+  private String end;
+  // id: original session ID
+  private String id;
+  // newid: new session ID
+  private String newid = null;
+  // fmt: time formatter
+  private DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+
+  private ESDriver es;
+  private Properties props;
+
+  /**
+   * Creates a new instance of Session.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch drive
+   * @param start start time of session
+   * @param end   end time of session
+   * @param id    session ID
+   */
+  public Session(Properties props, ESDriver es, String start, String end, 
String id) {
+    this.start = start;
+    this.end = end;
+    this.id = id;
+
+    this.props = props;
+    this.es = es;
+  }
+
+  /**
+   * Creates a new instance of Session.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch drive
+   */
+  public Session(Properties props, ESDriver es) {
+    this.props = props;
+    this.es = es;
+  }
+
+  /**
+   * getID: Get original session ID
+   *
+   * @return session id
+   */
+  public String getID() {
+    return id;
+  }
+
+  /**
+   * getNewID: Get new session ID
+   *
+   * @return new session id
+   */
+  public String getNewID() {
+    return newid;
+  }
+
+  /**
+   * setNewID: Set new session ID
+   *
+   * @param str: session ID
+   * @return new session id
+   */
+  public String setNewID(String str) {
+    return newid = str;
+  }
+
+  /**
+   * getStartTime:Get start time of current session
+   *
+   * @return start time of session
+   */
+  public String getStartTime() {
+    return start;
+  }
+
+  /**
+   * getEndTime:Get end time of current session
+   *
+   * @return end time of session
+   */
+  public String getEndTime() {
+    return end;
+  }
+
+  /**
+   * Compare current session with another session
+   *
+   * @see java.lang.Comparable#compareTo(java.lang.Object)
+   */
+  @Override
+  public int compareTo(Session o) {
+    fmt.parseDateTime(this.end);
+    fmt.parseDateTime(o.end);
+    // ascending order
+    return Seconds.secondsBetween(fmt.parseDateTime(o.end), 
fmt.parseDateTime(this.end)).getSeconds();
+
+  }
+
+  /**
+   * getSessionDetail:Get detail of current session, which is used for session
+   * tree reconstruct
+   *
+   * @param indexName    name of index from which you wish to obtain session 
detail.
+   * @param type: Session type name in Elasticsearch
+   * @param sessionID:   Session ID
+   * @return Session details in Json format
+   */
+  public JsonObject getSessionDetail(String indexName, String type, String 
sessionID) {
+    JsonObject sessionResults = new JsonObject();
+    // for session tree
+    SessionTree tree = null;
+    JsonElement jsonRequest = null;
+    try {
+      tree = this.getSessionTree(indexName, type, sessionID);
+      JsonObject jsonTree = tree.treeToJson(tree.root);
+      sessionResults.add("treeData", jsonTree);
+
+      jsonRequest = this.getRequests(type, sessionID);
+      sessionResults.add("RequestList", jsonRequest);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Encoding error detected.", e);
+
+    }
+
+    return sessionResults;
+  }
+
+  /**
+   * getClickStreamList: Extracted click stream list from current session.
+   *
+   * @param indexName    an index from which to query for a session list
+   * @param type: Session type name in Elasticsearch
+   * @param sessionID:   Session ID
+   * @return Click stram data list
+   * {@link ClickStream}
+   */
+  public List<ClickStream> getClickStreamList(String indexName, String type, 
String sessionID) {
+    SessionTree tree = null;
+    try {
+      tree = this.getSessionTree(indexName, type, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Erro whilst obtaining the Session Tree: {}", e);
+    }
+
+    List<ClickStream> clickthroughs = tree.getClickStreamList();
+    return clickthroughs;
+  }
+
+  /**
+   * Method of converting a given session to a tree structure
+   *
+   * @param type session type name in Elasticsearch
+   * @param sessionID   ID of session
+   * @return an instance of session tree structure
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  private SessionTree getSessionTree(String indexName, String type, String 
sessionID) throws UnsupportedEncodingException {
+
+    SearchResponse response = 
es.getClient().prepareSearch(indexName).setTypes(type).setQuery(QueryBuilders.termQuery("SessionID",
 sessionID)).setSize(100).addSort("Time", SortOrder.ASC)
+        .execute().actionGet();
+
+    SessionTree tree = new SessionTree(this.props, this.es, sessionID, type);
+    int seq = 1;
+    for (SearchHit hit : response.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String request = (String) result.get("Request");
+      String time = (String) result.get("Time");
+      String logType = (String) result.get("LogType");
+      String referer = (String) result.get("Referer");
+
+      SessionNode node = new SessionNode(request, logType, referer, time, seq);
+      tree.insert(node);
+      seq++;
+    }
+
+    return tree;
+  }
+
+  /**
+   * Method of getting all requests from a given current session
+   *
+   * @param cleanuptype Session type name in Elasticsearch
+   * @param sessionID   Session ID
+   * @return all of these requests in JSON
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  private JsonElement getRequests(String cleanuptype, String sessionID) throws 
UnsupportedEncodingException {
+    SearchResponse response = 
es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID",
 sessionID)).setSize(100)
+        .addSort("Time", SortOrder.ASC).execute().actionGet();
+
+    Gson gson = new Gson();
+    List<JsonObject> requestList = new ArrayList<>();
+    int seq = 1;
+    for (SearchHit hit : response.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String request = (String) result.get("Request");
+      String requestUrl = (String) result.get("RequestUrl");
+      String time = (String) result.get("Time");
+      String logType = (String) result.get("LogType");
+      String referer = (String) result.get("Referer");
+
+      JsonObject req = new JsonObject();
+      req.addProperty("Time", time);
+      req.addProperty("Request", request);
+      req.addProperty("RequestURL", requestUrl);
+      req.addProperty("LogType", logType);
+      req.addProperty("Referer", referer);
+      req.addProperty("Seq", seq);
+      requestList.add(req);
+
+      seq++;
+    }
+    return gson.toJsonTree(requestList);
+  }
+
+  /**
+   * getClickStreamList: Extracted ranking training data from current session.
+   *
+   * @param indexName    an index from which to obtain ranked training data.
+   * @param cleanuptype: Session type name in Elasticsearch
+   * @param sessionID:   Session ID
+   * @return Click stram data list
+   * {@link ClickStream}
+   */
+  public List<RankingTrainData> getRankingTrainData(String indexName, String 
cleanuptype, String sessionID) {
+    SessionTree tree = null;
+    try {
+      tree = this.getSessionTree(indexName, cleanuptype, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error whilst retreiving Session Tree: {}", e);
+    }
+
+    List<RankingTrainData> trainData = new ArrayList<>();
+    try {
+      trainData = tree.getRankingTrainData(indexName, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error whilst retreiving ranking training data: {}", e);
+    }
+
+    return trainData;
+  }
+}

Reply via email to