LENS-693 : Queries get purged as soon as they are finished

Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/669e8727
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/669e8727
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/669e8727

Branch: refs/heads/current-release-line
Commit: 669e8727292a297487dbeb2b3edc6f17c2cea651
Parents: d457dd0
Author: Rajat Khandelwal <pro...@apache.org>
Authored: Wed Sep 23 14:36:40 2015 +0530
Committer: Amareshwari Sriramadasu <amareshw...@apache.org>
Committed: Wed Sep 23 14:36:40 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/lens/api/LensConf.java |   7 +-
 .../lens/api/query/InMemoryQueryResult.java     |  24 +-
 .../org/apache/lens/api/query/LensQuery.java    |   8 +-
 .../lens/api/query/PersistentQueryResult.java   |  18 +
 .../org/apache/lens/api/query/QueryResult.java  |   4 +-
 .../org/apache/lens/api/query/QueryStatus.java  |   1 +
 .../lens/cli/commands/LensQueryCommands.java    |   9 +-
 .../org/apache/lens/driver/hive/HiveDriver.java |   5 +-
 .../driver/hive/HivePersistentResultSet.java    |   2 +-
 .../apache/lens/driver/hive/TestHiveDriver.java |   2 +-
 .../lens/server/api/LensConfConstants.java      |   8 +-
 .../server/api/driver/InMemoryResultSet.java    |  24 +-
 .../lens/server/api/driver/LensResultSet.java   |   9 +
 .../server/api/driver/PersistentResultSet.java  |  19 +-
 .../server/api/query/FinishedLensQuery.java     |   5 +-
 .../lens/server/api/query/QueryCancelled.java   |  10 +-
 .../lens/server/api/query/QueryClosed.java      |  10 +-
 .../lens/server/api/query/QueryContext.java     |   6 +-
 .../lens/server/api/query/QueryEnded.java       |   8 +-
 .../lens/server/api/query/QueryFailed.java      |  10 +-
 .../lens/server/api/query/QuerySuccess.java     |  10 +-
 .../lens/server/api/driver/MockDriver.java      |   9 +-
 .../lens/server/query/LensPersistentResult.java |  73 +++-
 .../lens/server/query/QueryEndNotifier.java     |  79 ++---
 .../server/query/QueryExecutionServiceImpl.java | 345 +++++++++----------
 .../QueryExecutionStatisticsGenerator.java      |  11 +-
 .../lens/server/query/ResultFormatter.java      |  17 +-
 .../src/main/resources/lensserver-default.xml   |   6 +-
 .../org/apache/lens/server/LensJerseyTest.java  |  21 +-
 .../lens/server/common/RestAPITestUtil.java     | 122 +++++--
 .../lens/server/query/TestEventService.java     |  11 +-
 .../server/query/TestQueryEndEmailNotifier.java | 196 +++++------
 .../lens/server/query/TestQueryService.java     | 253 +++++---------
 .../lens/server/query/TestResultFormatting.java |  11 +-
 lens-server/src/test/resources/lens-site.xml    |   6 +-
 src/site/apt/admin/config.apt                   |  42 +--
 36 files changed, 754 insertions(+), 647 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/LensConf.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/LensConf.java 
b/lens-api/src/main/java/org/apache/lens/api/LensConf.java
index 3b1ad87..ff965d6 100644
--- a/lens-api/src/main/java/org/apache/lens/api/LensConf.java
+++ b/lens-api/src/main/java/org/apache/lens/api/LensConf.java
@@ -63,11 +63,14 @@ public class LensConf implements Serializable {
     properties.put(key, value);
   }
 
+  public void addProperty(Object key, Object value) {
+    properties.put(String.valueOf(key), String.valueOf(value));
+  }
+
   /**
    * Adds Map of properties.
    *
-   * @param key   the key
-   * @param value the value
+   * @param props the properties
    */
   public void addProperties(Map<String, String> props) {
     properties.putAll(props);

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java
----------------------------------------------------------------------
diff --git 
a/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java 
b/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java
index 6a9f455..36d0ba0 100644
--- a/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java
+++ b/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java
@@ -27,10 +27,7 @@ import java.util.List;
 import javax.xml.bind.annotation.XmlElementWrapper;
 import javax.xml.bind.annotation.XmlRootElement;
 
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 /**
  * The Class InMemoryQueryResult.
@@ -48,11 +45,26 @@ import lombok.NoArgsConstructor;
  */
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
 public class InMemoryQueryResult extends QueryResult {
-
+  public static final String DECLARATION = "Result available in memory, attaching here: \n\n";
   /**
    * The rows.
    */
   @XmlElementWrapper
   @Getter
-  private List<ResultRow> rows = new ArrayList<ResultRow>();
+  private List<ResultRow> rows = new ArrayList<>();
+
+  public String toPrettyString() {
+    StringBuilder b = new StringBuilder();
+    b.append(DECLARATION);
+    int numRows = 0;
+    for (ResultRow row : getRows()) {
+      for (Object col : row.getValues()) {
+        b.append(col).append("\t");
+      }
+      numRows++;
+      b.append("\n");
+    }
+    b.append(numRows).append(" rows ");
+    return b.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java 
b/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java
index af439ff..204ecee 100644
--- a/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java
+++ b/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java
@@ -27,10 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.lens.api.LensConf;
 import org.apache.lens.api.Priority;
 
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 /**
  * The Class LensQuery.
@@ -81,6 +78,7 @@ import lombok.NoArgsConstructor;
  * Instantiates a new lens query.
  */
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode
 public class LensQuery {
 
   /**
@@ -214,7 +212,7 @@ public class LensQuery {
   }
 
   public String getErrorMessage() {
-    return (this.status!=null) ? this.status.getLensErrorTOErrorMsg() : null;
+    return (this.status != null) ? this.status.getLensErrorTOErrorMsg() : null;
   }
 
   public String getQueryHandleString() {

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java
----------------------------------------------------------------------
diff --git 
a/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java 
b/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java
index a2e10c6..cb72cdc 100644
--- 
a/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java
+++ 
b/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java
@@ -69,4 +69,22 @@ public class PersistentQueryResult extends QueryResult {
   @Getter
   private Long fileSize;
 
+  @XmlElement
+  @Getter
+  private String httpResultUrl;
+
+  @Override
+  public String toPrettyString() {
+    StringBuilder sb = new StringBuilder().append("Result available at ").append(persistedURI).append(".");
+    if (numRows != null) {
+      sb.append(" Number of rows: ").append(numRows).append(".");
+    }
+    if (fileSize != null) {
+      sb.append(" File size: ").append(fileSize).append(".");
+    }
+    if (httpResultUrl != null) {
+      sb.append(" Downloadable from ").append(httpResultUrl).append(".");
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java 
b/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java
index 3ecf0c2..3bc7cc7 100644
--- a/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java
+++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java
@@ -24,6 +24,8 @@ package org.apache.lens.api.query;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlSeeAlso;
 
+import org.apache.lens.api.result.PrettyPrintable;
+
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
@@ -36,5 +38,5 @@ import lombok.NoArgsConstructor;
  * Instantiates a new query result.
  */
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
-public class QueryResult {
+public abstract class QueryResult implements PrettyPrintable {
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java 
b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
index 3c8531f..91cbe39 100644
--- a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
+++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
@@ -264,6 +264,7 @@ public class QueryStatus implements Serializable {
       break;
     case EXECUTED:
       switch (newState) {
+      case EXECUTED:
       case SUCCESSFUL:
       case FAILED:
       case CANCELED:

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java
----------------------------------------------------------------------
diff --git 
a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java 
b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java
index 006eaed..b6e3fcf 100644
--- a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java
+++ b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java
@@ -120,14 +120,7 @@ public class LensQueryCommands extends BaseLensCommand {
       QueryResult r = rs.getResultSet().getResult();
       if (r instanceof InMemoryQueryResult) {
         InMemoryQueryResult temp = (InMemoryQueryResult) r;
-        for (ResultRow row : temp.getRows()) {
-          for (Object col : row.getValues()) {
-            b.append(col).append("\t");
-          }
-          numRows++;
-          b.append("\n");
-        }
-        b.append(numRows + " rows ");
+        b.append(temp.toPrettyString());
       } else {
         PersistentQueryResult temp = (PersistentQueryResult) r;
         b.append("Results of query stored at : ").append(temp.getPersistedURI()).append("  ");

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index 31c343a..4561ccf 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -842,7 +842,7 @@ public class HiveDriver implements LensDriver {
     log.info("Creating result set for hiveHandle:{}", op);
     try {
       if (context.isDriverPersistent()) {
-        return new HivePersistentResultSet(new Path(context.getHdfsoutPath()), op, getClient());
+        return new HivePersistentResultSet(new Path(context.getDriverResultPath()), op, getClient());
       } else if (op.hasResultSet()) {
         return new HiveInMemoryResultSet(op, getClient(), closeAfterFetch);
       } else {
@@ -874,7 +874,8 @@ public class HiveDriver implements LensDriver {
       Path resultSetPath = context.getHDFSResultDir();
       // create query
       StringBuilder builder = new StringBuilder("INSERT OVERWRITE DIRECTORY ");
-      context.setHdfsoutPath(resultSetPath.makeQualified(resultSetPath.getFileSystem(context.getConf())).toString());
+      context.setDriverResultPath(
+        resultSetPath.makeQualified(resultSetPath.getFileSystem(context.getConf())).toString());
       builder.append('"').append(resultSetPath).append("\" ");
       String outputDirFormat = qdconf.get(LensConfConstants.QUERY_OUTPUT_DIRECTORY_FORMAT);
       if (outputDirFormat != null) {

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java
 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java
index 00e1e53..746e17e 100644
--- 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java
+++ 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java
@@ -64,7 +64,7 @@ public class HivePersistentResultSet extends PersistentResultSet {
   }
 
   @Override
-  public Long fileSize() throws LensException {
+  public Long getFileSize() throws LensException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
 
b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
index 78b3320..2cb3736 100644
--- 
a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
+++ 
b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
@@ -793,7 +793,7 @@ public class TestHiveDriver {
     assertEquals(0, driver.getHiveHandleSize());
     HivePersistentResultSet persistentResultSet = (HivePersistentResultSet) resultSet;
     String path = persistentResultSet.getOutputPath();
-    assertEquals(ctx.getHdfsoutPath(), path);
+    assertEquals(ctx.getDriverResultPath(), path);
     driver.closeQuery(plan2.getHandle());
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
index 096d26e..f202603 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
@@ -774,14 +774,14 @@ public final class LensConfConstants {
   // Query Purge Configuration
 
   /**
-   * The Constant MAX_NUMBER_OF_FINISHED_QUERY.
+   * The Constant PURGE_INTERVAL.
    */
-  public static final String MAX_NUMBER_OF_FINISHED_QUERY = SERVER_PFX + "max.finished.queries";
+  public static final String PURGE_INTERVAL = SERVER_PFX + "querypurger.sleep.interval";
 
   /**
-   * The Constant DEFAULT_FINISHED_QUERIES.
+   * The Constant DEFAULT_PURGE_INTERVAL.
    */
-  public static final int DEFAULT_FINISHED_QUERIES = 100;
+  public static final int DEFAULT_PURGE_INTERVAL = 10000;
 
   // Server DB configuration
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
index 3b76126..c64a3dd 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
@@ -22,16 +22,31 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.lens.api.query.InMemoryQueryResult;
-import org.apache.lens.api.query.QueryResult;
 import org.apache.lens.api.query.ResultRow;
 import org.apache.lens.server.api.error.LensException;
 
+import lombok.Setter;
+
 /**
  * The Class InMemoryResultSet.
  */
 public abstract class InMemoryResultSet extends LensResultSet {
 
   public abstract boolean seekToStart() throws LensException;
+
+  @Setter
+  private boolean fullyAccessed = false;
+
+  @Override
+  public boolean canBePurged() {
+    return fullyAccessed;
+  }
+
+  @Override
+  public String getOutputPath() throws LensException {
+    return null;
+  }
+
   /**
    * Whether there is another result row available.
    *
@@ -60,12 +75,15 @@ public abstract class InMemoryResultSet extends LensResultSet {
    *
    * @see org.apache.lens.server.api.driver.LensResultSet#toQueryResult()
    */
-  public QueryResult toQueryResult() throws LensException {
+  public InMemoryQueryResult toQueryResult() throws LensException {
     List<ResultRow> rows = new ArrayList<ResultRow>();
     while (hasNext()) {
       rows.add(next());
     }
+    fullyAccessed = true;
     return new InMemoryQueryResult(rows);
   }
-
+  public boolean isHttpResultAvailable() throws LensException {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java
index 929a302..805b0c1 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java
@@ -25,6 +25,11 @@ import org.apache.lens.server.api.error.LensException;
  * Result set returned by driver.
  */
 public abstract class LensResultSet {
+  /**
+   *
+   * @return true if the result can be purged
+   */
+  public abstract boolean canBePurged();
 
   /**
    * Get the size of the result set.
@@ -39,8 +44,11 @@ public abstract class LensResultSet {
    *
    * @return Returns {@link LensResultSetMetadata}
    */
+
   public abstract LensResultSetMetadata getMetadata() throws LensException;
 
+  public abstract String getOutputPath() throws LensException;
+
   /**
    * Get the corresponding query result object.
    *
@@ -49,4 +57,5 @@ public abstract class LensResultSet {
    */
   public abstract QueryResult toQueryResult() throws LensException;
 
+  public abstract boolean isHttpResultAvailable() throws LensException;
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java
index bbde170..774f1ee 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java
@@ -19,7 +19,6 @@
 package org.apache.lens.server.api.driver;
 
 import org.apache.lens.api.query.PersistentQueryResult;
-import org.apache.lens.api.query.QueryResult;
 import org.apache.lens.server.api.error.LensException;
 
 /**
@@ -27,22 +26,32 @@ import org.apache.lens.server.api.error.LensException;
  */
 public abstract class PersistentResultSet extends LensResultSet {
 
+  @Override
+  public boolean canBePurged() {
+    return true;
+  }
+
   /**
    * Get the size of the result set file.
    *
    * @return The size if available, null if not available.
    * @throws LensException the lens exception
    */
-  public abstract Long fileSize() throws LensException;
+  public abstract Long getFileSize() throws LensException;
 
-  public abstract String getOutputPath() throws LensException;
+  public String getHttpResultUrl() {
+    return null;
+  }
 
   /*
    * (non-Javadoc)
    *
    * @see org.apache.lens.server.api.driver.LensResultSet#toQueryResult()
    */
-  public QueryResult toQueryResult() throws LensException {
-    return new PersistentQueryResult(getOutputPath(), size(), fileSize());
+  public PersistentQueryResult toQueryResult() throws LensException {
+    return new PersistentQueryResult(getOutputPath(), size(), getFileSize(), getHttpResultUrl());
+  }
+  public boolean isHttpResultAvailable() throws LensException {
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java
index d8c04db..7a06c44 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java
@@ -24,7 +24,6 @@ import org.apache.lens.api.LensConf;
 import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.server.api.driver.LensDriver;
-import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,7 +43,7 @@ import lombok.ToString;
  *
  * @see java.lang.Object#hashCode()
  */
-@EqualsAndHashCode
+@EqualsAndHashCode(exclude = "selectedDriver")
 /*
  * (non-Javadoc)
  *
@@ -198,7 +197,7 @@ public class FinishedLensQuery {
     this.selectedDriver = ctx.getSelectedDriver();
   }
 
-  public QueryContext toQueryContext(Configuration conf, Collection<LensDriver> drivers) throws LensException {
+  public QueryContext toQueryContext(Configuration conf, Collection<LensDriver> drivers) {
     QueryContext qctx = new QueryContext(userQuery, submitter, new LensConf(), conf, drivers, null, submissionTime,
       false);
     qctx.setQueryHandle(QueryHandle.fromString(handle));

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java
index bc0465c..a473a47 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java
@@ -29,6 +29,7 @@ public class QueryCancelled extends QueryEnded {
   /**
    * Instantiates a new query cancelled.
    *
+   * @param ctx       the query context
    * @param eventTime the event time
    * @param prev      the prev
    * @param current   the current
@@ -36,10 +37,15 @@ public class QueryCancelled extends QueryEnded {
    * @param user      the user
    * @param cause     the cause
    */
-  public QueryCancelled(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle,
+  public QueryCancelled(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current,
+    QueryHandle handle,
     String user, String cause) {
-    super(eventTime, prev, current, handle, user, cause);
+    super(ctx, eventTime, prev, current, handle, user, cause);
     checkCurrentState(QueryStatus.Status.CANCELED);
   }
 
+  public QueryCancelled(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) {
+    // TODO: correct username. put who cancelled it, not the submitter. Similar for others
+    this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java
index ea8f70f..3837087 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java
@@ -29,6 +29,7 @@ public class QueryClosed extends QueryEnded {
   /**
    * Instantiates a new query closed.
    *
+   * @param ctx       the query context
    * @param eventTime the event time
    * @param prev      the prev
    * @param current   the current
@@ -36,9 +37,14 @@ public class QueryClosed extends QueryEnded {
    * @param user      the user
    * @param cause     the cause
    */
-  public QueryClosed(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle,
+  public QueryClosed(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current,
+    QueryHandle handle,
     String user, String cause) {
-    super(eventTime, prev, current, handle, user, cause);
+    super(ctx, eventTime, prev, current, handle, user, cause);
     checkCurrentState(QueryStatus.Status.CLOSED);
   }
+
+  public QueryClosed(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) {
+    this(ctx, ctx.getClosedTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index 12de0a5..bed79ac 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -42,10 +42,12 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 /**
  * The Class QueryContext.
  */
+@ToString
 public class QueryContext extends AbstractQueryContext {
 
   /**
@@ -95,7 +97,7 @@ public class QueryContext extends AbstractQueryContext {
    */
   @Getter
   @Setter
-  private String hdfsoutPath;
+  private String driverResultPath;
 
   /**
    * The submission time.
@@ -326,7 +328,7 @@ public class QueryContext extends AbstractQueryContext {
   /*
    * Introduced for Recovering finished query.
    */
-  public void setStatusSkippingTransitionTest(final QueryStatus newStatus) throws LensException {
+  public void setStatusSkippingTransitionTest(final QueryStatus newStatus) {
     this.status = newStatus;
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java
index 3e9474c..e80da6d 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java
@@ -33,6 +33,8 @@ import lombok.Getter;
  */
 public class QueryEnded extends StatusChange {
 
+  @Getter
+  private final QueryContext queryContext;
   /**
    * The user.
    */
@@ -54,6 +56,7 @@ public class QueryEnded extends StatusChange {
   /**
    * Instantiates a new query ended.
    *
+   * @param ctx
    * @param eventTime the event time
    * @param prev      the prev
    * @param current   the current
@@ -61,9 +64,10 @@ public class QueryEnded extends StatusChange {
    * @param user      the user
    * @param cause     the cause
    */
-  public QueryEnded(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle,
-    String user, String cause) {
+  public QueryEnded(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current,
+    QueryHandle handle, String user, String cause) {
     super(eventTime, prev, current, handle);
+    this.queryContext = ctx;
     this.user = user;
     this.cause = cause;
     if (!END_STATES.contains(current)) {

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java
index 40a1c0f..bdffbc2 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java
@@ -29,6 +29,7 @@ public class QueryFailed extends QueryEnded {
   /**
    * Instantiates a new query failed.
    *
+   * @param ctx       the query context
    * @param eventTime the event time
    * @param prev      the prev
    * @param current   the current
@@ -36,9 +37,14 @@ public class QueryFailed extends QueryEnded {
    * @param user      the user
    * @param cause     the cause
    */
-  public QueryFailed(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle,
+  public QueryFailed(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current,
+    QueryHandle handle,
     String user, String cause) {
-    super(eventTime, prev, current, handle, user, cause);
+    super(ctx, eventTime, prev, current, handle, user, cause);
     checkCurrentState(QueryStatus.Status.FAILED);
   }
+
+  public QueryFailed(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) {
+    this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java
index c551dba..298fdbb 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java
@@ -29,13 +29,19 @@ public class QuerySuccess extends QueryEnded {
   /**
    * Instantiates a new query success.
    *
+   * @param ctx       the query context
    * @param eventTime the event time
    * @param prev      the prev
    * @param current   the current
    * @param handle    the handle
    */
-  public QuerySuccess(long eventTime, QueryStatus.Status prev, 
QueryStatus.Status current, QueryHandle handle) {
-    super(eventTime, prev, current, handle, null, null);
+  public QuerySuccess(QueryContext ctx, long eventTime, QueryStatus.Status 
prev, QueryStatus.Status current,
+    QueryHandle handle) {
+    super(ctx, eventTime, prev, current, handle, null, null);
     checkCurrentState(QueryStatus.Status.SUCCESSFUL);
   }
+
+  public QuerySuccess(QueryContext ctx, QueryStatus.Status prevState, 
QueryStatus.Status currState) {
+    this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle());
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
 
b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
index b28669e..2d86589 100644
--- 
a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
+++ 
b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
@@ -255,7 +255,7 @@ public class MockDriver implements LensDriver {
       }
 
       @Override
-      public Long fileSize() throws LensException {
+      public Long getFileSize() throws LensException {
         // TODO Auto-generated method stub
         return null;
       }
@@ -297,7 +297,7 @@ public class MockDriver implements LensDriver {
    * @see 
org.apache.lens.server.api.driver.LensDriver#fetchResultSet(org.apache.lens.server.api.query.QueryContext)
    */
   @Override
-  public LensResultSet fetchResultSet(QueryContext context) throws 
LensException {
+  public LensResultSet fetchResultSet(final QueryContext context) throws 
LensException {
     return new InMemoryResultSet() {
 
       @Override
@@ -340,6 +340,11 @@ public class MockDriver implements LensDriver {
         // TODO Auto-generated method stub
         return false;
       }
+
+      @Override
+      public boolean canBePurged() {
+        return true;
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java
index b65a5f4..1e9a182 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java
@@ -18,13 +18,27 @@
  */
 package org.apache.lens.server.query;
 
+import java.io.IOException;
+
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.driver.LensResultSetMetadata;
 import org.apache.lens.server.api.driver.PersistentResultSet;
 import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.query.FinishedLensQuery;
+import org.apache.lens.server.api.query.QueryContext;
 
-/**
- * The Class LensPersistentResult.
- */
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/** The Class LensPersistentResult. */
+@Slf4j
 public class LensPersistentResult extends PersistentResultSet {
 
   /** The metadata. */
@@ -38,19 +52,45 @@ public class LensPersistentResult extends 
PersistentResultSet {
 
   /** The file size. */
   private final Long fileSize;
+  private final Configuration conf;
+  @Getter
+  private String httpResultUrl = null;
 
   /**
    * Instantiates a new lens persistent result.
-   *
-   * @param metadata   the metadata
-   * @param outputPath the output path
-   * @param numRows    the num rows
+   * @param queryHandle the query handle
+   * @param metadata    the metadata
+   * @param outputPath  the output path
+   * @param numRows     the num rows
+   * @param conf        the lens server conf
    */
-  public LensPersistentResult(LensResultSetMetadata metadata, String 
outputPath, Integer numRows, Long fileSize) {
+  public LensPersistentResult(QueryHandle queryHandle, LensResultSetMetadata 
metadata, String outputPath, Integer
+    numRows, Long fileSize,
+    Configuration conf) {
     this.metadata = metadata;
     this.outputPath = outputPath;
     this.numRows = numRows;
     this.fileSize = fileSize;
+    this.conf = conf;
+    if (isHttpResultAvailable()) {
+      this.httpResultUrl = conf.get(LensConfConstants.SERVER_BASE_URL, 
LensConfConstants.DEFAULT_SERVER_BASE_URL)
+        + "queryapi/queries/" + queryHandle + "/httpresultset";
+    }
+  }
+
+  public LensPersistentResult(QueryContext ctx, Configuration conf) {
+    this(ctx.getQueryHandle(),
+      ctx.getQueryOutputFormatter().getMetadata(),
+      ctx.getQueryOutputFormatter().getFinalOutputPath(),
+      ctx.getQueryOutputFormatter().getNumRows(),
+      ctx.getQueryOutputFormatter().getFileSize(), conf);
+  }
+
+  public LensPersistentResult(FinishedLensQuery query, Configuration conf, 
ObjectMapper mapper) throws
+    ClassNotFoundException, IOException {
+    this(QueryHandle.fromString(query.getHandle()),
+      mapper.readValue(query.getMetadata(), (Class<LensResultSetMetadata>) 
Class.forName(query.getMetadataClass())),
+      query.getResult(), query.getRows(), query.getFileSize(), conf);
   }
 
   @Override
@@ -69,7 +109,7 @@ public class LensPersistentResult extends 
PersistentResultSet {
   }
 
   @Override
-  public Long fileSize() throws LensException {
+  public Long getFileSize() throws LensException {
     return fileSize;
   }
 
@@ -77,4 +117,19 @@ public class LensPersistentResult extends 
PersistentResultSet {
   public LensResultSetMetadata getMetadata() throws LensException {
     return metadata;
   }
+
+  @Override
+  public boolean isHttpResultAvailable() {
+    try {
+      final Path resultPath = new Path(getOutputPath());
+      FileSystem fs = resultPath.getFileSystem(conf);
+      if (fs.isDirectory(resultPath)) {
+        return false;
+      }
+    } catch (IOException | LensException e) {
+      log.warn("Unable to get status for Result Directory", e);
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
index 45ba7ac..110624a 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
@@ -18,6 +18,8 @@
  */
 package org.apache.lens.server.query;
 
+import static org.apache.lens.server.api.LensConfConstants.*;
+
 import java.util.Date;
 import java.util.Properties;
 
@@ -32,7 +34,7 @@ import javax.mail.internet.MimeMultipart;
 
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.server.LensServices;
-import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.events.AsyncEventListener;
 import org.apache.lens.server.api.metrics.MetricsService;
 import org.apache.lens.server.api.query.QueryContext;
@@ -58,9 +60,6 @@ public class QueryEndNotifier extends 
AsyncEventListener<QueryEnded> {
   /** The Constant EMAIL_ERROR_COUNTER. */
   public static final String EMAIL_ERROR_COUNTER = "email-send-errors";
 
-  /** The conf. */
-  private final HiveConf conf;
-
   /** The from. */
   private final String from;
 
@@ -78,37 +77,33 @@ public class QueryEndNotifier extends 
AsyncEventListener<QueryEnded> {
 
   private final LogSegregationContext logSegregationContext;
 
-  /**
-   * Instantiates a new query end notifier.
+  /** Instantiates a new query end notifier.
    *
    * @param queryService the query service
-   * @param hiveConf     the hive conf
-   */
+   * @param hiveConf     the hive conf */
   public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf 
hiveConf,
-      @NonNull final LogSegregationContext logSegregationContext) {
+    @NonNull final LogSegregationContext logSegregationContext) {
     this.queryService = queryService;
-    this.conf = hiveConf;
-    from = conf.get(LensConfConstants.MAIL_FROM_ADDRESS);
-    host = conf.get(LensConfConstants.MAIL_HOST);
-    port = conf.get(LensConfConstants.MAIL_PORT);
-    mailSmtpTimeout = Integer.parseInt(
-        conf.get(LensConfConstants.MAIL_SMTP_TIMEOUT, 
LensConfConstants.MAIL_DEFAULT_SMTP_TIMEOUT));
-    mailSmtpConnectionTimeout = 
Integer.parseInt(conf.get(LensConfConstants.MAIL_SMTP_CONNECTIONTIMEOUT,
-      LensConfConstants.MAIL_DEFAULT_SMTP_CONNECTIONTIMEOUT));
+    HiveConf conf = hiveConf;
+    from = conf.get(MAIL_FROM_ADDRESS);
+    host = conf.get(MAIL_HOST);
+    port = conf.get(MAIL_PORT);
+    mailSmtpTimeout = Integer.parseInt(conf.get(MAIL_SMTP_TIMEOUT, 
MAIL_DEFAULT_SMTP_TIMEOUT));
+    mailSmtpConnectionTimeout = 
Integer.parseInt(conf.get(MAIL_SMTP_CONNECTIONTIMEOUT,
+      MAIL_DEFAULT_SMTP_CONNECTIONTIMEOUT));
     this.logSegregationContext = logSegregationContext;
   }
 
   /*
    * (non-Javadoc)
    *
-   * @see 
org.apache.lens.server.api.events.AsyncEventListener#process(org.apache.lens.server.api.events.LensEvent)
-   */
+   * @see 
org.apache.lens.server.api.events.AsyncEventListener#process(org.apache.lens.server.api.events.LensEvent)
 */
   @Override
-  public void process(QueryEnded event) {
+  public void process(final QueryEnded event) {
     if (event.getCurrentValue() == QueryStatus.Status.CLOSED) {
       return;
     }
-    QueryContext queryContext = 
queryService.getQueryContext(event.getQueryHandle());
+    QueryContext queryContext = event.getQueryContext();
     if (queryContext == null) {
       log.warn("Could not find the context for {} for event:{}. No email 
generated", event.getQueryHandle(),
         event.getCurrentValue());
@@ -116,45 +111,36 @@ public class QueryEndNotifier extends 
AsyncEventListener<QueryEnded> {
     }
     
this.logSegregationContext.setLogSegragationAndQueryId(queryContext.getQueryHandleString());
 
-    boolean whetherMailNotify = 
Boolean.parseBoolean(queryContext.getConf().get(LensConfConstants.QUERY_MAIL_NOTIFY,
-      LensConfConstants.WHETHER_MAIL_NOTIFY_DEFAULT));
+    boolean whetherMailNotify = 
Boolean.parseBoolean(queryContext.getConf().get(QUERY_MAIL_NOTIFY,
+      WHETHER_MAIL_NOTIFY_DEFAULT));
 
     if (!whetherMailNotify) {
       return;
     }
 
     String queryName = queryContext.getQueryName();
-    queryName = queryName == null ? "" : queryName;
-    String mailSubject = "Query " + queryName + " " + 
queryContext.getStatus().getStatus() + ": "
-      + event.getQueryHandle();
+    String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : 
(queryName + " "))
+      + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle();
 
     String mailMessage = createMailMessage(queryContext);
 
     String to = queryContext.getSubmittedUser() + "@" + 
queryService.getServerDomain();
 
-    String cc = 
queryContext.getConf().get(LensConfConstants.QUERY_RESULT_EMAIL_CC,
-      LensConfConstants.QUERY_RESULT_DEFAULT_EMAIL_CC);
+    String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, 
QUERY_RESULT_DEFAULT_EMAIL_CC);
 
     log.info("Sending completion email for query handle: {}", 
event.getQueryHandle());
     sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), 
mailSmtpTimeout, mailSmtpConnectionTimeout);
   }
 
-  /**
-   * Creates the mail message.
+  /** Creates the mail message.
    *
    * @param queryContext the query context
-   * @return the string
-   */
+   * @return the string */
   private String createMailMessage(QueryContext queryContext) {
     StringBuilder msgBuilder = new StringBuilder();
     switch (queryContext.getStatus().getStatus()) {
     case SUCCESSFUL:
-      msgBuilder.append("Result available at ");
-      String baseURI = conf.get(LensConfConstants.SERVER_BASE_URL, 
LensConfConstants.DEFAULT_SERVER_BASE_URL);
-      msgBuilder.append(baseURI);
-      msgBuilder.append("queryapi/queries/");
-      msgBuilder.append(queryContext.getQueryHandle());
-      msgBuilder.append("/httpresultset");
+      msgBuilder.append(getResultMessage(queryContext));
       break;
     case FAILED:
       msgBuilder.append(queryContext.getStatus().getStatusMessage());
@@ -166,13 +152,21 @@ public class QueryEndNotifier extends 
AsyncEventListener<QueryEnded> {
     case CANCELED:
       msgBuilder.append(queryContext.getStatus().getStatusMessage());
       break;
-    case CLOSED:
     default:
       break;
     }
     return msgBuilder.toString();
   }
 
+  private String getResultMessage(QueryContext queryContext) {
+    try {
+      return 
queryService.getResultset(queryContext.getQueryHandle()).toQueryResult().toPrettyString();
+    } catch (LensException e) {
+      log.error("Error retrieving result of query handle {} for sending 
e-mail", queryContext.getQueryHandle(), e);
+      return "Error retrieving result.";
+    }
+  }
+
   @Data
   public static class Email {
     private final String from;
@@ -181,15 +175,14 @@ public class QueryEndNotifier extends 
AsyncEventListener<QueryEnded> {
     private final String subject;
     private final String message;
   }
-  /**
-   * Send mail.
+
+  /** Send mail.
    *
    * @param host                      the host
    * @param port                      the port
    * @param email                     the email
    * @param mailSmtpTimeout           the mail smtp timeout
-   * @param mailSmtpConnectionTimeout the mail smtp connection timeout
-   */
+   * @param mailSmtpConnectionTimeout the mail smtp connection timeout */
   public static void sendMail(String host, String port,
     Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) {
     Properties props = System.getProperties();

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 9e27dd4..2ba54e0 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -88,10 +88,12 @@ import org.codehaus.jackson.map.module.SimpleModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -159,18 +161,17 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   /**
    * The finished queries.
    */
-  private DelayQueue<FinishedQuery> finishedQueries = new 
DelayQueue<FinishedQuery>();
+  ConcurrentLinkedQueue<FinishedQuery> finishedQueries = new 
ConcurrentLinkedQueue<>();
 
   /**
    * The prepared query queue.
    */
-  private DelayQueue<PreparedQueryContext> preparedQueryQueue = new 
DelayQueue<PreparedQueryContext>();
+  private DelayQueue<PreparedQueryContext> preparedQueryQueue = new 
DelayQueue<>();
 
   /**
    * The prepared queries.
    */
-  private Map<QueryPrepareHandle, PreparedQueryContext> preparedQueries
-    = new HashMap<QueryPrepareHandle, PreparedQueryContext>();
+  private Map<QueryPrepareHandle, PreparedQueryContext> preparedQueries = new 
HashMap<>();
 
   /**
    * The all queries.
@@ -180,7 +181,8 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   /**
    * The conf.
    */
-  private Configuration conf;
+  @VisibleForTesting
+  Configuration conf;
 
   /**
    * The query submitter runnable.
@@ -240,7 +242,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   /**
    * The max finished queries.
    */
-  private int maxFinishedQueries;
+  int purgeInterval;
 
   /**
    * The lens server dao.
@@ -328,7 +330,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
     // Add result formatter
     getEventService().addListenerForType(new ResultFormatter(this, 
this.logSegregationContext), QueryExecuted.class);
-    getEventService().addListenerForType(new 
QueryExecutionStatisticsGenerator(this, getEventService()),
+    getEventService().addListenerForType(new 
QueryExecutionStatisticsGenerator(getEventService()),
       QueryEnded.class);
     getEventService().addListenerForType(
       new QueryEndNotifier(this, getCliService().getHiveConf(), 
this.logSegregationContext), QueryEnded.class);
@@ -435,11 +437,13 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   /**
    * The Class FinishedQuery.
    */
-  private class FinishedQuery implements Delayed {
+  @ToString
+  public class FinishedQuery {
 
     /**
      * The ctx.
      */
+    @Getter
     private final QueryContext ctx;
 
     /**
@@ -462,29 +466,25 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       }
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see java.lang.Comparable#compareTo(java.lang.Object)
-     */
-    @Override
-    public int compareTo(Delayed o) {
-      return (int) (this.finishTime.getTime() - ((FinishedQuery) 
o).finishTime.getTime());
+    public boolean canBePurged() {
+      try {
+        if (getCtx().getStatus().getStatus().equals(SUCCESSFUL)) {
+          if (getCtx().getStatus().isResultSetAvailable()) {
+            LensResultSet rs = getResultset();
+            log.info("Resultset for {} is {}", getQueryHandle(), rs);
+            return rs.canBePurged();
+          }
+        }
+        return true;
+      } catch (Throwable e) {
+        log.error("Error while accessing result set for query handle while 
purging: {}."
+          + " Hence, going ahead with purge", getQueryHandle(), e);
+        return true;
+      }
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see 
java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
-     */
-    @Override
-    public long getDelay(TimeUnit units) {
-      int size = finishedQueries.size();
-      if (size > maxFinishedQueries) {
-        return 0;
-      } else {
-        return Integer.MAX_VALUE;
-      }
+    private LensResultSet getResultset() throws LensException {
+      return QueryExecutionServiceImpl.this.getResultset(getQueryHandle());
     }
 
     /**
@@ -494,16 +494,13 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       return finishTime;
     }
 
-    /**
-     * @return the ctx
-     */
-    public QueryContext getCtx() {
-      return ctx;
-    }
-
     public String getQueryHandleString() {
       return ctx.getQueryHandleString();
     }
+
+    public QueryHandle getQueryHandle() {
+      return ctx.getQueryHandle();
+    }
   }
 
   /**
@@ -523,8 +520,8 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     private final QueryLaunchingConstraintsChecker constraintsChecker;
 
     public QuerySubmitter(@NonNull final ErrorCollection errorCollection,
-        @NonNull final EstimatedQueryCollection waitingQueries,
-        @NonNull final QueryLaunchingConstraintsChecker constraintsChecker) {
+      @NonNull final EstimatedQueryCollection waitingQueries,
+      @NonNull final QueryLaunchingConstraintsChecker constraintsChecker) {
 
       this.errorCollection = errorCollection;
       this.waitingQueries = waitingQueries;
@@ -639,9 +636,9 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
 
     private void checkEstimatedQueriesState(final QueryContext query) throws 
LensException {
-      if  (query.getSelectedDriver() == null || 
query.getSelectedDriverQueryCost() == null) {
-        throw new LensException("selected driver: " + 
query.getSelectedDriver()  +" OR selected driver query cost: "
-            + query.getSelectedDriverQueryCost() + " is null. Query doesn't 
appear to be an estimated query.");
+      if (query.getSelectedDriver() == null || 
query.getSelectedDriverQueryCost() == null) {
+        throw new LensException("selected driver: " + 
query.getSelectedDriver() + " OR selected driver query cost: "
+          + query.getSelectedDriverQueryCost() + " is null. Query doesn't 
appear to be an estimated query.");
       }
     }
   }
@@ -789,10 +786,9 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
             ctx.setStatus(ctx.getDriverStatus().toQueryStatus());
           } catch (LensException exc) {
             // Driver gave exception while updating status
-
             setFailedStatus(ctx, "Status update failed", exc.getMessage(), 
exc.buildLensErrorTO(this.errorCollection));
             log.error("Status update failed for {}", handle, exc);
-
+            return;
           }
           // query is successfully executed by driver and
           // if query result need not be persisted or there is no result 
available in driver, move the query to
@@ -832,10 +828,9 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     QueryHandle query = ctx.getQueryHandle();
     switch (currState) {
     case CANCELED:
-      // TODO: correct username. put who cancelled it, not the submitter. 
Similar for others
-      return new QueryCancelled(ctx.getEndTime(), prevState, currState, query, 
ctx.getSubmittedUser(), null);
+      return new QueryCancelled(ctx, prevState, currState, null);
     case CLOSED:
-      return new QueryClosed(ctx.getClosedTime(), prevState, currState, query, 
ctx.getSubmittedUser(), null);
+      return new QueryClosed(ctx, prevState, currState, null);
     case FAILED:
       StringBuilder msgBuilder = new StringBuilder();
       msgBuilder.append(ctx.getStatus().getStatusMessage());
@@ -843,8 +838,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         msgBuilder.append("\n Reason:\n");
         msgBuilder.append(ctx.getStatus().getErrorMessage());
       }
-      return new QueryFailed(ctx.getEndTime(), prevState, currState, query, 
ctx.getSubmittedUser(),
-        msgBuilder.toString());
+      return new QueryFailed(ctx, prevState, currState, msgBuilder.toString());
     case LAUNCHED:
       return new QueryLaunched(ctx.getLaunchTime(), prevState, currState, 
query);
     case QUEUED:
@@ -855,7 +849,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     case EXECUTED:
       return new QueryExecuted(ctx.getDriverStatus().getDriverFinishTime(), 
prevState, currState, query);
     case SUCCESSFUL:
-      return new QuerySuccess(ctx.getEndTime(), prevState, currState, query);
+      return new QuerySuccess(ctx, prevState, currState);
     default:
       log.warn("Query {} transitioned to {} state from {} state", query, 
currState, prevState);
       return null;
@@ -905,64 +899,67 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     public void run() {
       log.info("Starting Query purger thread");
       while (!stopped && !queryPurger.isInterrupted()) {
-        FinishedQuery finished = null;
         try {
-          finished = finishedQueries.take();
-          
logSegregationContext.setLogSegragationAndQueryId(finished.getQueryHandleString());
-        } catch (InterruptedException e) {
-          log.info("QueryPurger has been interrupted, exiting");
-          return;
-        }
-        try {
-          FinishedLensQuery finishedQuery = new 
FinishedLensQuery(finished.getCtx());
-          if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) {
-            if (finished.ctx.getStatus().isResultSetAvailable()) {
-              LensResultSet set = 
getResultset(finished.getCtx().getQueryHandle());
-              if (set != null && 
PersistentResultSet.class.isAssignableFrom(set.getClass())) {
-                LensResultSetMetadata metadata = set.getMetadata();
-                String outputPath = ((PersistentResultSet) 
set).getOutputPath();
-                Long fileSize = ((PersistentResultSet) set).fileSize();
-                Integer rows = set.size();
-                finishedQuery.setMetadataClass(metadata.getClass().getName());
-                finishedQuery.setResult(outputPath);
-                finishedQuery.setMetadata(MAPPER.writeValueAsString(metadata));
-                finishedQuery.setRows(rows);
-                finishedQuery.setFileSize(fileSize);
+          Iterator<FinishedQuery> iter = finishedQueries.iterator();
+          FinishedQuery finished;
+          while (iter.hasNext()) {
+            finished = iter.next();
+            if (finished.canBePurged()) {
+              try {
+                FinishedLensQuery finishedQuery = new 
FinishedLensQuery(finished.getCtx());
+                if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) {
+                  if (finished.ctx.getStatus().isResultSetAvailable()) {
+                    try {
+                      LensResultSet set = finished.getResultset();
+                      if (set != null && 
PersistentResultSet.class.isAssignableFrom(set.getClass())) {
+                        LensResultSetMetadata metadata = set.getMetadata();
+                        String outputPath = set.getOutputPath();
+                        Long fileSize = ((PersistentResultSet) 
set).getFileSize();
+                        Integer rows = set.size();
+                        
finishedQuery.setMetadataClass(metadata.getClass().getName());
+                        finishedQuery.setResult(outputPath);
+                        
finishedQuery.setMetadata(MAPPER.writeValueAsString(metadata));
+                        finishedQuery.setRows(rows);
+                        finishedQuery.setFileSize(fileSize);
+                      }
+                    } catch (Exception e) {
+                      log.error("Couldn't obtain result set info for the 
query: {}. Going ahead with purge",
+                        finished.getQueryHandle(), e);
+                    }
+                  }
+                }
+                lensServerDao.insertFinishedQuery(finishedQuery);
+                log.info("Saved query {} to DB", finishedQuery.getHandle());
+                iter.remove();
+              } catch (Exception e) {
+                log.warn("Exception while purging query {}", 
finished.getQueryHandle(), e);
+                continue;
               }
-            }
-          }
-          try {
-            lensServerDao.insertFinishedQuery(finishedQuery);
-            log.info("Saved query {} to DB", finishedQuery.getHandle());
-          } catch (Exception e) {
-            log.warn("Exception while purging query ", e);
-            finishedQueries.add(finished);
-            continue;
-          }
-
-          synchronized (finished.ctx) {
-            finished.ctx.setFinishedQueryPersisted(true);
-            try {
-              if (finished.getCtx().getSelectedDriver() != null) {
-                
finished.getCtx().getSelectedDriver().closeQuery(finished.getCtx().getQueryHandle());
+              synchronized (finished.ctx) {
+                finished.ctx.setFinishedQueryPersisted(true);
+                try {
+                  if (finished.getCtx().getSelectedDriver() != null) {
+                    
finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle());
+                  }
+                } catch (Exception e) {
+                  log.warn("Exception while closing query with selected 
driver.", e);
+                }
+                log.info("Purging: {}", finished.getQueryHandle());
+                allQueries.remove(finished.getQueryHandle());
+                resultSets.remove(finished.getQueryHandle());
               }
-            } catch (Exception e) {
-              log.warn("Exception while closing query with selected driver.", 
e);
+              fireStatusChangeEvent(finished.getCtx(),
+                new QueryStatus(1f, null, CLOSED, "Query purged", false, null, 
null, null), finished.getCtx()
+                  .getStatus());
+              log.info("Query purged: {}", finished.getQueryHandle());
             }
-            log.info("Purging: {}", finished.getCtx().getQueryHandle());
-            allQueries.remove(finished.getCtx().getQueryHandle());
-            resultSets.remove(finished.getCtx().getQueryHandle());
           }
-          fireStatusChangeEvent(finished.getCtx(),
-            new QueryStatus(1f, null, CLOSED, "Query purged", false, null, 
null, null), finished.getCtx().getStatus());
-          log.info("Query purged: {}", finished.getCtx().getQueryHandle());
-
-        } catch (LensException e) {
-          incrCounter(QUERY_PURGER_COUNTER);
-          log.error("Error closing  query ", e);
-        } catch (Exception e) {
+          Thread.sleep(purgeInterval);
+        } catch (InterruptedException e) {
+          log.error("purger interrupted", e);
+        } catch (Throwable e) {
+          log.error("Purger giving error", e);
           incrCounter(QUERY_PURGER_COUNTER);
-          log.error("Error in query purger", e);
         }
       }
       log.info("QueryPurger exited");
@@ -1018,16 +1015,16 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       new DefaultQueryCollection(new TreeSet<QueryContext>(new 
QueryContextPriorityComparator()))));
 
     ImmutableSet<QueryLaunchingConstraint> queryConstraints = 
getImplementations(
-        QUERY_LAUNCHING_CONSTRAINT_FACTORIES_KEY, hiveConf);
+      QUERY_LAUNCHING_CONSTRAINT_FACTORIES_KEY, hiveConf);
 
     this.queryConstraintsChecker = new 
DefaultQueryLaunchingConstraintsChecker(queryConstraints);
 
     this.querySubmitterRunnable = new 
QuerySubmitter(LensServices.get().getErrorCollection(), this.waitingQueries,
-        this.queryConstraintsChecker);
+      this.queryConstraintsChecker);
     this.querySubmitter = new Thread(querySubmitterRunnable, "QuerySubmitter");
 
     ImmutableSet<WaitingQueriesSelectionPolicy> selectionPolicies = 
getImplementations(
-        WAITING_QUERIES_SELECTION_POLICY_FACTORIES_KEY, hiveConf);
+      WAITING_QUERIES_SELECTION_POLICY_FACTORIES_KEY, hiveConf);
 
     this.waitingQueriesSelector = new 
IntersectingWaitingQueriesSelector(selectionPolicies);
 
@@ -1048,8 +1045,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       log.error("Error while loading drivers", e);
       throw new IllegalStateException("Could not load drivers", e);
     }
-    maxFinishedQueries = conf.getInt(MAX_NUMBER_OF_FINISHED_QUERY,
-      DEFAULT_FINISHED_QUERIES);
+    purgeInterval = conf.getInt(PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL);
     initalizeFinishedQueryStore(conf);
     log.info("Query execution service initialized");
   }
@@ -1207,7 +1203,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    */
   private void rewriteAndSelect(final AbstractQueryContext ctx) throws 
LensException {
     MethodMetricsContext parallelCallGauge = 
MethodMetricsFactory.createMethodGauge(ctx.getConf(), false,
-        PARALLEL_CALL_GAUGE);
+      PARALLEL_CALL_GAUGE);
     try {
       userQueryToCubeQueryRewriter.rewrite(ctx);
       // Initially we obtain individual runnables for rewrite and estimate 
calls
@@ -1293,7 +1289,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       }
 
       MethodMetricsContext selectGauge = 
MethodMetricsFactory.createMethodGauge(ctx.getConf(), false,
-          DRIVER_SELECTOR_GAUGE);
+        DRIVER_SELECTOR_GAUGE);
       // 2. select driver to run the query
       LensDriver driver = driverSelector.select(ctx, conf);
       ctx.setSelectedDriver(driver);
@@ -1420,16 +1416,14 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    * @return the resultset from dao
    * @throws LensException the lens exception
    */
-  private LensResultSet getResultsetFromDAO(QueryHandle queryHandle) throws 
LensException {
+  private LensPersistentResult getResultsetFromDAO(QueryHandle queryHandle) 
throws LensException {
     FinishedLensQuery query = lensServerDao.getQuery(queryHandle.toString());
     if (query != null) {
       if (query.getResult() == null) {
         throw new NotFoundException("InMemory Query result purged " + 
queryHandle);
       }
       try {
-        Class<LensResultSetMetadata> mdKlass = (Class<LensResultSetMetadata>) 
Class.forName(query.getMetadataClass());
-        return new LensPersistentResult(MAPPER.readValue(query.getMetadata(), 
mdKlass), query.getResult(),
-          query.getRows(), query.getFileSize());
+        return new LensPersistentResult(query, conf, MAPPER);
       } catch (Exception e) {
         throw new LensException(e);
       }
@@ -1444,7 +1438,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    * @return the resultset
    * @throws LensException the lens exception
    */
-  private LensResultSet getResultset(QueryHandle queryHandle) throws 
LensException {
+  LensResultSet getResultset(QueryHandle queryHandle) throws LensException {
     QueryContext ctx = allQueries.get(queryHandle);
     if (ctx == null) {
       return getResultsetFromDAO(queryHandle);
@@ -1456,15 +1450,9 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         LensResultSet resultSet = resultSets.get(queryHandle);
         if (resultSet == null) {
           if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) {
-            resultSets
-              .put(queryHandle,
-                new LensPersistentResult(
-                  ctx.getQueryOutputFormatter().getMetadata(),
-                  ctx.getQueryOutputFormatter().getFinalOutputPath(),
-                  ctx.getQueryOutputFormatter().getNumRows(),
-                  ctx.getQueryOutputFormatter().getFileSize()));
+            resultSets.put(queryHandle, new LensPersistentResult(ctx, conf));
           } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) {
-            resultSet = 
allQueries.get(queryHandle).getSelectedDriver().fetchResultSet(allQueries.get(queryHandle));
+            resultSet = getDriverResultset(queryHandle);
             resultSets.put(queryHandle, resultSet);
           } else {
             throw new NotFoundException("Result set not available for query:" 
+ queryHandle);
@@ -1585,7 +1573,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   public QueryHandle executePrepareAsync(LensSessionHandle sessionHandle, 
QueryPrepareHandle prepareHandle,
     LensConf conf, String queryName) throws LensException {
     try {
-      log.info("ExecutePrepareAsync: session:{} prepareHandle:{}", 
sessionHandle,  prepareHandle.getPrepareHandleId());
+      log.info("ExecutePrepareAsync: session:{} prepareHandle:{}", 
sessionHandle, prepareHandle.getPrepareHandleId());
       acquire(sessionHandle);
       PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, 
prepareHandle);
       Configuration qconf = getLensConf(sessionHandle, conf);
@@ -1771,13 +1759,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       acquire(sessionHandle);
       QueryContext ctx = allQueries.get(queryHandle);
       if (ctx == null) {
-        FinishedLensQuery query = 
lensServerDao.getQuery(queryHandle.toString());
-        log.info("FinishedLensQuery:{}", query);
-        if (query == null) {
-          throw new NotFoundException("Query not found " + queryHandle);
-        }
-        // pass the query conf instead of service conf
-        return query.toQueryContext(conf, drivers.values());
+        return getQueryContextOfFinishedQuery(queryHandle);
       }
       updateStatus(queryHandle);
       return ctx;
@@ -1786,6 +1768,16 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
   }
 
+  QueryContext getQueryContextOfFinishedQuery(QueryHandle queryHandle) {
+    FinishedLensQuery query = lensServerDao.getQuery(queryHandle.toString());
+    log.info("FinishedLensQuery:{}", query);
+    if (query == null) {
+      throw new NotFoundException("Query not found " + queryHandle);
+    }
+    // pass the query conf instead of service conf
+    return query.toQueryContext(conf, drivers.values());
+  }
+
   /**
    * Gets the query context.
    *
@@ -1981,6 +1973,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         this.notify();
       }
     }
+
   }
 
   /*
@@ -1994,7 +1987,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   public QueryResultSetMetadata getResultSetMetadata(LensSessionHandle 
sessionHandle, QueryHandle queryHandle)
     throws LensException {
     try {
-      log.info("GetResultSetMetadata: session:{} query: {}", sessionHandle,  
queryHandle);
+      log.info("GetResultSetMetadata: session:{} query: {}", sessionHandle, 
queryHandle);
       acquire(sessionHandle);
       LensResultSet resultSet = getResultset(queryHandle);
       if (resultSet != null) {
@@ -2267,7 +2260,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       acquire(sessionHandle);
       Configuration qconf = getLensConf(sessionHandle, lensConf);
       ExplainQueryContext explainQueryContext = new 
ExplainQueryContext(requestId, query, getSession(sessionHandle)
-          .getLoggedInUser(), lensConf, qconf, drivers.values());
+        .getLoggedInUser(), lensConf, qconf, drivers.values());
       
explainQueryContext.setLensSessionIdentifier(sessionHandle.getPublicId().toString());
       accept(query, qconf, SubmitOp.EXPLAIN);
       rewriteAndSelect(explainQueryContext);
@@ -2383,6 +2376,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
           break;
         case LAUNCHED:
         case RUNNING:
+        case EXECUTED:
           try {
             launchedQueries.add(ctx);
           } catch (final Exception e) {
@@ -2486,69 +2480,54 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
 
     return isHealthy
-        ? new HealthStatus(isHealthy, "QueryExecution service is healthy.")
-        : new HealthStatus(isHealthy, details.toString());
+      ? new HealthStatus(isHealthy, "QueryExecution service is healthy.")
+      : new HealthStatus(isHealthy, details.toString());
   }
-
   /*
    * (non-Javadoc)
    *
    * @see 
org.apache.lens.server.api.query.QueryExecutionService#getHttpResultSet(org.apache.lens.api.LensSessionHandle,
    * org.apache.lens.api.query.QueryHandle)
    */
+
   @Override
   public Response getHttpResultSet(LensSessionHandle sessionHandle, 
QueryHandle queryHandle) throws LensException {
+    LensResultSet resultSet = getResultset(queryHandle);
+    if (!resultSet.isHttpResultAvailable()) {
+      throw new NotFoundException("http result not available");
+    }
+    final Path resultPath = new Path(resultSet.getOutputPath());
     final QueryContext ctx = getQueryContext(sessionHandle, queryHandle);
-    LensResultSet result = getResultset(queryHandle);
-    if (result instanceof LensPersistentResult) {
-      final Path resultPath = new Path(((PersistentResultSet) 
result).getOutputPath());
+    String resultFSReadUrl = conf.get(RESULT_FS_READ_URL);
+    if (resultFSReadUrl != null) {
       try {
-        FileSystem fs = resultPath.getFileSystem(conf);
-        if (fs.isDirectory(resultPath)) {
-          throw new NotFoundException("Http result not available for query:" + 
queryHandle.toString());
-        }
-      } catch (IOException e) {
-        log.warn("Unable to get status for Result Directory", e);
-        throw new NotFoundException("Http result not available for query:" + 
queryHandle.toString());
-      }
-      String resultFSReadUrl = ctx.getConf().get(RESULT_FS_READ_URL);
-      if (resultFSReadUrl != null) {
-        try {
-          URI resultReadPath = new URI(resultFSReadUrl + 
resultPath.toUri().getPath() + "?op=OPEN&user.name="
-            + getSession(sessionHandle).getClusterUser());
-          return Response.seeOther(resultReadPath)
-            .header("content-disposition", "attachment; filename = " + 
resultPath.getName())
-            .type(MediaType.APPLICATION_OCTET_STREAM).build();
-        } catch (URISyntaxException e) {
-          throw new LensException(e);
-        }
-      } else {
-        StreamingOutput stream = new StreamingOutput() {
-          @Override
-          public void write(OutputStream os) throws IOException {
-            FSDataInputStream fin = null;
-            try {
-              FileSystem fs = resultPath.getFileSystem(ctx.getConf());
-              fin = fs.open(resultPath);
-              UtilityMethods.pipe(fin, os);
-            } finally {
-              if (fin != null) {
-                fin.close();
-              }
-
-            }
-          }
-        };
-        return Response.ok(stream).header("content-disposition", "attachment; 
filename = " + resultPath.getName())
+        URI resultReadPath = new URI(resultFSReadUrl + 
resultPath.toUri().getPath() + "?op=OPEN&user.name="
+          + getSession(sessionHandle).getClusterUser());
+        return Response.seeOther(resultReadPath)
+          .header("content-disposition", "attachment; filename = " + 
resultPath.getName())
           .type(MediaType.APPLICATION_OCTET_STREAM).build();
+      } catch (URISyntaxException e) {
+        throw new LensException(e);
       }
     } else {
-      String entity = "";
-      if (result instanceof InMemoryResultSet || result instanceof 
PersistentResultSet) {
-        entity = "Result is available in driver's "
-          + (result instanceof InMemoryResultSet ? "memory" : "persistence") + 
".";
-      }
-      return Response.status(Response.Status.NOT_FOUND).entity(entity).build();
+      StreamingOutput stream = new StreamingOutput() {
+        @Override
+        public void write(OutputStream os) throws IOException {
+          FSDataInputStream fin = null;
+          try {
+            FileSystem fs = resultPath.getFileSystem(ctx.getConf());
+            fin = fs.open(resultPath);
+            UtilityMethods.pipe(fin, os);
+          } finally {
+            if (fin != null) {
+              fin.close();
+            }
+
+          }
+        }
+      };
+      return Response.ok(stream).header("content-disposition", "attachment; 
filename = " + resultPath.getName())
+        .type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
   }
 
@@ -2698,7 +2677,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     Collection<ResourceEntry> pendingResources =
       session.getPendingSessionResourcesForDatabase(ctx.getDatabase());
     log.info("Adding pending {} session resources for session {} for database 
{}", pendingResources.size(),
-      sessionIdentifier,  ctx.getDatabase());
+      sessionIdentifier, ctx.getDatabase());
     List<ResourceEntry> failedResources = addResources(pendingResources, 
sessionHandle, hiveDriver);
     // Mark added resources so that we don't add them again. If any of the 
resources failed
     // to be added, then they will be added again
@@ -2761,7 +2740,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
 
     log.debug("launchedQueries.remove(finishedQuery) has returned [{}] for 
finished query with query id:[{}]", modified,
-        finishedQuery.getQueryHandleString());
+      finishedQuery.getQueryHandleString());
     return modified;
   }
 
@@ -2801,7 +2780,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   private void processWaitingQueries(final FinishedLensQuery finishedQuery) {
 
     Set<QueryContext> eligibleWaitingQueries = this.waitingQueriesSelector
-        .selectQueries(finishedQuery, this.waitingQueries);
+      .selectQueries(finishedQuery, this.waitingQueries);
 
     if (eligibleWaitingQueries.isEmpty()) {
       log.debug("No queries eligible to move out of waiting state.");

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
index b57bc64..55cabe2 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java
@@ -39,20 +39,15 @@ public class QueryExecutionStatisticsGenerator extends 
AsyncEventListener<QueryE
   /** The Constant LOG. */
   private static final Logger LOG = 
LoggerFactory.getLogger(QueryExecutionStatisticsGenerator.class);
 
-  /** The query service. */
-  private final QueryExecutionServiceImpl queryService;
-
   /** The event service. */
   private final LensEventService eventService;
 
   /**
    * Instantiates a new query execution statistics generator.
    *
-   * @param queryService the query service
    * @param eventService the event service
    */
-  public QueryExecutionStatisticsGenerator(QueryExecutionServiceImpl 
queryService, LensEventService eventService) {
-    this.queryService = queryService;
+  public QueryExecutionStatisticsGenerator(LensEventService eventService) {
     this.eventService = eventService;
   }
 
@@ -68,7 +63,7 @@ public class QueryExecutionStatisticsGenerator extends 
AsyncEventListener<QueryE
     }
     QueryHandle handle = ended.getQueryHandle();
     QueryExecutionStatistics event = new 
QueryExecutionStatistics(System.currentTimeMillis());
-    QueryContext ctx = queryService.getQueryContext(handle);
+    QueryContext ctx = ended.getQueryContext();
     if (ctx == null) {
       LOG.warn("Could not find the context for " + handle + " for event:" + 
ended.getCurrentValue()
         + ". No stat generated");
@@ -88,7 +83,7 @@ public class QueryExecutionStatisticsGenerator extends 
AsyncEventListener<QueryE
     QueryDriverStatistics driverStats = new QueryDriverStatistics();
     driverStats.setDriverQuery(ctx.getSelectedDriverQuery());
     driverStats.setStartTime(ctx.getDriverStatus().getDriverStartTime());
-    driverStats.setEndTime(ctx.getDriverStatus().getDriverStartTime());
+    driverStats.setEndTime(ctx.getDriverStatus().getDriverFinishTime());
     event.setDriverStats(driverStats);
     try {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java 
b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
index af42eb0..f568b17 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
@@ -65,17 +65,16 @@ public class ResultFormatter extends 
AsyncEventListener<QueryExecuted> {
    */
   @Override
   public void process(QueryExecuted event) {
-    formatOutput(event);
+    formatOutput(queryService.getQueryContext(event.getQueryHandle()));
   }
 
   /**
    * Format output.
    *
-   * @param event the event
+   * @param ctx the query context
    */
-  private void formatOutput(QueryExecuted event) {
-    QueryHandle queryHandle = event.getQueryHandle();
-    QueryContext ctx = queryService.getQueryContext(queryHandle);
+  private void formatOutput(QueryContext ctx) {
+    QueryHandle queryHandle = ctx.getQueryHandle();
     
this.logSegregationContext.setLogSegragationAndQueryId(ctx.getQueryHandleString());
     try {
       if (!ctx.isPersistent()) {
@@ -86,9 +85,8 @@ public class ResultFormatter extends 
AsyncEventListener<QueryExecuted> {
         log.info("Result formatter for {}", queryHandle);
         LensResultSet resultSet = queryService.getDriverResultset(queryHandle);
         boolean isPersistedInDriver = resultSet instanceof PersistentResultSet;
-        if (isPersistedInDriver) {
-          // skip result formatting if persisted size is huge
-          Path persistedDirectory = new Path(ctx.getHdfsoutPath());
+        if (isPersistedInDriver) {          // skip result formatting if 
persisted size is huge
+          Path persistedDirectory = new Path(ctx.getDriverResultPath());
           FileSystem fs = persistedDirectory.getFileSystem(ctx.getConf());
           long size = fs.getContentSummary(persistedDirectory).getLength();
           long threshold = 
ctx.getConf().getLong(LensConfConstants.RESULT_FORMAT_SIZE_THRESHOLD,
@@ -112,7 +110,7 @@ public class ResultFormatter extends 
AsyncEventListener<QueryExecuted> {
           }
           if (isPersistedInDriver) {
             log.info("Result formatter for {} in persistent result", 
queryHandle);
-            Path persistedDirectory = new Path(ctx.getHdfsoutPath());
+            Path persistedDirectory = new Path(ctx.getDriverResultPath());
             // write all files from persistent directory
             ((PersistedOutputFormatter) 
formatter).addRowsFromPersistedPath(persistedDirectory);
           } else {
@@ -121,6 +119,7 @@ public class ResultFormatter extends 
AsyncEventListener<QueryExecuted> {
             while (inmemory.hasNext()) {
               ((InMemoryOutputFormatter) formatter).writeRow(inmemory.next());
             }
+            inmemory.setFullyAccessed(true);
           }
           if 
(ctx.getConf().getBoolean(LensConfConstants.QUERY_OUTPUT_WRITE_FOOTER,
             LensConfConstants.DEFAULT_OUTPUT_WRITE_FOOTER)) {

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/resources/lensserver-default.xml
----------------------------------------------------------------------
diff --git a/lens-server/src/main/resources/lensserver-default.xml 
b/lens-server/src/main/resources/lensserver-default.xml
index 3c8fc09..1e6bd10 100644
--- a/lens-server/src/main/resources/lensserver-default.xml
+++ b/lens-server/src/main/resources/lensserver-default.xml
@@ -279,9 +279,9 @@
 
   <!-- Finished Query Purging Configurations -->
   <property>
-    <name>lens.server.max.finished.queries</name>
-    <value>100</value>
-    <description>Maximum number of finished queries which lens server will 
keep in memory before purging.</description>
+    <name>lens.server.querypurger.sleep.interval</name>
+    <value>10000</value>
+    <description>The interval (in milliseconds) at which the query purger runs 
periodically. Default is 10 seconds.</description>
   </property>
   <property>
     <name>lens.server.domain</name>

http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java 
b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
index 05bd99f..3dad050 100644
--- a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
+++ b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
@@ -23,15 +23,17 @@ import static org.testng.Assert.*;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.URI;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.ws.rs.core.UriBuilder;
 
 import org.apache.lens.driver.hive.TestRemoteHiveDriver;
-import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.metrics.LensMetricsUtil;
 import org.apache.lens.server.api.metrics.MetricsService;
 import org.apache.lens.server.model.LogSegregationContext;
 import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
+import org.apache.lens.server.query.QueryExecutionServiceImpl;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.Service;
@@ -41,6 +43,7 @@ import org.glassfish.jersey.test.JerseyTest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -186,7 +189,6 @@ public abstract class LensJerseyTest extends JerseyTest {
    */
   public void restartLensServer() {
     HiveConf h = getServerConf();
-    h.set(LensConfConstants.MAX_NUMBER_OF_FINISHED_QUERY, "0");
     restartLensServer(h);
   }
 
@@ -204,4 +206,19 @@ public abstract class LensJerseyTest extends JerseyTest {
     LensServices.get().start();
     System.out.println("Lens services restarted!");
   }
+  public static void waitForPurge(int allowUnpurgable,
+    ConcurrentLinkedQueue<QueryExecutionServiceImpl.FinishedQuery> 
finishedQueries) throws InterruptedException {
+    List<QueryExecutionServiceImpl.FinishedQuery> unPurgable = 
Lists.newArrayList();
+    for (QueryExecutionServiceImpl.FinishedQuery finishedQuery : 
finishedQueries) {
+      if (!finishedQuery.canBePurged()) {
+        unPurgable.add(finishedQuery);
+      }
+    }
+    if (unPurgable.size() > allowUnpurgable) {
+      throw new RuntimeException("finished queries can't be purged: " + 
unPurgable);
+    }
+    while (finishedQueries.size() > allowUnpurgable) {
+      Thread.sleep(5000);
+    }
+  }
 }

Reply via email to