Repository: lens Updated Branches: refs/heads/master fa8e5206f -> 2f22f60b2
LENS-753: Queue number for queries in submitted queue Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/2f22f60b Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/2f22f60b Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/2f22f60b Branch: refs/heads/master Commit: 2f22f60b2d50c20ec9291483d73d4fd735eb8ef9 Parents: fa8e520 Author: Akshay Goyal <akshaygoyal2...@gmail.com> Authored: Thu Aug 27 20:52:52 2015 +0530 Committer: Rajat Khandelwal <rajatgupt...@gmail.com> Committed: Thu Aug 27 20:52:52 2015 +0530 ---------------------------------------------------------------------- .../org/apache/lens/api/query/QueryStatus.java | 11 +++++++ .../lens/cli/commands/LensQueryCommands.java | 3 ++ .../server/api/driver/DriverQueryStatus.java | 14 +++++++-- .../server/api/query/FinishedLensQuery.java | 2 +- .../lens/server/api/query/QueryContext.java | 2 +- .../query/collect/ImmutableQueryCollection.java | 6 ++++ .../server/query/QueryExecutionServiceImpl.java | 15 +++++----- .../DefaultEstimatedQueryCollection.java | 5 ++++ .../query/collect/DefaultQueryCollection.java | 20 +++++++++++++ .../query/collect/MutableQueryCollection.java | 1 + .../ThreadSafeEstimatedQueryCollection.java | 5 ++++ .../collect/ThreadSafeQueryCollection.java | 5 ++++ .../collect/DefaultQueryCollectionTest.java | 23 +++++++++++++++ .../server/query/collect/QueryCollectUtil.java | 30 ++++++++++++++++++++ 14 files changed, 130 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java index 9614caa..0d73c3f 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java @@ -39,6 +39,8 @@ import lombok.*; * * @param progress * the progress + * @param queueNumber + * the queue number * @param status * the status * @param statusMessage @@ -121,6 +123,14 @@ public class QueryStatus implements Serializable { private double progress; /** + * The running queue number. A non zero value gives the queue number. Queue number zero mean either the query is in + * waiting or completed state. + */ + @XmlElement + @Getter + private int queueNumber; + + /** * The status. */ @XmlElement @@ -168,6 +178,7 @@ public class QueryStatus implements Serializable { StringBuilder str = new StringBuilder(status.toString()).append(':').append(statusMessage); if (status.equals(Status.RUNNING)) { str.append(" - Progress:").append(progress).append(":").append(progressMessage); + str.append(" - Queue number:").append(queueNumber); } if (status.equals(Status.SUCCESSFUL)) { if (isResultSetAvailable) { http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java ---------------------------------------------------------------------- diff --git a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java index 7a5b177..5f90060 100644 --- a/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java +++ b/lens-cli/src/main/java/org/apache/lens/cli/commands/LensQueryCommands.java @@ -170,6 +170,9 @@ public class LensQueryCommands extends BaseLensCommand { sb.append("Progress Message : ").append(status.getProgressMessage()).append("\n"); } } + if (status.getQueueNumber() != 0) { + sb.append("Queue Number : ").append(status.getQueueNumber()).append("\n"); + } if (status.getErrorMessage() != null) { sb.append("Error : ").append(status.getErrorMessage()).append("\n"); http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java index f78b7c3..48a841b 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java @@ -89,6 +89,13 @@ public class DriverQueryStatus implements Serializable { private double progress = 0.0f; /** + * The running queue number. + */ + @Getter + @Setter + private int queueNumber = 0; + + /** * The state. */ @Getter @@ -167,7 +174,8 @@ public class DriverQueryStatus implements Serializable { break; } - return new QueryStatus(progress, qstate, statusMessage, isResultSetAvailable, progressMessage, errorMessage, null); + return new QueryStatus(progress, queueNumber, qstate, statusMessage, isResultSetAvailable, progressMessage, + errorMessage, null); } /** @@ -178,8 +186,8 @@ public class DriverQueryStatus implements Serializable { * @return the query status */ public static QueryStatus createQueryStatus(QueryStatus.Status state, DriverQueryStatus dstatus) { - return new QueryStatus(dstatus.progress, state, dstatus.statusMessage, dstatus.isResultSetAvailable, - dstatus.progressMessage, dstatus.errorMessage, null); + return new QueryStatus(dstatus.progress, dstatus.queueNumber, state, dstatus.statusMessage, + dstatus.isResultSetAvailable, dstatus.progressMessage, dstatus.errorMessage, null); } /* http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java index 6cecf7e..c9b8854 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java @@ -204,7 +204,7 @@ public class FinishedLensQuery { qctx.setQueryHandle(QueryHandle.fromString(handle)); qctx.setLaunchTime(this.startTime); qctx.setEndTime(getEndTime()); - qctx.setStatusSkippingTransitionTest(new QueryStatus(0.0, QueryStatus.Status.valueOf(getStatus()), + qctx.setStatusSkippingTransitionTest(new QueryStatus(0.0, 0, QueryStatus.Status.valueOf(getStatus()), getErrorMessage() == null ? "" : getErrorMessage(), getResult() != null, null, null, null)); qctx.getDriverStatus().setDriverStartTime(getDriverStartTime()); qctx.getDriverStatus().setDriverFinishTime(getDriverEndTime()); http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index beaa72f..0906b83 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -217,7 +217,7 @@ public class QueryContext extends AbstractQueryContext { super(userQuery, user, qconf, conf, drivers, mergeDriverConf); this.submissionTime = submissionTime; this.queryHandle = new QueryHandle(UUID.randomUUID()); - this.status = new QueryStatus(0.0f, Status.NEW, "Query just got created", false, null, null, null); + this.status = new QueryStatus(0.0f, 0, Status.NEW, "Query just got created", false, null, null, null); this.priority = Priority.NORMAL; this.lensConf = qconf; this.conf = conf; http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server-api/src/main/java/org/apache/lens/server/api/query/collect/ImmutableQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/collect/ImmutableQueryCollection.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/collect/ImmutableQueryCollection.java index 344ff10..4e3c95b 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/collect/ImmutableQueryCollection.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/collect/ImmutableQueryCollection.java @@ -50,4 +50,10 @@ public interface ImmutableQueryCollection { * @return Count of existing queries */ int getQueriesCount(); + + /** + * + * @return Index of a query within collection + */ + int getQueryIndex(final QueryContext query); } http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index c29a1ac..b23e0df 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -615,8 +615,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE checkEstimatedQueriesState(query); QueryStatus oldStatus = query.getStatus(); - QueryStatus newStatus = new QueryStatus(query.getStatus().getProgress(), QueryStatus.Status.LAUNCHED, - "Query is launched on driver", false, null, null, null); + QueryStatus newStatus = new QueryStatus(query.getStatus().getProgress(), query.getStatus().getQueueNumber(), + QueryStatus.Status.LAUNCHED, "Query is launched on driver", false, null, null, null); query.validateTransition(newStatus); // Check if we need to pass session's effective resources to selected driver @@ -717,7 +717,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE throws LensException { QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(0.0f, FAILED, statusMsg, false, null, reason, lensErrorTO)); + ctx.setStatus(new QueryStatus(0.0f, 0, FAILED, statusMsg, false, null, reason, lensErrorTO)); updateFinishedQuery(ctx, before); fireStatusChangeEvent(ctx, ctx.getStatus(), before); } @@ -731,7 +731,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ private void setCancelledStatus(QueryContext ctx, String statusMsg) throws LensException { QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(0.0f, CANCELED, statusMsg, false, null, null, null)); + ctx.setStatus(new QueryStatus(0.0f, 0, CANCELED, statusMsg, false, null, null, null)); updateFinishedQuery(ctx, before); fireStatusChangeEvent(ctx, ctx.getStatus(), before); } @@ -765,7 +765,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE void setSuccessState(QueryContext ctx) throws LensException { QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(1.0f, SUCCESSFUL, "Query is successful!", ctx + ctx.setStatus(new QueryStatus(1.0f, 0, SUCCESSFUL, "Query is successful!", ctx .isResultAvailableInDriver(), null, null, null)); updateFinishedQuery(ctx, before); fireStatusChangeEvent(ctx, ctx.getStatus(), before); @@ -786,6 +786,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.info("Updating status for {}", ctx.getQueryHandle()); try { ctx.getSelectedDriver().updateStatus(ctx); + ctx.getDriverStatus().setQueueNumber(launchedQueries.getQueryIndex(ctx)); ctx.setStatus(ctx.getDriverStatus().toQueryStatus()); } catch (LensException exc) { // Driver gave exception while updating status @@ -946,7 +947,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE resultSets.remove(finished.getCtx().getQueryHandle()); } fireStatusChangeEvent(finished.getCtx(), - new QueryStatus(1f, CLOSED, "Query purged", false, null, null, null), finished.getCtx().getStatus()); + new QueryStatus(1f, 0, CLOSED, "Query purged", false, null, null, null), finished.getCtx().getStatus()); log.info("Query purged: {}", finished.getCtx().getQueryHandle()); } catch (LensException e) { @@ -1700,7 +1701,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private QueryHandle submitQuery(final QueryContext ctx) throws LensException { QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(0.0, QUEUED, "Query is queued", false, null, null, null)); + ctx.setStatus(new QueryStatus(0.0, 0, QUEUED, "Query is queued", false, null, null, null)); queuedQueries.add(ctx); log.debug("Added to Queued Queries:{}", ctx.getQueryHandleString()); allQueries.put(ctx.getQueryHandle(), ctx); http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultEstimatedQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultEstimatedQueryCollection.java b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultEstimatedQueryCollection.java index e3505bb..bd27de8 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultEstimatedQueryCollection.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultEstimatedQueryCollection.java @@ -138,6 +138,11 @@ public class DefaultEstimatedQueryCollection implements EstimatedQueryCollection return this.queries.getQueriesCount(); } + @Override + public int getQueryIndex(QueryContext query) { + return this.queries.getQueryIndex(query); + } + @VisibleForTesting void checkState(final QueryContext query) { Preconditions.checkState(query.getSelectedDriver() != null); http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultQueryCollection.java b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultQueryCollection.java index f9e7701..0387cce 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultQueryCollection.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/collect/DefaultQueryCollection.java @@ -96,6 +96,26 @@ public class DefaultQueryCollection implements QueryCollection { return queries.size(); } + + /** + * Since the collection is a linkedHashSet, the order of queries is always maintained. + * @param query + * @return + */ + @Override + public int getQueryIndex(QueryContext query) { + Iterator iterator = queries.iterator(); + int index = 1; + while (iterator.hasNext()) { + QueryContext queuedQuery = (QueryContext) iterator.next(); + if (queuedQuery.getQueryHandle().equals(query.getQueryHandle())) { + return index; + } + index += 1; + } + return 0; + } + private Collection<QueryContext> getQueriesCollectionForUser(final String user) { final Collection<QueryContext> userQueries = queriesByUser.getCollection(user); http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/collect/MutableQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/collect/MutableQueryCollection.java b/lens-server/src/main/java/org/apache/lens/server/query/collect/MutableQueryCollection.java index e6e777c..de0779a 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/collect/MutableQueryCollection.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/collect/MutableQueryCollection.java @@ -62,4 +62,5 @@ public interface MutableQueryCollection { * @param queries */ boolean removeAll(final Set<QueryContext> queries); + } http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeEstimatedQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeEstimatedQueryCollection.java b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeEstimatedQueryCollection.java index cdbd2ad..5b80296 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeEstimatedQueryCollection.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeEstimatedQueryCollection.java @@ -70,6 +70,11 @@ public class ThreadSafeEstimatedQueryCollection implements EstimatedQueryCollect } @Override + public synchronized int getQueryIndex(QueryContext query) { + return this.estimatedQueries.getQueryIndex(query); + } + + @Override public synchronized boolean add(QueryContext query) { return this.estimatedQueries.add(query); } http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeQueryCollection.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeQueryCollection.java b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeQueryCollection.java index 7b43a38..4d5c7a1 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeQueryCollection.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/collect/ThreadSafeQueryCollection.java @@ -72,4 +72,9 @@ public class ThreadSafeQueryCollection implements QueryCollection { public synchronized int getQueriesCount() { return this.queries.getQueriesCount(); } + + @Override + public synchronized int getQueryIndex(QueryContext query) { + return this.queries.getQueryIndex(query); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/test/java/org/apache/lens/server/query/collect/DefaultQueryCollectionTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/collect/DefaultQueryCollectionTest.java b/lens-server/src/test/java/org/apache/lens/server/query/collect/DefaultQueryCollectionTest.java index 7a81e83..f55094a 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/collect/DefaultQueryCollectionTest.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/collect/DefaultQueryCollectionTest.java @@ -34,6 +34,7 @@ import org.testng.annotations.Test; public class DefaultQueryCollectionTest { private static final String MOCK_USER = "MockUserEmail"; + private static final String MOCK_HANDLE = "0-0-0-0-"; /* Note: Since verification of addition/removal required calling get methods, hence methods getQueriesCount and getQueries(user) are indirectly getting tested in these tests */ @@ -76,6 +77,28 @@ public class DefaultQueryCollectionTest { } @Test + public void testRemoveMethodMustChangeQueryIndices() { + + /* Initialization */ + final int noOfQueriesUsedInTest = 10; + QueryCollection queries = createQueriesInstanceWithQueryHandleStubbing(noOfQueriesUsedInTest, MOCK_HANDLE); + + QueryContext completedQuery = getMockedQueryFromQueries(queries.getQueries(), MOCK_HANDLE, 4); + QueryContext runningQuery = getMockedQueryFromQueries(queries.getQueries(), MOCK_HANDLE, 5); + + /* Execution */ + queries.remove(completedQuery); + + /* Verification 1: Verifies that queries were removed from queries list by calling getQueriesCount which gets + results from queries list */ + assertEquals(queries.getQueriesCount(), 9); + + /* Verification 2: Verifies that query index is decreased after removal of queries which were present before + them in the queries list */ + assertEquals(queries.getQueryIndex(runningQuery), 4); + } + + @Test public void testGetQueriesMustReturnCopyOfUnderlyingCollection() { /* Initialization */ http://git-wip-us.apache.org/repos/asf/lens/blob/2f22f60b/lens-server/src/test/java/org/apache/lens/server/query/collect/QueryCollectUtil.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/collect/QueryCollectUtil.java b/lens-server/src/test/java/org/apache/lens/server/query/collect/QueryCollectUtil.java index 51fcf00..63d1508 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/collect/QueryCollectUtil.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/collect/QueryCollectUtil.java @@ -27,8 +27,10 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; import java.lang.reflect.Method; +import java.util.Iterator; import java.util.Set; +import org.apache.lens.api.query.QueryHandle; import org.apache.lens.server.api.query.QueryContext; import com.google.common.collect.Sets; @@ -77,6 +79,34 @@ public class QueryCollectUtil { return new DefaultQueryCollection(mockQueries); } + public static QueryCollection createQueriesInstanceWithQueryHandleStubbing(final int reqNoOfMockQueries, + final String handlePrefix) { + + Set<QueryContext> mockQueries = Sets.newLinkedHashSet(); + + for (int index = 1; index <= reqNoOfMockQueries; ++index) { + mockQueries.add(createQueryInstanceWithQueryHandleStubbing(handlePrefix, index)); + } + return new DefaultQueryCollection(mockQueries); + } + + public static QueryContext createQueryInstanceWithQueryHandleStubbing(String handlePrefix, int index) { + QueryContext mockQuery = mock(QueryContext.class); + when(mockQuery.getQueryHandle()).thenReturn(QueryHandle.fromString(handlePrefix + index)); + return mockQuery; + } + + public static QueryContext getMockedQueryFromQueries(Set<QueryContext> queries, String mockHandle, int index) { + Iterator iterator = queries.iterator(); + while (iterator.hasNext()) { + QueryContext queuedQuery = (QueryContext) iterator.next(); + if (queuedQuery.getQueryHandle().equals(QueryHandle.fromString(mockHandle + index))) { + return queuedQuery; + } + } + return null; + } + public static QueryCollection stubMockQueryAndCreateQueriesInstance(final QueryContext mockQuery, final String mockUser) {