LENS-693 : Queries get purged as soon as they are finished
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/669e8727 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/669e8727 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/669e8727 Branch: refs/heads/current-release-line Commit: 669e8727292a297487dbeb2b3edc6f17c2cea651 Parents: d457dd0 Author: Rajat Khandelwal <pro...@apache.org> Authored: Wed Sep 23 14:36:40 2015 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Wed Sep 23 14:36:40 2015 +0530 ---------------------------------------------------------------------- .../main/java/org/apache/lens/api/LensConf.java | 7 +- .../lens/api/query/InMemoryQueryResult.java | 24 +- .../org/apache/lens/api/query/LensQuery.java | 8 +- .../lens/api/query/PersistentQueryResult.java | 18 + .../org/apache/lens/api/query/QueryResult.java | 4 +- .../org/apache/lens/api/query/QueryStatus.java | 1 + .../lens/cli/commands/LensQueryCommands.java | 9 +- .../org/apache/lens/driver/hive/HiveDriver.java | 5 +- .../driver/hive/HivePersistentResultSet.java | 2 +- .../apache/lens/driver/hive/TestHiveDriver.java | 2 +- .../lens/server/api/LensConfConstants.java | 8 +- .../server/api/driver/InMemoryResultSet.java | 24 +- .../lens/server/api/driver/LensResultSet.java | 9 + .../server/api/driver/PersistentResultSet.java | 19 +- .../server/api/query/FinishedLensQuery.java | 5 +- .../lens/server/api/query/QueryCancelled.java | 10 +- .../lens/server/api/query/QueryClosed.java | 10 +- .../lens/server/api/query/QueryContext.java | 6 +- .../lens/server/api/query/QueryEnded.java | 8 +- .../lens/server/api/query/QueryFailed.java | 10 +- .../lens/server/api/query/QuerySuccess.java | 10 +- .../lens/server/api/driver/MockDriver.java | 9 +- .../lens/server/query/LensPersistentResult.java | 73 +++- .../lens/server/query/QueryEndNotifier.java | 79 ++--- .../server/query/QueryExecutionServiceImpl.java | 345 +++++++++---------- .../QueryExecutionStatisticsGenerator.java | 11 +- .../lens/server/query/ResultFormatter.java | 17 +- .../src/main/resources/lensserver-default.xml | 6 +- .../org/apache/lens/server/LensJerseyTest.java | 21 +- .../lens/server/common/RestAPITestUtil.java | 122 +++++-- .../lens/server/query/TestEventService.java | 11 +- .../server/query/TestQueryEndEmailNotifier.java | 196 +++++------ .../lens/server/query/TestQueryService.java | 253 +++++--------- .../lens/server/query/TestResultFormatting.java | 11 +- lens-server/src/test/resources/lens-site.xml | 6 +- src/site/apt/admin/config.apt | 42 +-- 36 files changed, 754 insertions(+), 647 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/LensConf.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/LensConf.java b/lens-api/src/main/java/org/apache/lens/api/LensConf.java index 3b1ad87..ff965d6 100644 --- a/lens-api/src/main/java/org/apache/lens/api/LensConf.java +++ b/lens-api/src/main/java/org/apache/lens/api/LensConf.java @@ -63,11 +63,14 @@ public class LensConf implements Serializable { properties.put(key, value); } + public void addProperty(Object key, Object value) { + properties.put(String.valueOf(key), String.valueOf(value)); + } + /** * Adds Map of properties. * - * @param key the key - * @param value the value + * @param props the properties */ public void addProperties(Map<String, String> props) { properties.putAll(props); http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java b/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java index 6a9f455..36d0ba0 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/InMemoryQueryResult.java @@ -27,10 +27,7 @@ import java.util.List; import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.*; /** * The Class InMemoryQueryResult. @@ -48,11 +45,26 @@ import lombok.NoArgsConstructor; */ @NoArgsConstructor(access = AccessLevel.PROTECTED) public class InMemoryQueryResult extends QueryResult { - + public static final String DECLARATION = "Result available in memory, attaching here: \n\n"; /** * The rows. */ @XmlElementWrapper @Getter - private List<ResultRow> rows = new ArrayList<ResultRow>(); + private List<ResultRow> rows = new ArrayList<>(); + + public String toPrettyString() { + StringBuilder b = new StringBuilder(); + b.append(DECLARATION); + int numRows = 0; + for (ResultRow row : getRows()) { + for (Object col : row.getValues()) { + b.append(col).append("\t"); + } + numRows++; + b.append("\n"); + } + b.append(numRows).append(" rows "); + return b.toString(); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java b/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java index af439ff..204ecee 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/LensQuery.java @@ -27,10 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.apache.lens.api.LensConf; import org.apache.lens.api.Priority; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.*; /** * The Class LensQuery. @@ -81,6 +78,7 @@ import lombok.NoArgsConstructor; * Instantiates a new lens query. */ @NoArgsConstructor(access = AccessLevel.PROTECTED) +@EqualsAndHashCode public class LensQuery { /** @@ -214,7 +212,7 @@ public class LensQuery { } public String getErrorMessage() { - return (this.status!=null) ? this.status.getLensErrorTOErrorMsg() : null; + return (this.status != null) ? this.status.getLensErrorTOErrorMsg() : null; } public String getQueryHandleString() { http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java b/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java index a2e10c6..cb72cdc 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/PersistentQueryResult.java @@ -69,4 +69,22 @@ public class PersistentQueryResult extends QueryResult { @Getter private Long fileSize; + @XmlElement + @Getter + private String httpResultUrl; + + @Override + public String toPrettyString() { + StringBuilder sb = new StringBuilder().append("Result available at ").append(persistedURI).append("."); + if (numRows != null) { + sb.append(" Number of rows: ").append(numRows).append("."); + } + if (fileSize != null) { + sb.append(" File size: ").append(fileSize).append("."); + } + if (httpResultUrl != null) { + sb.append(" Downloadable from ").append(httpResultUrl).append("."); + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java b/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java index 3ecf0c2..3bc7cc7 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryResult.java @@ -24,6 +24,8 @@ package org.apache.lens.api.query; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlSeeAlso; +import org.apache.lens.api.result.PrettyPrintable; + import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -36,5 +38,5 @@ import lombok.NoArgsConstructor; * Instantiates a new query result. */ @NoArgsConstructor(access = AccessLevel.PROTECTED) -public class QueryResult { +public abstract class QueryResult implements PrettyPrintable { } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java index 3c8531f..91cbe39 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java @@ -264,6 +264,7 @@ public class QueryStatus implements Serializable { break; case EXECUTED: switch (newState) { + case EXECUTED: case SUCCESSFUL: case FAILED: case CANCELED: http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java ---------------------------------------------------------------------- diff --git a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java index 006eaed..b6e3fcf 100644 --- a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java +++ b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java @@ -120,14 +120,7 @@ public class LensQueryCommands extends BaseLensCommand { QueryResult r = rs.getResultSet().getResult(); if (r instanceof InMemoryQueryResult) { InMemoryQueryResult temp = (InMemoryQueryResult) r; - for (ResultRow row : temp.getRows()) { - for (Object col : row.getValues()) { - b.append(col).append("\t"); - } - numRows++; - b.append("\n"); - } - b.append(numRows + " rows "); + b.append(temp.toPrettyString()); } else { PersistentQueryResult temp = (PersistentQueryResult) r; b.append("Results of query stored at : ").append(temp.getPersistedURI()).append(" "); http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index 31c343a..4561ccf 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -842,7 +842,7 @@ public class HiveDriver implements LensDriver { log.info("Creating result set for hiveHandle:{}", op); try { if (context.isDriverPersistent()) { - return new HivePersistentResultSet(new Path(context.getHdfsoutPath()), op, getClient()); + return new HivePersistentResultSet(new Path(context.getDriverResultPath()), op, getClient()); } else if (op.hasResultSet()) { return new HiveInMemoryResultSet(op, getClient(), closeAfterFetch); } else { @@ -874,7 +874,8 @@ public class HiveDriver implements LensDriver { Path resultSetPath = context.getHDFSResultDir(); // create query StringBuilder builder = new StringBuilder("INSERT OVERWRITE DIRECTORY "); - context.setHdfsoutPath(resultSetPath.makeQualified(resultSetPath.getFileSystem(context.getConf())).toString()); + context.setDriverResultPath( + resultSetPath.makeQualified(resultSetPath.getFileSystem(context.getConf())).toString()); builder.append('"').append(resultSetPath).append("\" "); String outputDirFormat = qdconf.get(LensConfConstants.QUERY_OUTPUT_DIRECTORY_FORMAT); if (outputDirFormat != null) { http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java index 00e1e53..746e17e 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HivePersistentResultSet.java @@ -64,7 +64,7 @@ public class HivePersistentResultSet extends PersistentResultSet { } @Override - public Long fileSize() throws LensException { + public Long getFileSize() throws LensException { return null; } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java index 78b3320..2cb3736 100644 --- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java +++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java @@ -793,7 +793,7 @@ public class TestHiveDriver { assertEquals(0, driver.getHiveHandleSize()); HivePersistentResultSet persistentResultSet = (HivePersistentResultSet) resultSet; String path = persistentResultSet.getOutputPath(); - assertEquals(ctx.getHdfsoutPath(), path); + assertEquals(ctx.getDriverResultPath(), path); driver.closeQuery(plan2.getHandle()); } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 096d26e..f202603 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -774,14 +774,14 @@ public final class LensConfConstants { // Query Purge Configuration /** - * The Constant MAX_NUMBER_OF_FINISHED_QUERY. + * The Constant PURGE_INTERVAL. */ - public static final String MAX_NUMBER_OF_FINISHED_QUERY = SERVER_PFX + "max.finished.queries"; + public static final String PURGE_INTERVAL = SERVER_PFX + "querypurger.sleep.interval"; /** - * The Constant DEFAULT_FINISHED_QUERIES. + * The Constant DEFAULT_PURGE_INTERVAL. */ - public static final int DEFAULT_FINISHED_QUERIES = 100; + public static final int DEFAULT_PURGE_INTERVAL = 10000; // Server DB configuration /** http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java index 3b76126..c64a3dd 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java @@ -22,16 +22,31 @@ import java.util.ArrayList; import java.util.List; import org.apache.lens.api.query.InMemoryQueryResult; -import org.apache.lens.api.query.QueryResult; import org.apache.lens.api.query.ResultRow; import org.apache.lens.server.api.error.LensException; +import lombok.Setter; + /** * The Class InMemoryResultSet. */ public abstract class InMemoryResultSet extends LensResultSet { public abstract boolean seekToStart() throws LensException; + + @Setter + private boolean fullyAccessed = false; + + @Override + public boolean canBePurged() { + return fullyAccessed; + } + + @Override + public String getOutputPath() throws LensException { + return null; + } + /** * Whether there is another result row available. * @@ -60,12 +75,15 @@ public abstract class InMemoryResultSet extends LensResultSet { * * @see org.apache.lens.server.api.driver.LensResultSet#toQueryResult() */ - public QueryResult toQueryResult() throws LensException { + public InMemoryQueryResult toQueryResult() throws LensException { List<ResultRow> rows = new ArrayList<ResultRow>(); while (hasNext()) { rows.add(next()); } + fullyAccessed = true; return new InMemoryQueryResult(rows); } - + public boolean isHttpResultAvailable() throws LensException { + return false; + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java index 929a302..805b0c1 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensResultSet.java @@ -25,6 +25,11 @@ import org.apache.lens.server.api.error.LensException; * Result set returned by driver. */ public abstract class LensResultSet { + /** + * + * @return true if the result can be purged + */ + public abstract boolean canBePurged(); /** * Get the size of the result set. @@ -39,8 +44,11 @@ public abstract class LensResultSet { * * @return Returns {@link LensResultSetMetadata} */ + public abstract LensResultSetMetadata getMetadata() throws LensException; + public abstract String getOutputPath() throws LensException; + /** * Get the corresponding query result object. * @@ -49,4 +57,5 @@ public abstract class LensResultSet { */ public abstract QueryResult toQueryResult() throws LensException; + public abstract boolean isHttpResultAvailable() throws LensException; } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java index bbde170..774f1ee 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PersistentResultSet.java @@ -19,7 +19,6 @@ package org.apache.lens.server.api.driver; import org.apache.lens.api.query.PersistentQueryResult; -import org.apache.lens.api.query.QueryResult; import org.apache.lens.server.api.error.LensException; /** @@ -27,22 +26,32 @@ import org.apache.lens.server.api.error.LensException; */ public abstract class PersistentResultSet extends LensResultSet { + @Override + public boolean canBePurged() { + return true; + } + /** * Get the size of the result set file. * * @return The size if available, null if not available. * @throws LensException the lens exception */ - public abstract Long fileSize() throws LensException; + public abstract Long getFileSize() throws LensException; - public abstract String getOutputPath() throws LensException; + public String getHttpResultUrl() { + return null; + } /* * (non-Javadoc) * * @see org.apache.lens.server.api.driver.LensResultSet#toQueryResult() */ - public QueryResult toQueryResult() throws LensException { - return new PersistentQueryResult(getOutputPath(), size(), fileSize()); + public PersistentQueryResult toQueryResult() throws LensException { + return new PersistentQueryResult(getOutputPath(), size(), getFileSize(), getHttpResultUrl()); + } + public boolean isHttpResultAvailable() throws LensException { + return false; } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java index d8c04db..7a06c44 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java @@ -24,7 +24,6 @@ import org.apache.lens.api.LensConf; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.server.api.driver.LensDriver; -import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.hadoop.conf.Configuration; @@ -44,7 +43,7 @@ import lombok.ToString; * * @see java.lang.Object#hashCode() */ -@EqualsAndHashCode +@EqualsAndHashCode(exclude = "selectedDriver") /* * (non-Javadoc) * @@ -198,7 +197,7 @@ public class FinishedLensQuery { this.selectedDriver = ctx.getSelectedDriver(); } - public QueryContext toQueryContext(Configuration conf, Collection<LensDriver> drivers) throws LensException { + public QueryContext toQueryContext(Configuration conf, Collection<LensDriver> drivers) { QueryContext qctx = new QueryContext(userQuery, submitter, new LensConf(), conf, drivers, null, submissionTime, false); qctx.setQueryHandle(QueryHandle.fromString(handle)); http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java index bc0465c..a473a47 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryCancelled.java @@ -29,6 +29,7 @@ public class QueryCancelled extends QueryEnded { /** * Instantiates a new query cancelled. * + * @param ctx the query context * @param eventTime the event time * @param prev the prev * @param current the current @@ -36,10 +37,15 @@ public class QueryCancelled extends QueryEnded { * @param user the user * @param cause the cause */ - public QueryCancelled(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle, + public QueryCancelled(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current, + QueryHandle handle, String user, String cause) { - super(eventTime, prev, current, handle, user, cause); + super(ctx, eventTime, prev, current, handle, user, cause); checkCurrentState(QueryStatus.Status.CANCELED); } + public QueryCancelled(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) { + // TODO: correct username. put who cancelled it, not the submitter. Similar for others + this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java index ea8f70f..3837087 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryClosed.java @@ -29,6 +29,7 @@ public class QueryClosed extends QueryEnded { /** * Instantiates a new query closed. * + * @param ctx the query context * @param eventTime the event time * @param prev the prev * @param current the current @@ -36,9 +37,14 @@ public class QueryClosed extends QueryEnded { * @param user the user * @param cause the cause */ - public QueryClosed(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle, + public QueryClosed(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current, + QueryHandle handle, String user, String cause) { - super(eventTime, prev, current, handle, user, cause); + super(ctx, eventTime, prev, current, handle, user, cause); checkCurrentState(QueryStatus.Status.CLOSED); } + + public QueryClosed(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) { + this(ctx, ctx.getClosedTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index 12de0a5..bed79ac 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -42,10 +42,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; +import lombok.ToString; /** * The Class QueryContext. */ +@ToString public class QueryContext extends AbstractQueryContext { /** @@ -95,7 +97,7 @@ public class QueryContext extends AbstractQueryContext { */ @Getter @Setter - private String hdfsoutPath; + private String driverResultPath; /** * The submission time. @@ -326,7 +328,7 @@ public class QueryContext extends AbstractQueryContext { /* * Introduced for Recovering finished query. */ - public void setStatusSkippingTransitionTest(final QueryStatus newStatus) throws LensException { + public void setStatusSkippingTransitionTest(final QueryStatus newStatus) { this.status = newStatus; } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java index 3e9474c..e80da6d 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryEnded.java @@ -33,6 +33,8 @@ import lombok.Getter; */ public class QueryEnded extends StatusChange { + @Getter + private final QueryContext queryContext; /** * The user. */ @@ -54,6 +56,7 @@ public class QueryEnded extends StatusChange { /** * Instantiates a new query ended. * + * @param ctx * @param eventTime the event time * @param prev the prev * @param current the current @@ -61,9 +64,10 @@ public class QueryEnded extends StatusChange { * @param user the user * @param cause the cause */ - public QueryEnded(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle, - String user, String cause) { + public QueryEnded(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current, + QueryHandle handle, String user, String cause) { super(eventTime, prev, current, handle); + this.queryContext = ctx; this.user = user; this.cause = cause; if (!END_STATES.contains(current)) { http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java index 40a1c0f..bdffbc2 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryFailed.java @@ -29,6 +29,7 @@ public class QueryFailed extends QueryEnded { /** * Instantiates a new query failed. * + * @param ctx the query context * @param eventTime the event time * @param prev the prev * @param current the current @@ -36,9 +37,14 @@ public class QueryFailed extends QueryEnded { * @param user the user * @param cause the cause */ - public QueryFailed(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle, + public QueryFailed(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current, + QueryHandle handle, String user, String cause) { - super(eventTime, prev, current, handle, user, cause); + super(ctx, eventTime, prev, current, handle, user, cause); checkCurrentState(QueryStatus.Status.FAILED); } + + public QueryFailed(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState, String cause) { + this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle(), ctx.getSubmittedUser(), cause); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java index c551dba..298fdbb 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QuerySuccess.java @@ -29,13 +29,19 @@ public class QuerySuccess extends QueryEnded { /** * Instantiates a new query success. * + * @param ctx the query context * @param eventTime the event time * @param prev the prev * @param current the current * @param handle the handle */ - public QuerySuccess(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle) { - super(eventTime, prev, current, handle, null, null); + public QuerySuccess(QueryContext ctx, long eventTime, QueryStatus.Status prev, QueryStatus.Status current, + QueryHandle handle) { + super(ctx, eventTime, prev, current, handle, null, null); checkCurrentState(QueryStatus.Status.SUCCESSFUL); } + + public QuerySuccess(QueryContext ctx, QueryStatus.Status prevState, QueryStatus.Status currState) { + this(ctx, ctx.getEndTime(), prevState, currState, ctx.getQueryHandle()); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java index b28669e..2d86589 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java @@ -255,7 +255,7 @@ public class MockDriver implements LensDriver { } @Override - public Long fileSize() throws LensException { + public Long getFileSize() throws LensException { // TODO Auto-generated method stub return null; } @@ -297,7 +297,7 @@ public class MockDriver implements LensDriver { * @see org.apache.lens.server.api.driver.LensDriver#fetchResultSet(org.apache.lens.server.api.query.QueryContext) */ @Override - public LensResultSet fetchResultSet(QueryContext context) throws LensException { + public LensResultSet fetchResultSet(final QueryContext context) throws LensException { return new InMemoryResultSet() { @Override @@ -340,6 +340,11 @@ public class MockDriver implements LensDriver { // TODO Auto-generated method stub return false; } + + @Override + public boolean canBePurged() { + return true; + } }; } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java b/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java index b65a5f4..1e9a182 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/LensPersistentResult.java @@ -18,13 +18,27 @@ */ package org.apache.lens.server.query; +import java.io.IOException; + +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.driver.LensResultSetMetadata; import org.apache.lens.server.api.driver.PersistentResultSet; import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.query.FinishedLensQuery; +import org.apache.lens.server.api.query.QueryContext; -/** - * The Class LensPersistentResult. - */ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.codehaus.jackson.map.ObjectMapper; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** The Class LensPersistentResult. */ +@Slf4j public class LensPersistentResult extends PersistentResultSet { /** The metadata. */ @@ -38,19 +52,45 @@ public class LensPersistentResult extends PersistentResultSet { /** The file size. */ private final Long fileSize; + private final Configuration conf; + @Getter + private String httpResultUrl = null; /** * Instantiates a new lens persistent result. - * - * @param metadata the metadata - * @param outputPath the output path - * @param numRows the num rows + * @param queryHandle the query handle + * @param metadata the metadata + * @param outputPath the output path + * @param numRows the num rows + * @param conf the lens server conf */ - public LensPersistentResult(LensResultSetMetadata metadata, String outputPath, Integer numRows, Long fileSize) { + public LensPersistentResult(QueryHandle queryHandle, LensResultSetMetadata metadata, String outputPath, Integer + numRows, Long fileSize, + Configuration conf) { this.metadata = metadata; this.outputPath = outputPath; this.numRows = numRows; this.fileSize = fileSize; + this.conf = conf; + if (isHttpResultAvailable()) { + this.httpResultUrl = conf.get(LensConfConstants.SERVER_BASE_URL, LensConfConstants.DEFAULT_SERVER_BASE_URL) + + "queryapi/queries/" + queryHandle + "/httpresultset"; + } + } + + public LensPersistentResult(QueryContext ctx, Configuration conf) { + this(ctx.getQueryHandle(), + ctx.getQueryOutputFormatter().getMetadata(), + ctx.getQueryOutputFormatter().getFinalOutputPath(), + ctx.getQueryOutputFormatter().getNumRows(), + ctx.getQueryOutputFormatter().getFileSize(), conf); + } + + public LensPersistentResult(FinishedLensQuery query, Configuration conf, ObjectMapper mapper) throws + ClassNotFoundException, IOException { + this(QueryHandle.fromString(query.getHandle()), + mapper.readValue(query.getMetadata(), (Class<LensResultSetMetadata>) Class.forName(query.getMetadataClass())), + query.getResult(), query.getRows(), query.getFileSize(), conf); } @Override @@ -69,7 +109,7 @@ public class LensPersistentResult extends PersistentResultSet { } @Override - public Long fileSize() throws LensException { + public Long getFileSize() throws LensException { return fileSize; } @@ -77,4 +117,19 @@ public class LensPersistentResult extends PersistentResultSet { public LensResultSetMetadata getMetadata() throws LensException { return metadata; } + + @Override + public boolean isHttpResultAvailable() { + try { + final Path resultPath = new Path(getOutputPath()); + FileSystem fs = resultPath.getFileSystem(conf); + if (fs.isDirectory(resultPath)) { + return false; + } + } catch (IOException | LensException e) { + log.warn("Unable to get status for Result Directory", e); + return false; + } + return true; + } } http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java index 45ba7ac..110624a 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java @@ -18,6 +18,8 @@ */ package org.apache.lens.server.query; +import static org.apache.lens.server.api.LensConfConstants.*; + import java.util.Date; import java.util.Properties; @@ -32,7 +34,7 @@ import javax.mail.internet.MimeMultipart; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.server.LensServices; -import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.api.query.QueryContext; @@ -58,9 +60,6 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { /** The Constant EMAIL_ERROR_COUNTER. */ public static final String EMAIL_ERROR_COUNTER = "email-send-errors"; - /** The conf. */ - private final HiveConf conf; - /** The from. */ private final String from; @@ -78,37 +77,33 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { private final LogSegregationContext logSegregationContext; - /** - * Instantiates a new query end notifier. + /** Instantiates a new query end notifier. * * @param queryService the query service - * @param hiveConf the hive conf - */ + * @param hiveConf the hive conf */ public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf, - @NonNull final LogSegregationContext logSegregationContext) { + @NonNull final LogSegregationContext logSegregationContext) { this.queryService = queryService; - this.conf = hiveConf; - from = conf.get(LensConfConstants.MAIL_FROM_ADDRESS); - host = conf.get(LensConfConstants.MAIL_HOST); - port = conf.get(LensConfConstants.MAIL_PORT); - mailSmtpTimeout = Integer.parseInt( - conf.get(LensConfConstants.MAIL_SMTP_TIMEOUT, LensConfConstants.MAIL_DEFAULT_SMTP_TIMEOUT)); - mailSmtpConnectionTimeout = Integer.parseInt(conf.get(LensConfConstants.MAIL_SMTP_CONNECTIONTIMEOUT, - LensConfConstants.MAIL_DEFAULT_SMTP_CONNECTIONTIMEOUT)); + HiveConf conf = hiveConf; + from = conf.get(MAIL_FROM_ADDRESS); + host = conf.get(MAIL_HOST); + port = conf.get(MAIL_PORT); + mailSmtpTimeout = Integer.parseInt(conf.get(MAIL_SMTP_TIMEOUT, MAIL_DEFAULT_SMTP_TIMEOUT)); + mailSmtpConnectionTimeout = Integer.parseInt(conf.get(MAIL_SMTP_CONNECTIONTIMEOUT, + MAIL_DEFAULT_SMTP_CONNECTIONTIMEOUT)); this.logSegregationContext = logSegregationContext; } /* * (non-Javadoc) * - * @see org.apache.lens.server.api.events.AsyncEventListener#process(org.apache.lens.server.api.events.LensEvent) - */ + * @see org.apache.lens.server.api.events.AsyncEventListener#process(org.apache.lens.server.api.events.LensEvent) */ @Override - public void process(QueryEnded event) { + public void process(final QueryEnded event) { if (event.getCurrentValue() == QueryStatus.Status.CLOSED) { return; } - QueryContext queryContext = queryService.getQueryContext(event.getQueryHandle()); + QueryContext queryContext = event.getQueryContext(); if (queryContext == null) { log.warn("Could not find the context for {} for event:{}. No email generated", event.getQueryHandle(), event.getCurrentValue()); @@ -116,45 +111,36 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { } this.logSegregationContext.setLogSegragationAndQueryId(queryContext.getQueryHandleString()); - boolean whetherMailNotify = Boolean.parseBoolean(queryContext.getConf().get(LensConfConstants.QUERY_MAIL_NOTIFY, - LensConfConstants.WHETHER_MAIL_NOTIFY_DEFAULT)); + boolean whetherMailNotify = Boolean.parseBoolean(queryContext.getConf().get(QUERY_MAIL_NOTIFY, + WHETHER_MAIL_NOTIFY_DEFAULT)); if (!whetherMailNotify) { return; } String queryName = queryContext.getQueryName(); - queryName = queryName == null ? "" : queryName; - String mailSubject = "Query " + queryName + " " + queryContext.getStatus().getStatus() + ": " - + event.getQueryHandle(); + String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " ")) + + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle(); String mailMessage = createMailMessage(queryContext); String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain(); - String cc = queryContext.getConf().get(LensConfConstants.QUERY_RESULT_EMAIL_CC, - LensConfConstants.QUERY_RESULT_DEFAULT_EMAIL_CC); + String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC); log.info("Sending completion email for query handle: {}", event.getQueryHandle()); sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout, mailSmtpConnectionTimeout); } - /** - * Creates the mail message. + /** Creates the mail message. * * @param queryContext the query context - * @return the string - */ + * @return the string */ private String createMailMessage(QueryContext queryContext) { StringBuilder msgBuilder = new StringBuilder(); switch (queryContext.getStatus().getStatus()) { case SUCCESSFUL: - msgBuilder.append("Result available at "); - String baseURI = conf.get(LensConfConstants.SERVER_BASE_URL, LensConfConstants.DEFAULT_SERVER_BASE_URL); - msgBuilder.append(baseURI); - msgBuilder.append("queryapi/queries/"); - msgBuilder.append(queryContext.getQueryHandle()); - msgBuilder.append("/httpresultset"); + msgBuilder.append(getResultMessage(queryContext)); break; case FAILED: msgBuilder.append(queryContext.getStatus().getStatusMessage()); @@ -166,13 +152,21 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { case CANCELED: msgBuilder.append(queryContext.getStatus().getStatusMessage()); break; - case CLOSED: default: break; } return msgBuilder.toString(); } + private String getResultMessage(QueryContext queryContext) { + try { + return queryService.getResultset(queryContext.getQueryHandle()).toQueryResult().toPrettyString(); + } catch (LensException e) { + log.error("Error retrieving result of query handle {} for sending e-mail", queryContext.getQueryHandle(), e); + return "Error retrieving result."; + } + } + @Data public static class Email { private final String from; @@ -181,15 +175,14 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { private final String subject; private final String message; } - /** - * Send mail. + + /** Send mail. * * @param host the host * @param port the port * @param email the email * @param mailSmtpTimeout the mail smtp timeout - * @param mailSmtpConnectionTimeout the mail smtp connection timeout - */ + * @param mailSmtpConnectionTimeout the mail smtp connection timeout */ public static void sendMail(String host, String port, Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) { Properties props = System.getProperties(); http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 9e27dd4..2ba54e0 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -88,10 +88,12 @@ import org.codehaus.jackson.map.module.SimpleModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import lombok.Getter; import lombok.NonNull; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -159,18 +161,17 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The finished queries. */ - private DelayQueue<FinishedQuery> finishedQueries = new DelayQueue<FinishedQuery>(); + ConcurrentLinkedQueue<FinishedQuery> finishedQueries = new ConcurrentLinkedQueue<>(); /** * The prepared query queue. */ - private DelayQueue<PreparedQueryContext> preparedQueryQueue = new DelayQueue<PreparedQueryContext>(); + private DelayQueue<PreparedQueryContext> preparedQueryQueue = new DelayQueue<>(); /** * The prepared queries. */ - private Map<QueryPrepareHandle, PreparedQueryContext> preparedQueries - = new HashMap<QueryPrepareHandle, PreparedQueryContext>(); + private Map<QueryPrepareHandle, PreparedQueryContext> preparedQueries = new HashMap<>(); /** * The all queries. @@ -180,7 +181,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The conf. */ - private Configuration conf; + @VisibleForTesting + Configuration conf; /** * The query submitter runnable. @@ -240,7 +242,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The max finished queries. */ - private int maxFinishedQueries; + int purgeInterval; /** * The lens server dao. @@ -328,7 +330,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } // Add result formatter getEventService().addListenerForType(new ResultFormatter(this, this.logSegregationContext), QueryExecuted.class); - getEventService().addListenerForType(new QueryExecutionStatisticsGenerator(this, getEventService()), + getEventService().addListenerForType(new QueryExecutionStatisticsGenerator(getEventService()), QueryEnded.class); getEventService().addListenerForType( new QueryEndNotifier(this, getCliService().getHiveConf(), this.logSegregationContext), QueryEnded.class); @@ -435,11 +437,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The Class FinishedQuery. */ - private class FinishedQuery implements Delayed { + @ToString + public class FinishedQuery { /** * The ctx. */ + @Getter private final QueryContext ctx; /** @@ -462,29 +466,25 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } - /* - * (non-Javadoc) - * - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - @Override - public int compareTo(Delayed o) { - return (int) (this.finishTime.getTime() - ((FinishedQuery) o).finishTime.getTime()); + public boolean canBePurged() { + try { + if (getCtx().getStatus().getStatus().equals(SUCCESSFUL)) { + if (getCtx().getStatus().isResultSetAvailable()) { + LensResultSet rs = getResultset(); + log.info("Resultset for {} is {}", getQueryHandle(), rs); + return rs.canBePurged(); + } + } + return true; + } catch (Throwable e) { + log.error("Error while accessing result set for query handle while purging: {}." + + " Hence, going ahead with purge", getQueryHandle(), e); + return true; + } } - /* - * (non-Javadoc) - * - * @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit) - */ - @Override - public long getDelay(TimeUnit units) { - int size = finishedQueries.size(); - if (size > maxFinishedQueries) { - return 0; - } else { - return Integer.MAX_VALUE; - } + private LensResultSet getResultset() throws LensException { + return QueryExecutionServiceImpl.this.getResultset(getQueryHandle()); } /** @@ -494,16 +494,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE return finishTime; } - /** - * @return the ctx - */ - public QueryContext getCtx() { - return ctx; - } - public String getQueryHandleString() { return ctx.getQueryHandleString(); } + + public QueryHandle getQueryHandle() { + return ctx.getQueryHandle(); + } } /** @@ -523,8 +520,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private final QueryLaunchingConstraintsChecker constraintsChecker; public QuerySubmitter(@NonNull final ErrorCollection errorCollection, - @NonNull final EstimatedQueryCollection waitingQueries, - @NonNull final QueryLaunchingConstraintsChecker constraintsChecker) { + @NonNull final EstimatedQueryCollection waitingQueries, + @NonNull final QueryLaunchingConstraintsChecker constraintsChecker) { this.errorCollection = errorCollection; this.waitingQueries = waitingQueries; @@ -639,9 +636,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } private void checkEstimatedQueriesState(final QueryContext query) throws LensException { - if (query.getSelectedDriver() == null || query.getSelectedDriverQueryCost() == null) { - throw new LensException("selected driver: " + query.getSelectedDriver() +" OR selected driver query cost: " - + query.getSelectedDriverQueryCost() + " is null. Query doesn't appear to be an estimated query."); + if (query.getSelectedDriver() == null || query.getSelectedDriverQueryCost() == null) { + throw new LensException("selected driver: " + query.getSelectedDriver() + " OR selected driver query cost: " + + query.getSelectedDriverQueryCost() + " is null. Query doesn't appear to be an estimated query."); } } } @@ -789,10 +786,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE ctx.setStatus(ctx.getDriverStatus().toQueryStatus()); } catch (LensException exc) { // Driver gave exception while updating status - setFailedStatus(ctx, "Status update failed", exc.getMessage(), exc.buildLensErrorTO(this.errorCollection)); log.error("Status update failed for {}", handle, exc); - + return; } // query is successfully executed by driver and // if query result need not be persisted or there is no result available in driver, move the query to @@ -832,10 +828,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE QueryHandle query = ctx.getQueryHandle(); switch (currState) { case CANCELED: - // TODO: correct username. put who cancelled it, not the submitter. Similar for others - return new QueryCancelled(ctx.getEndTime(), prevState, currState, query, ctx.getSubmittedUser(), null); + return new QueryCancelled(ctx, prevState, currState, null); case CLOSED: - return new QueryClosed(ctx.getClosedTime(), prevState, currState, query, ctx.getSubmittedUser(), null); + return new QueryClosed(ctx, prevState, currState, null); case FAILED: StringBuilder msgBuilder = new StringBuilder(); msgBuilder.append(ctx.getStatus().getStatusMessage()); @@ -843,8 +838,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE msgBuilder.append("\n Reason:\n"); msgBuilder.append(ctx.getStatus().getErrorMessage()); } - return new QueryFailed(ctx.getEndTime(), prevState, currState, query, ctx.getSubmittedUser(), - msgBuilder.toString()); + return new QueryFailed(ctx, prevState, currState, msgBuilder.toString()); case LAUNCHED: return new QueryLaunched(ctx.getLaunchTime(), prevState, currState, query); case QUEUED: @@ -855,7 +849,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE case EXECUTED: return new QueryExecuted(ctx.getDriverStatus().getDriverFinishTime(), prevState, currState, query); case SUCCESSFUL: - return new QuerySuccess(ctx.getEndTime(), prevState, currState, query); + return new QuerySuccess(ctx, prevState, currState); default: log.warn("Query {} transitioned to {} state from {} state", query, currState, prevState); return null; @@ -905,64 +899,67 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE public void run() { log.info("Starting Query purger thread"); while (!stopped && !queryPurger.isInterrupted()) { - FinishedQuery finished = null; try { - finished = finishedQueries.take(); - logSegregationContext.setLogSegragationAndQueryId(finished.getQueryHandleString()); - } catch (InterruptedException e) { - log.info("QueryPurger has been interrupted, exiting"); - return; - } - try { - FinishedLensQuery finishedQuery = new FinishedLensQuery(finished.getCtx()); - if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) { - if (finished.ctx.getStatus().isResultSetAvailable()) { - LensResultSet set = getResultset(finished.getCtx().getQueryHandle()); - if (set != null && PersistentResultSet.class.isAssignableFrom(set.getClass())) { - LensResultSetMetadata metadata = set.getMetadata(); - String outputPath = ((PersistentResultSet) set).getOutputPath(); - Long fileSize = ((PersistentResultSet) set).fileSize(); - Integer rows = set.size(); - finishedQuery.setMetadataClass(metadata.getClass().getName()); - finishedQuery.setResult(outputPath); - finishedQuery.setMetadata(MAPPER.writeValueAsString(metadata)); - finishedQuery.setRows(rows); - finishedQuery.setFileSize(fileSize); + Iterator<FinishedQuery> iter = finishedQueries.iterator(); + FinishedQuery finished; + while (iter.hasNext()) { + finished = iter.next(); + if (finished.canBePurged()) { + try { + FinishedLensQuery finishedQuery = new FinishedLensQuery(finished.getCtx()); + if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) { + if (finished.ctx.getStatus().isResultSetAvailable()) { + try { + LensResultSet set = finished.getResultset(); + if (set != null && PersistentResultSet.class.isAssignableFrom(set.getClass())) { + LensResultSetMetadata metadata = set.getMetadata(); + String outputPath = set.getOutputPath(); + Long fileSize = ((PersistentResultSet) set).getFileSize(); + Integer rows = set.size(); + finishedQuery.setMetadataClass(metadata.getClass().getName()); + finishedQuery.setResult(outputPath); + finishedQuery.setMetadata(MAPPER.writeValueAsString(metadata)); + finishedQuery.setRows(rows); + finishedQuery.setFileSize(fileSize); + } + } catch (Exception e) { + log.error("Couldn't obtain result set info for the query: {}. Going ahead with purge", + finished.getQueryHandle(), e); + } + } + } + lensServerDao.insertFinishedQuery(finishedQuery); + log.info("Saved query {} to DB", finishedQuery.getHandle()); + iter.remove(); + } catch (Exception e) { + log.warn("Exception while purging query {}", finished.getQueryHandle(), e); + continue; } - } - } - try { - lensServerDao.insertFinishedQuery(finishedQuery); - log.info("Saved query {} to DB", finishedQuery.getHandle()); - } catch (Exception e) { - log.warn("Exception while purging query ", e); - finishedQueries.add(finished); - continue; - } - - synchronized (finished.ctx) { - finished.ctx.setFinishedQueryPersisted(true); - try { - if (finished.getCtx().getSelectedDriver() != null) { - finished.getCtx().getSelectedDriver().closeQuery(finished.getCtx().getQueryHandle()); + synchronized (finished.ctx) { + finished.ctx.setFinishedQueryPersisted(true); + try { + if (finished.getCtx().getSelectedDriver() != null) { + finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle()); + } + } catch (Exception e) { + log.warn("Exception while closing query with selected driver.", e); + } + log.info("Purging: {}", finished.getQueryHandle()); + allQueries.remove(finished.getQueryHandle()); + resultSets.remove(finished.getQueryHandle()); } - } catch (Exception e) { - log.warn("Exception while closing query with selected driver.", e); + fireStatusChangeEvent(finished.getCtx(), + new QueryStatus(1f, null, CLOSED, "Query purged", false, null, null, null), finished.getCtx() + .getStatus()); + log.info("Query purged: {}", finished.getQueryHandle()); } - log.info("Purging: {}", finished.getCtx().getQueryHandle()); - allQueries.remove(finished.getCtx().getQueryHandle()); - resultSets.remove(finished.getCtx().getQueryHandle()); } - fireStatusChangeEvent(finished.getCtx(), - new QueryStatus(1f, null, CLOSED, "Query purged", false, null, null, null), finished.getCtx().getStatus()); - log.info("Query purged: {}", finished.getCtx().getQueryHandle()); - - } catch (LensException e) { - incrCounter(QUERY_PURGER_COUNTER); - log.error("Error closing query ", e); - } catch (Exception e) { + Thread.sleep(purgeInterval); + } catch (InterruptedException e) { + log.error("purger interrupted", e); + } catch (Throwable e) { + log.error("Purger giving error", e); incrCounter(QUERY_PURGER_COUNTER); - log.error("Error in query purger", e); } } log.info("QueryPurger exited"); @@ -1018,16 +1015,16 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE new DefaultQueryCollection(new TreeSet<QueryContext>(new QueryContextPriorityComparator())))); ImmutableSet<QueryLaunchingConstraint> queryConstraints = getImplementations( - QUERY_LAUNCHING_CONSTRAINT_FACTORIES_KEY, hiveConf); + QUERY_LAUNCHING_CONSTRAINT_FACTORIES_KEY, hiveConf); this.queryConstraintsChecker = new DefaultQueryLaunchingConstraintsChecker(queryConstraints); this.querySubmitterRunnable = new QuerySubmitter(LensServices.get().getErrorCollection(), this.waitingQueries, - this.queryConstraintsChecker); + this.queryConstraintsChecker); this.querySubmitter = new Thread(querySubmitterRunnable, "QuerySubmitter"); ImmutableSet<WaitingQueriesSelectionPolicy> selectionPolicies = getImplementations( - WAITING_QUERIES_SELECTION_POLICY_FACTORIES_KEY, hiveConf); + WAITING_QUERIES_SELECTION_POLICY_FACTORIES_KEY, hiveConf); this.waitingQueriesSelector = new IntersectingWaitingQueriesSelector(selectionPolicies); @@ -1048,8 +1045,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.error("Error while loading drivers", e); throw new IllegalStateException("Could not load drivers", e); } - maxFinishedQueries = conf.getInt(MAX_NUMBER_OF_FINISHED_QUERY, - DEFAULT_FINISHED_QUERIES); + purgeInterval = conf.getInt(PURGE_INTERVAL, DEFAULT_PURGE_INTERVAL); initalizeFinishedQueryStore(conf); log.info("Query execution service initialized"); } @@ -1207,7 +1203,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ private void rewriteAndSelect(final AbstractQueryContext ctx) throws LensException { MethodMetricsContext parallelCallGauge = MethodMetricsFactory.createMethodGauge(ctx.getConf(), false, - PARALLEL_CALL_GAUGE); + PARALLEL_CALL_GAUGE); try { userQueryToCubeQueryRewriter.rewrite(ctx); // Initially we obtain individual runnables for rewrite and estimate calls @@ -1293,7 +1289,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } MethodMetricsContext selectGauge = MethodMetricsFactory.createMethodGauge(ctx.getConf(), false, - DRIVER_SELECTOR_GAUGE); + DRIVER_SELECTOR_GAUGE); // 2. select driver to run the query LensDriver driver = driverSelector.select(ctx, conf); ctx.setSelectedDriver(driver); @@ -1420,16 +1416,14 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @return the resultset from dao * @throws LensException the lens exception */ - private LensResultSet getResultsetFromDAO(QueryHandle queryHandle) throws LensException { + private LensPersistentResult getResultsetFromDAO(QueryHandle queryHandle) throws LensException { FinishedLensQuery query = lensServerDao.getQuery(queryHandle.toString()); if (query != null) { if (query.getResult() == null) { throw new NotFoundException("InMemory Query result purged " + queryHandle); } try { - Class<LensResultSetMetadata> mdKlass = (Class<LensResultSetMetadata>) Class.forName(query.getMetadataClass()); - return new LensPersistentResult(MAPPER.readValue(query.getMetadata(), mdKlass), query.getResult(), - query.getRows(), query.getFileSize()); + return new LensPersistentResult(query, conf, MAPPER); } catch (Exception e) { throw new LensException(e); } @@ -1444,7 +1438,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @return the resultset * @throws LensException the lens exception */ - private LensResultSet getResultset(QueryHandle queryHandle) throws LensException { + LensResultSet getResultset(QueryHandle queryHandle) throws LensException { QueryContext ctx = allQueries.get(queryHandle); if (ctx == null) { return getResultsetFromDAO(queryHandle); @@ -1456,15 +1450,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE LensResultSet resultSet = resultSets.get(queryHandle); if (resultSet == null) { if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) { - resultSets - .put(queryHandle, - new LensPersistentResult( - ctx.getQueryOutputFormatter().getMetadata(), - ctx.getQueryOutputFormatter().getFinalOutputPath(), - ctx.getQueryOutputFormatter().getNumRows(), - ctx.getQueryOutputFormatter().getFileSize())); + resultSets.put(queryHandle, new LensPersistentResult(ctx, conf)); } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) { - resultSet = allQueries.get(queryHandle).getSelectedDriver().fetchResultSet(allQueries.get(queryHandle)); + resultSet = getDriverResultset(queryHandle); resultSets.put(queryHandle, resultSet); } else { throw new NotFoundException("Result set not available for query:" + queryHandle); @@ -1585,7 +1573,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE public QueryHandle executePrepareAsync(LensSessionHandle sessionHandle, QueryPrepareHandle prepareHandle, LensConf conf, String queryName) throws LensException { try { - log.info("ExecutePrepareAsync: session:{} prepareHandle:{}", sessionHandle, prepareHandle.getPrepareHandleId()); + log.info("ExecutePrepareAsync: session:{} prepareHandle:{}", sessionHandle, prepareHandle.getPrepareHandleId()); acquire(sessionHandle); PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, prepareHandle); Configuration qconf = getLensConf(sessionHandle, conf); @@ -1771,13 +1759,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE acquire(sessionHandle); QueryContext ctx = allQueries.get(queryHandle); if (ctx == null) { - FinishedLensQuery query = lensServerDao.getQuery(queryHandle.toString()); - log.info("FinishedLensQuery:{}", query); - if (query == null) { - throw new NotFoundException("Query not found " + queryHandle); - } - // pass the query conf instead of service conf - return query.toQueryContext(conf, drivers.values()); + return getQueryContextOfFinishedQuery(queryHandle); } updateStatus(queryHandle); return ctx; @@ -1786,6 +1768,16 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } + QueryContext getQueryContextOfFinishedQuery(QueryHandle queryHandle) { + FinishedLensQuery query = lensServerDao.getQuery(queryHandle.toString()); + log.info("FinishedLensQuery:{}", query); + if (query == null) { + throw new NotFoundException("Query not found " + queryHandle); + } + // pass the query conf instead of service conf + return query.toQueryContext(conf, drivers.values()); + } + /** * Gets the query context. * @@ -1981,6 +1973,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE this.notify(); } } + } /* @@ -1994,7 +1987,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE public QueryResultSetMetadata getResultSetMetadata(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException { try { - log.info("GetResultSetMetadata: session:{} query: {}", sessionHandle, queryHandle); + log.info("GetResultSetMetadata: session:{} query: {}", sessionHandle, queryHandle); acquire(sessionHandle); LensResultSet resultSet = getResultset(queryHandle); if (resultSet != null) { @@ -2267,7 +2260,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE acquire(sessionHandle); Configuration qconf = getLensConf(sessionHandle, lensConf); ExplainQueryContext explainQueryContext = new ExplainQueryContext(requestId, query, getSession(sessionHandle) - .getLoggedInUser(), lensConf, qconf, drivers.values()); + .getLoggedInUser(), lensConf, qconf, drivers.values()); explainQueryContext.setLensSessionIdentifier(sessionHandle.getPublicId().toString()); accept(query, qconf, SubmitOp.EXPLAIN); rewriteAndSelect(explainQueryContext); @@ -2383,6 +2376,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE break; case LAUNCHED: case RUNNING: + case EXECUTED: try { launchedQueries.add(ctx); } catch (final Exception e) { @@ -2486,69 +2480,54 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } return isHealthy - ? new HealthStatus(isHealthy, "QueryExecution service is healthy.") - : new HealthStatus(isHealthy, details.toString()); + ? new HealthStatus(isHealthy, "QueryExecution service is healthy.") + : new HealthStatus(isHealthy, details.toString()); } - /* * (non-Javadoc) * * @see org.apache.lens.server.api.query.QueryExecutionService#getHttpResultSet(org.apache.lens.api.LensSessionHandle, * org.apache.lens.api.query.QueryHandle) */ + @Override public Response getHttpResultSet(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException { + LensResultSet resultSet = getResultset(queryHandle); + if (!resultSet.isHttpResultAvailable()) { + throw new NotFoundException("http result not available"); + } + final Path resultPath = new Path(resultSet.getOutputPath()); final QueryContext ctx = getQueryContext(sessionHandle, queryHandle); - LensResultSet result = getResultset(queryHandle); - if (result instanceof LensPersistentResult) { - final Path resultPath = new Path(((PersistentResultSet) result).getOutputPath()); + String resultFSReadUrl = conf.get(RESULT_FS_READ_URL); + if (resultFSReadUrl != null) { try { - FileSystem fs = resultPath.getFileSystem(conf); - if (fs.isDirectory(resultPath)) { - throw new NotFoundException("Http result not available for query:" + queryHandle.toString()); - } - } catch (IOException e) { - log.warn("Unable to get status for Result Directory", e); - throw new NotFoundException("Http result not available for query:" + queryHandle.toString()); - } - String resultFSReadUrl = ctx.getConf().get(RESULT_FS_READ_URL); - if (resultFSReadUrl != null) { - try { - URI resultReadPath = new URI(resultFSReadUrl + resultPath.toUri().getPath() + "?op=OPEN&user.name=" - + getSession(sessionHandle).getClusterUser()); - return Response.seeOther(resultReadPath) - .header("content-disposition", "attachment; filename = " + resultPath.getName()) - .type(MediaType.APPLICATION_OCTET_STREAM).build(); - } catch (URISyntaxException e) { - throw new LensException(e); - } - } else { - StreamingOutput stream = new StreamingOutput() { - @Override - public void write(OutputStream os) throws IOException { - FSDataInputStream fin = null; - try { - FileSystem fs = resultPath.getFileSystem(ctx.getConf()); - fin = fs.open(resultPath); - UtilityMethods.pipe(fin, os); - } finally { - if (fin != null) { - fin.close(); - } - - } - } - }; - return Response.ok(stream).header("content-disposition", "attachment; filename = " + resultPath.getName()) + URI resultReadPath = new URI(resultFSReadUrl + resultPath.toUri().getPath() + "?op=OPEN&user.name=" + + getSession(sessionHandle).getClusterUser()); + return Response.seeOther(resultReadPath) + .header("content-disposition", "attachment; filename = " + resultPath.getName()) .type(MediaType.APPLICATION_OCTET_STREAM).build(); + } catch (URISyntaxException e) { + throw new LensException(e); } } else { - String entity = ""; - if (result instanceof InMemoryResultSet || result instanceof PersistentResultSet) { - entity = "Result is available in driver's " - + (result instanceof InMemoryResultSet ? "memory" : "persistence") + "."; - } - return Response.status(Response.Status.NOT_FOUND).entity(entity).build(); + StreamingOutput stream = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException { + FSDataInputStream fin = null; + try { + FileSystem fs = resultPath.getFileSystem(ctx.getConf()); + fin = fs.open(resultPath); + UtilityMethods.pipe(fin, os); + } finally { + if (fin != null) { + fin.close(); + } + + } + } + }; + return Response.ok(stream).header("content-disposition", "attachment; filename = " + resultPath.getName()) + .type(MediaType.APPLICATION_OCTET_STREAM).build(); } } @@ -2698,7 +2677,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE Collection<ResourceEntry> pendingResources = session.getPendingSessionResourcesForDatabase(ctx.getDatabase()); log.info("Adding pending {} session resources for session {} for database {}", pendingResources.size(), - sessionIdentifier, ctx.getDatabase()); + sessionIdentifier, ctx.getDatabase()); List<ResourceEntry> failedResources = addResources(pendingResources, sessionHandle, hiveDriver); // Mark added resources so that we don't add them again. If any of the resources failed // to be added, then they will be added again @@ -2761,7 +2740,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } log.debug("launchedQueries.remove(finishedQuery) has returned [{}] for finished query with query id:[{}]", modified, - finishedQuery.getQueryHandleString()); + finishedQuery.getQueryHandleString()); return modified; } @@ -2801,7 +2780,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private void processWaitingQueries(final FinishedLensQuery finishedQuery) { Set<QueryContext> eligibleWaitingQueries = this.waitingQueriesSelector - .selectQueries(finishedQuery, this.waitingQueries); + .selectQueries(finishedQuery, this.waitingQueries); if (eligibleWaitingQueries.isEmpty()) { log.debug("No queries eligible to move out of waiting state."); http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java index b57bc64..55cabe2 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java @@ -39,20 +39,15 @@ public class QueryExecutionStatisticsGenerator extends AsyncEventListener<QueryE /** The Constant LOG. */ private static final Logger LOG = LoggerFactory.getLogger(QueryExecutionStatisticsGenerator.class); - /** The query service. */ - private final QueryExecutionServiceImpl queryService; - /** The event service. */ private final LensEventService eventService; /** * Instantiates a new query execution statistics generator. * - * @param queryService the query service * @param eventService the event service */ - public QueryExecutionStatisticsGenerator(QueryExecutionServiceImpl queryService, LensEventService eventService) { - this.queryService = queryService; + public QueryExecutionStatisticsGenerator(LensEventService eventService) { this.eventService = eventService; } @@ -68,7 +63,7 @@ public class QueryExecutionStatisticsGenerator extends AsyncEventListener<QueryE } QueryHandle handle = ended.getQueryHandle(); QueryExecutionStatistics event = new QueryExecutionStatistics(System.currentTimeMillis()); - QueryContext ctx = queryService.getQueryContext(handle); + QueryContext ctx = ended.getQueryContext(); if (ctx == null) { LOG.warn("Could not find the context for " + handle + " for event:" + ended.getCurrentValue() + ". No stat generated"); @@ -88,7 +83,7 @@ public class QueryExecutionStatisticsGenerator extends AsyncEventListener<QueryE QueryDriverStatistics driverStats = new QueryDriverStatistics(); driverStats.setDriverQuery(ctx.getSelectedDriverQuery()); driverStats.setStartTime(ctx.getDriverStatus().getDriverStartTime()); - driverStats.setEndTime(ctx.getDriverStatus().getDriverStartTime()); + driverStats.setEndTime(ctx.getDriverStatus().getDriverFinishTime()); event.setDriverStats(driverStats); try { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java index af42eb0..f568b17 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java @@ -65,17 +65,16 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { */ @Override public void process(QueryExecuted event) { - formatOutput(event); + formatOutput(queryService.getQueryContext(event.getQueryHandle())); } /** * Format output. * - * @param event the event + * @param ctx the query context */ - private void formatOutput(QueryExecuted event) { - QueryHandle queryHandle = event.getQueryHandle(); - QueryContext ctx = queryService.getQueryContext(queryHandle); + private void formatOutput(QueryContext ctx) { + QueryHandle queryHandle = ctx.getQueryHandle(); this.logSegregationContext.setLogSegragationAndQueryId(ctx.getQueryHandleString()); try { if (!ctx.isPersistent()) { @@ -86,9 +85,8 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { log.info("Result formatter for {}", queryHandle); LensResultSet resultSet = queryService.getDriverResultset(queryHandle); boolean isPersistedInDriver = resultSet instanceof PersistentResultSet; - if (isPersistedInDriver) { - // skip result formatting if persisted size is huge - Path persistedDirectory = new Path(ctx.getHdfsoutPath()); + if (isPersistedInDriver) { // skip result formatting if persisted size is huge + Path persistedDirectory = new Path(ctx.getDriverResultPath()); FileSystem fs = persistedDirectory.getFileSystem(ctx.getConf()); long size = fs.getContentSummary(persistedDirectory).getLength(); long threshold = ctx.getConf().getLong(LensConfConstants.RESULT_FORMAT_SIZE_THRESHOLD, @@ -112,7 +110,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { } if (isPersistedInDriver) { log.info("Result formatter for {} in persistent result", queryHandle); - Path persistedDirectory = new Path(ctx.getHdfsoutPath()); + Path persistedDirectory = new Path(ctx.getDriverResultPath()); // write all files from persistent directory ((PersistedOutputFormatter) formatter).addRowsFromPersistedPath(persistedDirectory); } else { @@ -121,6 +119,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { while (inmemory.hasNext()) { ((InMemoryOutputFormatter) formatter).writeRow(inmemory.next()); } + inmemory.setFullyAccessed(true); } if (ctx.getConf().getBoolean(LensConfConstants.QUERY_OUTPUT_WRITE_FOOTER, LensConfConstants.DEFAULT_OUTPUT_WRITE_FOOTER)) { http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/main/resources/lensserver-default.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/main/resources/lensserver-default.xml b/lens-server/src/main/resources/lensserver-default.xml index 3c8fc09..1e6bd10 100644 --- a/lens-server/src/main/resources/lensserver-default.xml +++ b/lens-server/src/main/resources/lensserver-default.xml @@ -279,9 +279,9 @@ <!-- Finished Query Purging Configurations --> <property> - <name>lens.server.max.finished.queries</name> - <value>100</value> - <description>Maximum number of finished queries which lens server will keep in memory before purging.</description> + <name>lens.server.querypurger.sleep.interval</name> + <value>10000</value> + <description>The interval(milliseconds) with which purger to run periodically. Default 10 sec. </description> </property> <property> <name>lens.server.domain</name> http://git-wip-us.apache.org/repos/asf/lens/blob/669e8727/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java index 05bd99f..3dad050 100644 --- a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java +++ b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java @@ -23,15 +23,17 @@ import static org.testng.Assert.*; import java.io.IOException; import java.net.ServerSocket; import java.net.URI; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.ws.rs.core.UriBuilder; import org.apache.lens.driver.hive.TestRemoteHiveDriver; -import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.metrics.LensMetricsUtil; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.model.LogSegregationContext; import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; +import org.apache.lens.server.query.QueryExecutionServiceImpl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.Service; @@ -41,6 +43,7 @@ import org.glassfish.jersey.test.JerseyTest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; /** @@ -186,7 +189,6 @@ public abstract class LensJerseyTest extends JerseyTest { */ public void restartLensServer() { HiveConf h = getServerConf(); - h.set(LensConfConstants.MAX_NUMBER_OF_FINISHED_QUERY, "0"); restartLensServer(h); } @@ -204,4 +206,19 @@ public abstract class LensJerseyTest extends JerseyTest { LensServices.get().start(); System.out.println("Lens services restarted!"); } + public static void waitForPurge(int allowUnpurgable, + ConcurrentLinkedQueue<QueryExecutionServiceImpl.FinishedQuery> finishedQueries) throws InterruptedException { + List<QueryExecutionServiceImpl.FinishedQuery> unPurgable = Lists.newArrayList(); + for (QueryExecutionServiceImpl.FinishedQuery finishedQuery : finishedQueries) { + if (!finishedQuery.canBePurged()) { + unPurgable.add(finishedQuery); + } + } + if (unPurgable.size() > allowUnpurgable) { + throw new RuntimeException("finished queries can't be purged: " + unPurgable); + } + while (finishedQueries.size() > allowUnpurgable) { + Thread.sleep(5000); + } + } }