Repository: lens
Updated Branches:
  refs/heads/master 61ee6bfc8 -> 9ef7ce736
LENS-1345: Fixing deadlock in jdbc query status update flow Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ef7ce73 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ef7ce73 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ef7ce73 Branch: refs/heads/master Commit: 9ef7ce73693d039ab5a197b4227c5c09c2efcaa4 Parents: 61ee6bf Author: Rajat Khandelwal <[email protected]> Authored: Mon Oct 10 14:20:31 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Mon Oct 10 14:20:31 2016 +0530 ---------------------------------------------------------------------- .../org/apache/lens/driver/jdbc/JDBCDriver.java | 58 ++++++++------------ .../apache/lens/driver/jdbc/TestJdbcDriver.java | 3 + .../server/api/driver/AbstractLensDriver.java | 4 ++ .../server/api/driver/DriverQueryStatus.java | 7 ++- .../lens/server/api/query/QueryContext.java | 20 ++++--- .../server/query/QueryExecutionServiceImpl.java | 5 +- .../lens/server/common/RestAPITestUtil.java | 6 +- .../lens/server/query/TestQueryService.java | 22 ++++++-- 8 files changed, 72 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java index f805ec6..e41077c 100644 --- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java +++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java @@ -292,11 +292,11 @@ public class JDBCDriver extends AbstractLensDriver { if (queryContext.getLensContext().getDriverStatus().isCanceled()) { return result; } - 
queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable); - queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL); if (isResultAvailable) { result.resultSet = stmt.getResultSet(); } + queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable); + queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL); } catch (Exception e) { if (queryContext.getLensContext().getDriverStatus().isCanceled()) { return result; @@ -887,7 +887,6 @@ public class JDBCDriver extends AbstractLensDriver { queryContext.setPrepared(false); queryContext.setRewrittenQuery(rewrittenQuery); return new QueryCallable(queryContext, logSegregationContext).call(); - // LOG.info("Execute " + context.getQueryHandle()); } /** @@ -930,51 +929,40 @@ public class JDBCDriver extends AbstractLensDriver { return; } if (ctx.getResultFuture().isCancelled()) { - context.getDriverStatus().setProgress(1.0); - context.getDriverStatus().setState(DriverQueryState.CANCELED); - context.getDriverStatus().setStatusMessage("Query Canceled"); + if (!context.getDriverStatus().isCanceled()) { + context.getDriverStatus().setProgress(1.0); + context.getDriverStatus().setState(DriverQueryState.CANCELED); + context.getDriverStatus().setStatusMessage("Query Canceled"); + } } else if (ctx.getResultFuture().isDone()) { context.getDriverStatus().setProgress(1.0); // Since future is already done, this call should not block if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) { - context.getDriverStatus().setState(DriverQueryState.FAILED); - context.getDriverStatus().setStatusMessage("Query execution failed!"); - context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage()); + if (!context.getDriverStatus().isFailed()) { + context.getDriverStatus().setState(DriverQueryState.FAILED); + context.getDriverStatus().setStatusMessage("Query execution failed!"); + 
context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage()); + } } else { - context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL); - context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful"); - context.getDriverStatus().setResultSetAvailable(true); + if (!context.getDriverStatus().isFinished()) { + // assuming successful + context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL); + context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful"); + context.getDriverStatus().setResultSetAvailable(true); + } } } else { - context.getDriverStatus().setState(DriverQueryState.RUNNING); - context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running"); + if (!context.getDriverStatus().isRunning()) { + context.getDriverStatus().setState(DriverQueryState.RUNNING); + context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running"); + } } } @Override protected LensResultSet createResultSet(QueryContext ctx) throws LensException { checkConfigured(); - return getDriverResult(ctx); - } - - private LensResultSet getDriverResult(QueryContext context) throws LensException { - JdbcQueryContext ctx = getQueryContext(context.getQueryHandle()); - if (ctx.getLensContext().getDriverStatus().isCanceled()) { - throw new LensException("Result set not available for canceled query " + context.getQueryHandle()); - } - - Future<QueryResult> future = ctx.getResultFuture(); - QueryHandle queryHandle = context.getQueryHandle(); - - try { - return future.get().getLensResultSet(true); - } catch (InterruptedException e) { - throw new LensException("Interrupted while getting resultset for query " + queryHandle.getHandleId(), e); - } catch (ExecutionException e) { - throw new LensException("Error while executing query " + queryHandle.getHandleId() + " in background", e); - } catch (CancellationException e) { - throw new LensException("Query was already canceled " + 
queryHandle.getHandleId(), e); - } + return getQueryContext(ctx.getQueryHandle()).getQueryResult().getLensResultSet(true); } /** http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java index 6e9086f..2ad7f76 100644 --- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java +++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java @@ -496,6 +496,9 @@ public class TestJdbcDriver { QueryContext context = createQueryContext(query, conf); context.setExecuteTimeoutMillis(executeTimeoutMillis); driver.executeAsync(context); + while (!context.getDriverStatus().isFinished()) { + Thread.sleep(1000); + } LensResultSet resultSet = driver.fetchResultSet(context); assertNotNull(resultSet); http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index e498479..365a619 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -66,6 +66,10 @@ public abstract class AbstractLensDriver implements LensDriver { @Override public LensResultSet fetchResultSet(QueryContext ctx) throws LensException { log.info("FetchResultSet: {}", ctx.getQueryHandle()); + if (!ctx.getDriverStatus().isSuccessful()) { + throw new LensException("Can't fetch 
results for a " + ctx.getQueryHandleString() + " because it's status is " + + ctx.getStatus()); + } ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult makes sure registration happens ony once return ctx.getDriverResult(); } http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java index 033f677..fc24fc6 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java @@ -218,5 +218,10 @@ public class DriverQueryStatus implements Serializable { public boolean isCanceled() { return state.equals(DriverQueryState.CANCELED); } - + public boolean isFailed() { + return state.equals(DriverQueryState.FAILED); + } + public boolean isRunning() { + return state.equals(DriverQueryState.RUNNING); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index b584c6a..d0662f4 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -23,10 +23,7 @@ import static org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INME import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET; import static 
org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Future; import org.apache.lens.api.LensConf; @@ -203,7 +200,7 @@ public class QueryContext extends AbstractQueryContext { @Getter @Setter private transient Future queryLauncher; - private List<QueryDriverStatusUpdateListener> driverStatusUpdateListener = Lists.newArrayList(); + private final List<QueryDriverStatusUpdateListener> driverStatusUpdateListeners = Lists.newArrayList(); /** * Creates context from query @@ -469,6 +466,9 @@ public class QueryContext extends AbstractQueryContext { public boolean successful() { return this.status.successful(); } + public boolean executed() { + return this.status.executed(); + } public boolean launched() { return this.status.launched(); @@ -558,8 +558,10 @@ public class QueryContext extends AbstractQueryContext { getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " " + state.name().toLowerCase()); } getDriverStatus().setState(state); - for (QueryDriverStatusUpdateListener listener: this.driverStatusUpdateListener) { - listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus()); + synchronized (this.driverStatusUpdateListeners) { + for (QueryDriverStatusUpdateListener listener : this.driverStatusUpdateListeners) { + listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus()); + } } } @@ -573,6 +575,8 @@ public class QueryContext extends AbstractQueryContext { public void registerStatusUpdateListener(QueryDriverStatusUpdateListener driverStatusUpdateListener) { - this.driverStatusUpdateListener.add(driverStatusUpdateListener); + synchronized (this.driverStatusUpdateListeners) { + this.driverStatusUpdateListeners.add(driverStatusUpdateListener); + } } } 
http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 87d7cb0..cb5961f 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -582,12 +582,11 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx()); } catch (Exception e) { log.error( - "Error while getting result set form driver {}. Driver result set based purging logic will be ignored", - ctx.getSelectedDriver(), e); + "Error while getting result set form driver {}. 
Driver result set based purging logic will be ignored", + ctx.getSelectedDriver(), e); } } } - public boolean canBePurged() { try { if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && getCtx().getStatus().isResultSetAvailable()) { http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java index 57786e6..02e2f8b 100644 --- a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java +++ b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java @@ -249,9 +249,13 @@ public class RestAPITestUtil { public static PersistentQueryResult getLensQueryResult(final WebTarget target, final LensSessionHandle lensSessionHandle, final QueryHandle handle, MediaType mt) throws InterruptedException { + return getLensQueryResult(target, lensSessionHandle, handle, PersistentQueryResult.class, mt); + } + public static <T> T getLensQueryResult(final WebTarget target, final LensSessionHandle lensSessionHandle, + final QueryHandle handle, Class<T> clazz, MediaType mt) throws InterruptedException { waitForQueryToFinish(target, lensSessionHandle, handle, QueryStatus.Status.SUCCESSFUL, mt); return target.path("queryapi/queries").path(handle.toString()).path("resultset") - .queryParam("sessionid", lensSessionHandle).request(mt).get(PersistentQueryResult.class); + .queryParam("sessionid", lensSessionHandle).request(mt).get(clazz); } public static Response getLensQueryHttpResult(final WebTarget target, final LensSessionHandle lensSessionHandle, http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java 
---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index 3f71aef..440c30b 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -66,6 +66,7 @@ import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.session.SessionService; import org.apache.lens.server.common.ErrorResponseExpectedData; +import org.apache.lens.server.common.RestAPITestUtil; import org.apache.lens.server.common.TestDataUtils; import org.apache.lens.server.common.TestResourceFile; import org.apache.lens.server.error.GenericExceptionMapper; @@ -1426,6 +1427,17 @@ public class TestQueryService extends LensJerseyTest { }; } + @Test + public void testExecuteAsyncJDBCQuery() throws InterruptedException { + String query = "select ID, IDSTR from " + TEST_JDBC_TABLE; + QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of(query), + Optional.of(getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false)), APPLICATION_XML_TYPE); + // fetch results so that it can be purged + InMemoryQueryResult queryResult = RestAPITestUtil.getLensQueryResult(target(), lensSessionId, handle, + InMemoryQueryResult.class, APPLICATION_XML_TYPE); + assertEquals(queryResult.getRows().size(), 5); + } + /** * @param timeOutMillis : wait time for execute with timeout api * @param preFetchRows : number of rows to pre-fetch in case of InMemoryResultSet @@ -1457,7 +1469,7 @@ public class TestQueryService extends LensJerseyTest { conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // property used for test only mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, APPLICATION_XML_TYPE)); - QueryHandleWithResultSet result =target.request(APPLICATION_XML_TYPE) + QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE) .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); QueryHandle handle = result.getQueryHandle(); @@ -1945,16 +1957,16 @@ public class TestQueryService extends LensJerseyTest { WebTarget target = target().path("queryapi/queries"); final FormDataMultiPart mp = new FormDataMultiPart(); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, - MediaType.APPLICATION_XML_TYPE)); + APPLICATION_XML_TYPE)); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " + TEST_TABLE)); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), "300000")); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), queryName.toString())); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), - MediaType.APPLICATION_XML_TYPE)); + APPLICATION_XML_TYPE)); - QueryHandleWithResultSet result = target.request(MediaType.APPLICATION_XML_TYPE) + QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE) .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); assertNotNull(result.getQueryHandle()); @@ -1963,7 +1975,7 @@ public class TestQueryService extends LensJerseyTest { target = target().path("queryapi/queries/detail"); List<LensQuery> results = target.queryParam("queryName", queryName) .queryParam("sessionid", lensSessionId) - 
.request(MediaType.APPLICATION_XML_TYPE) + .request(APPLICATION_XML_TYPE) .get(new GenericType<List<LensQuery>>(){}); Assert.assertNotNull(results); Assert.assertEquals(1, results.size());
