Repository: lens Updated Branches: refs/heads/master e0c495e28 -> 8cd7c2022
LENS-919 : Persist Query Priority in finished queries in Lens DB Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/8cd7c202 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/8cd7c202 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/8cd7c202 Branch: refs/heads/master Commit: 8cd7c2022742fba5f33daeaf183b8eede3b6fcc3 Parents: e0c495e Author: Puneet Gupta <[email protected]> Authored: Thu Mar 17 14:22:41 2016 +0530 Committer: Amareshwari Sriramadasu <[email protected]> Committed: Thu Mar 17 14:22:41 2016 +0530 ---------------------------------------------------------------------- .../org/apache/lens/driver/hive/HiveDriver.java | 2 +- .../server/api/driver/AbstractLensDriver.java | 7 ++-- .../lens/server/api/driver/LensDriver.java | 4 +- .../server/api/query/AbstractQueryContext.java | 21 ++++++++++ .../server/api/query/FinishedLensQuery.java | 13 ++++++ .../lens/server/api/query/QueryContext.java | 25 +----------- .../apache/lens/server/query/LensServerDAO.java | 20 +++++----- .../server/query/QueryExecutionServiceImpl.java | 3 +- .../apache/lens/server/query/TestLensDAO.java | 3 ++ .../lens/server/query/TestQueryService.java | 42 ++++++++++++++------ 10 files changed, 85 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index f422543..aa37dcc 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -800,7 +800,7 @@ public class HiveDriver extends AbstractLensDriver { } @Override - public Priority decidePriority(QueryContext ctx) { + public Priority decidePriority(AbstractQueryContext ctx) { if (whetherCalculatePriority && ctx.getDriverConf(this).get("mapred.job.priority") == null) { try { // Inside try since non-data fetching queries can also be executed by async method. http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index d447417..883ad9d 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -21,10 +21,10 @@ package org.apache.lens.server.api.driver; import org.apache.lens.api.Priority; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.query.AbstractQueryContext; import org.apache.lens.server.api.query.QueryContext; import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.conf.Configuration; import lombok.Getter; @@ -98,9 +98,8 @@ public abstract class AbstractLensDriver implements LensDriver { } @Override - public Priority decidePriority(QueryContext queryContext) { - // no-op by default - return null; + public Priority decidePriority(AbstractQueryContext queryContext) { + return Priority.NORMAL; } @Override http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java index ed97673..69295d9 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java @@ -215,8 +215,8 @@ public interface LensDriver extends Externalizable { /** * decide priority based on query's cost. The cost should be already computed by estimate call, but it's * not guaranteed to be pre-computed. It's up to the driver to do an on-demand computation of cost. - * @see QueryContext#decidePriority(LensDriver, QueryPriorityDecider) that handles this on-demand computation. + * @see AbstractQueryContext#decidePriority(LensDriver, QueryPriorityDecider) that handles this on-demand computation. * @param queryContext */ - Priority decidePriority(QueryContext queryContext); + Priority decidePriority(AbstractQueryContext queryContext); } http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java index 62ed293..b568ffb 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.lens.api.LensConf; +import org.apache.lens.api.Priority; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.driver.DriverQueryPlan; import org.apache.lens.server.api.driver.LensDriver; @@ -35,6 +36,7 @@ import org.apache.lens.server.api.metrics.MethodMetricsContext; import org.apache.lens.server.api.metrics.MethodMetricsFactory; import org.apache.lens.server.api.query.DriverSelectorQueryContext.DriverQueryContext; import org.apache.lens.server.api.query.cost.QueryCost; +import org.apache.lens.server.api.query.priority.QueryPriorityDecider; import org.apache.lens.server.api.util.LensUtil; import org.apache.hadoop.conf.Configuration; @@ -129,6 +131,13 @@ public abstract class AbstractQueryContext implements Serializable { /** Lock used to synchronize HiveConf access */ private transient Lock hiveConfLock = new ReentrantLock(); + /** + * The priority. + */ + @Getter + @Setter + private Priority priority; + protected AbstractQueryContext(final String query, final String user, final LensConf qconf, final Configuration conf, final Collection<LensDriver> drivers, boolean mergeDriverConf) { if (conf.getBoolean(LensConfConstants.ENABLE_QUERY_METRICS, LensConfConstants.DEFAULT_ENABLE_QUERY_METRICS)) { @@ -469,4 +478,16 @@ public abstract class AbstractQueryContext implements Serializable { driverContext.clearTransientStateAfterCompleted(); hiveConf = null; } + + public Priority decidePriority(LensDriver driver, QueryPriorityDecider queryPriorityDecider) throws LensException { + // On-demand re-computation of cost, in case it's not alredy set by a previous estimate call. + // In driver test cases, estimate doesn't happen. Hence this code path ensures cost is computed and + // priority is set based on correct cost. + if (getDriverQueryCost(driver) == null) { + setDriverCost(driver, driver.estimate(this)); + } + priority = queryPriorityDecider.decidePriority(getDriverQueryCost(driver)); + return priority; + } + } http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java index a57a6e4..e1ce415 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/FinishedLensQuery.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Iterator; import org.apache.lens.api.LensConf; +import org.apache.lens.api.Priority; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.server.api.driver.LensDriver; @@ -30,6 +31,7 @@ import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.ImmutableSet; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -168,6 +170,10 @@ public class FinishedLensQuery { @Getter private LensDriver selectedDriver; + @Getter + @Setter + private String priority; + /** * Instantiates a new finished lens query. */ @@ -199,6 +205,10 @@ public class FinishedLensQuery { if (null != ctx.getSelectedDriver()) { this.driverName = ctx.getSelectedDriver().getFullyQualifiedName(); } + //Priority can be null in case no driver is fit to execute a query and launch fails. + if (null != ctx.getPriority()) { + this.priority = ctx.getPriority().toString(); + } } public QueryContext toQueryContext(Configuration conf, Collection<LensDriver> drivers) { @@ -220,6 +230,9 @@ public class FinishedLensQuery { qctx.getDriverStatus().setDriverFinishTime(getDriverEndTime()); qctx.setResultSetPath(getResult()); qctx.setQueryName(getQueryName()); + if (getPriority() != null) { + qctx.setPriority(Priority.valueOf(getPriority())); + } return qctx; } http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index 96846c1..d01e4a4 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.UUID; import org.apache.lens.api.LensConf; -import org.apache.lens.api.Priority; import org.apache.lens.api.query.LensQuery; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; @@ -42,7 +41,6 @@ import org.apache.lens.server.api.driver.PartiallyFetchedInMemoryResultSet; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; -import org.apache.lens.server.api.query.priority.QueryPriorityDecider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -73,12 +71,6 @@ public class QueryContext extends AbstractQueryContext { private QueryHandle queryHandle; /** - * The priority. - */ - @Getter - private Priority priority; - - /** * The is persistent. */ @Getter @@ -332,7 +324,7 @@ public class QueryContext extends AbstractQueryContext { * @return the lens query */ public LensQuery toLensQuery() { - return new LensQuery(queryHandle, userQuery, super.getSubmittedUser(), priority, isPersistent, + return new LensQuery(queryHandle, userQuery, super.getSubmittedUser(), getPriority(), isPersistent, getSelectedDriver() != null ? getSelectedDriver().getFullyQualifiedName() : null, getSelectedDriverQuery(), status, @@ -458,21 +450,6 @@ public class QueryContext extends AbstractQueryContext { return getSelectedDriver().getWaitingQuerySelectionPolicies(); } - public Priority decidePriority(LensDriver driver, QueryPriorityDecider queryPriorityDecider) throws LensException { - // On-demand re-computation of cost, in case it's not alredy set by a previous estimate call. - // In driver test cases, estimate doesn't happen. Hence this code path ensures cost is computed and - // priority is set based on correct cost. - calculateCost(driver); - priority = queryPriorityDecider.decidePriority(getDriverQueryCost(driver)); - return priority; - } - - private void calculateCost(LensDriver driver) throws LensException { - if (getDriverQueryCost(driver) == null) { - setDriverCost(driver, driver.estimate(this)); - } - } - public synchronized void registerDriverResult(LensResultSet result) throws LensException { if (isDriverResultRegistered) { return; //already registered http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java index d8e654d..1d6125c 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java @@ -82,10 +82,10 @@ public class LensServerDAO { */ public void createFinishedQueriesTable() throws Exception { String sql = "CREATE TABLE if not exists finished_queries (handle varchar(255) not null unique," - + "userquery varchar(10000) not null," + "submitter varchar(255) not null," + "starttime bigint, " - + "endtime bigint," + "result varchar(255)," + "status varchar(255), " + "metadata varchar(100000), " - + "rows int, " + "filesize bigint, " + "errormessage varchar(10000), " + "driverstarttime bigint, " - + "driverendtime bigint, " + "drivername varchar(10000), " + + "userquery varchar(10000) not null," + "submitter varchar(255) not null," + "priority varchar(255), " + + "starttime bigint, " + "endtime bigint," + "result varchar(255)," + "status varchar(255), " + + "metadata varchar(100000), " + "rows int, " + "filesize bigint, " + "errormessage varchar(10000), " + + "driverstarttime bigint, " + "driverendtime bigint, " + "drivername varchar(10000), " + "queryname varchar(255), " + "submissiontime bigint" + ")"; try { QueryRunner runner = new QueryRunner(ds); @@ -107,17 +107,17 @@ public class LensServerDAO { if (alreadyExisting == null) { // The expected case Connection conn = null; - String sql = "insert into finished_queries (handle, userquery,submitter," + String sql = "insert into finished_queries (handle, userquery, submitter, priority, " + "starttime,endtime,result,status,metadata,rows,filesize," + "errormessage,driverstarttime,driverendtime, drivername, queryname, submissiontime)" - + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; try { conn = getConnection(); QueryRunner runner = new QueryRunner(); - runner.update(conn, sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getStartTime(), - query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(), query.getRows(), - query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(), query.getDriverEndTime(), - query.getDriverName(), query.getQueryName(), query.getSubmissionTime()); + runner.update(conn, sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getPriority(), + query.getStartTime(), query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(), + query.getRows(), query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(), + query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime()); conn.commit(); } finally { DbUtils.closeQuietly(conn); http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 49ab241..4c95506 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -1387,6 +1387,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE ctx.setSelectedDriver(driver); QueryCost selectedDriverQueryCost = ctx.getDriverContext().getDriverQueryCost(driver); ctx.setSelectedDriverQueryCost(selectedDriverQueryCost); + driver.decidePriority(ctx); selectGauge.markSuccess(); } finally { parallelCallGauge.markSuccess(); @@ -1782,7 +1783,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString()); rewriteAndSelect(ctx); - ctx.getSelectedDriver().decidePriority(ctx); return submitQuery(ctx); } @@ -2000,7 +2000,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE result.setStatus(queryCtx.getStatus()); return result; } - QueryCompletionListenerImpl listener = new QueryCompletionListenerImpl(handle); synchronized (queryCtx) { if (!queryCtx.getStatus().finished()) { http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java index 760e306..7679a06 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.Priority; import org.apache.lens.api.query.LensQuery; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; @@ -69,6 +70,7 @@ public class TestLensDAO { queryContext.getDriverContext().setSelectedDriver(new MockDriver()); FinishedLensQuery finishedLensQuery = new FinishedLensQuery(queryContext); finishedLensQuery.setStatus(QueryStatus.Status.SUCCESSFUL.name()); + finishedLensQuery.setPriority(Priority.NORMAL.toString()); // Validate JDBC driver RS Meta can be deserialized @@ -120,6 +122,7 @@ public class TestLensDAO { Assert.assertEquals(actualRsMeta.getColumns().get(0).getName().toLowerCase(), "handle"); Assert.assertEquals(actual.getHandle(), finishedHandle); + Assert.assertEquals(Priority.valueOf(actual.getPriority()), Priority.NORMAL); // Test find finished queries LensSessionHandle session = service.openSession("foo@localhost", "bar", new HashMap<String, String>()); http://git-wip-us.apache.org/repos/asf/lens/blob/8cd7c202/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index 699fa68..49de59c 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -38,6 +38,7 @@ import javax.ws.rs.core.*; import org.apache.lens.api.APIResult; import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.Priority; import org.apache.lens.api.jaxb.LensJAXBContextResolver; import org.apache.lens.api.query.*; import org.apache.lens.api.query.QueryStatus.Status; @@ -269,7 +270,8 @@ public class TestQueryService extends LensJerseyTest { long runningQueries = metricsSvc.getRunningQueries(); long finishedQueries = metricsSvc.getFinishedQueries(); - QueryHandle handle = executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of("select ID from " + int noOfQueriesBeforeExecution = queryService.allQueries.size(); + QueryHandle theHandle = executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of("select ID from " + TEST_TABLE), Optional.<LensConf>absent(), mt); // Get all queries @@ -281,24 +283,25 @@ public class TestQueryService extends LensJerseyTest { List<QueryHandle> allQueries = target.queryParam("sessionid", lensSessionId).request(mt) .get(new GenericType<List<QueryHandle>>() {}); assertTrue(allQueries.size() >= 1); - assertTrue(allQueries.contains(handle)); + assertTrue(allQueries.contains(theHandle)); - String queryXML = target.path(handle.toString()).queryParam("sessionid", lensSessionId) + String queryXML = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId) .request(MediaType.APPLICATION_XML).get(String.class); log.debug("query XML:{}", queryXML); - Response response = target.path(handle.toString() + "001").queryParam("sessionid", lensSessionId).request(mt).get(); + Response response = + target.path(theHandle.toString() + "001").queryParam("sessionid", lensSessionId).request(mt).get(); assertEquals(response.getStatus(), 404); - LensQuery ctx = target.path(handle.toString()).queryParam("sessionid", lensSessionId).request(mt) + LensQuery query = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId).request(mt) .get(LensQuery.class); // wait till the query finishes - QueryStatus stat = ctx.getStatus(); + QueryStatus stat = query.getStatus(); while (!stat.finished()) { Thread.sleep(1000); - ctx = target.path(handle.toString()).queryParam("sessionid", lensSessionId).request(mt).get(LensQuery.class); - stat = ctx.getStatus(); + query = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId).request(mt).get(LensQuery.class); + stat = query.getStatus(); /* Commented due to same issue as: https://issues.apache.org/jira/browse/LENS-683 switch (stat.getStatus()) { @@ -312,9 +315,24 @@ public class TestQueryService extends LensJerseyTest { }*/ } - assertTrue(ctx.getSubmissionTime() > 0); - assertTrue(ctx.getFinishTime() > 0); - assertEquals(ctx.getStatus().getStatus(), Status.SUCCESSFUL); + assertTrue(query.getSubmissionTime() > 0); + assertTrue(query.getFinishTime() > 0); + assertEquals(query.getStatus().getStatus(), Status.SUCCESSFUL); + + assertEquals(query.getPriority(), Priority.LOW); + //Check Query Priority can be read even after query is purged i,e query details are read from DB. + boolean isPurged = false; + while (!isPurged) { + isPurged = true; + for (QueryHandle aHandle : queryService.allQueries.keySet()) { + if (aHandle.equals(theHandle)) { + isPurged = false; //current query is still not purged + Thread.sleep(1000); + break; + } + } + } + assertEquals(query.getPriority(), Priority.LOW); // Update conf for query final FormDataMultiPart confpart = new FormDataMultiPart(); @@ -324,7 +342,7 @@ public class TestQueryService extends LensJerseyTest { mt)); confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, mt)); - APIResult updateConf = target.path(handle.toString()).request(mt) + APIResult updateConf = target.path(theHandle.toString()).request(mt) .put(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), APIResult.class); assertEquals(updateConf.getStatus(), APIResult.Status.FAILED); }
