Repository: lens Updated Branches: refs/heads/master c7a39bc93 -> bed8e9a83
Execute with timeout fails to read metadata when user requests streamed results Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/bed8e9a8 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/bed8e9a8 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/bed8e9a8 Branch: refs/heads/master Commit: bed8e9a83cbdd35b1878a4ba3b83cfcf8d871ae9 Parents: c7a39bc Author: Puneet Gupta <puneet.k.gupta@gmaillcom> Authored: Fri Jun 3 18:04:39 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Fri Jun 3 18:04:39 2016 +0530 ---------------------------------------------------------------------- .../server/api/driver/AbstractLensDriver.java | 6 +- .../PartiallyFetchedInMemoryResultSet.java | 9 ++- .../lens/server/api/query/QueryContext.java | 5 ++ .../server/query/QueryExecutionServiceImpl.java | 37 +++++++---- .../lens/server/query/TestQueryService.java | 69 ++++++++++++++++---- .../drivers/jdbc/jdbc1/jdbcdriver-site.xml | 4 +- 6 files changed, 95 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index 959a5b2..f1d844a 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -65,11 +65,7 @@ public abstract class AbstractLensDriver implements LensDriver { @Override public LensResultSet fetchResultSet(QueryContext ctx) throws LensException { log.info("FetchResultSet: {}", ctx.getQueryHandle()); - synchronized (ctx) { - if (!ctx.isDriverResultRegistered()) { - ctx.registerDriverResult(createResultSet(ctx)); - } - } + ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult makes sure registration happens ony once return ctx.getDriverResult(); } http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java index 59918bd..0b136e2 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java @@ -75,6 +75,12 @@ public class PartiallyFetchedInMemoryResultSet extends InMemoryResultSet { private boolean preFetchedRowsConsumed; /** + * This is the metadata that is returned by underlying resultset. We cache it to make sure its available even + * after the underlying resultset has been closed. + */ + private LensResultSetMetadata cachedResultSetMetadata; + + /** * Constructor * @param inMemoryRS : Underlying in-memory result set * @param reqPreFetchSize : requested number of rows to be pre-fetched and cached. @@ -85,6 +91,7 @@ public class PartiallyFetchedInMemoryResultSet extends InMemoryResultSet { if (reqPreFetchSize <= 0) { throw new IllegalArgumentException("Invalid pre fetch size " + reqPreFetchSize); } + cachedResultSetMetadata = inMemoryRS.getMetadata(); preFetchRows(reqPreFetchSize); log.info("Pre-Fetched {} rows of result and isComplteleyFetched = {} and doNotPurgeUntilTimeMillis ={}", numOfPreFetchedRows, isComplteleyFetched); @@ -151,7 +158,7 @@ public class PartiallyFetchedInMemoryResultSet extends InMemoryResultSet { @Override public LensResultSetMetadata getMetadata() throws LensException { - return inMemoryRS.getMetadata(); + return cachedResultSetMetadata; } @Override http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index 379d532..aebb395 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -485,7 +485,9 @@ public class QueryContext extends AbstractQueryContext { if (isDriverResultRegistered) { return; //already registered } + log.info("Registering driver resultset for query {}", getQueryHandleString()); this.isDriverResultRegistered = true; + /* * Check if results needs to be streamed to client in which case driver result needs to be wrapped in * PartiallyFetchedInMemoryResultSet @@ -506,6 +508,9 @@ public class QueryContext extends AbstractQueryContext { if (System.currentTimeMillis() < executeTimeOutTime) { this.driverResult = new PartiallyFetchedInMemoryResultSet((InMemoryResultSet) result, rowsToPreFetch); return; + } else { + log.info("Skipping creation of PartiallyFetchedInMemoryResultSet as the query {} has already timed out", + getQueryHandleString()); } } } http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 1b3a7c0..8fe02aa 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -911,10 +911,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE QueryContext ctx = allQueries.get(handle); if (ctx != null) { logSegregationContext.setLogSegragationAndQueryId(ctx.getLogHandle()); + log.info("Updating status for {}", ctx.getQueryHandle()); synchronized (ctx) { QueryStatus before = ctx.getStatus(); if (!ctx.queued() && !ctx.finished() && !ctx.getDriverStatus().isFinished()) { - log.debug("Updating status for {}", ctx.getQueryHandle()); try { ctx.updateDriverStatus(statusUpdateRetryHandler); } catch (LensException exc) { @@ -2136,7 +2136,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private QueryHandleWithResultSet executeTimeoutInternal(LensSessionHandle sessionHandle, QueryContext ctx, long timeoutMillis, Configuration conf) throws LensException { QueryHandle handle = submitQuery(ctx); - long timeOutTime = System.currentTimeMillis() + timeoutMillis; + long timeOutTime = ctx.getSubmissionTime() + timeoutMillis; QueryHandleWithResultSet result = new QueryHandleWithResultSet(handle); boolean isQueued = true; @@ -2161,22 +2161,32 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } QueryCompletionListenerImpl listener = new QueryCompletionListenerImpl(handle); - long waitTime = timeOutTime - System.currentTimeMillis(); - if (waitTime > 0 && !queryCtx.getStatus().finished()) { - synchronized (queryCtx) { - queryCtx.getSelectedDriver().registerForCompletionNotification(handle, waitTime, listener); - try { - synchronized (listener) { - listener.wait(waitTime); + long totalWaitTime = timeOutTime - System.currentTimeMillis(); + + if (totalWaitTime > 0 && !queryCtx.getStatus().executed() && !queryCtx.getStatus().finished()) { + log.info("Registering for query {} completion notification", ctx.getQueryHandleString()); + queryCtx.getSelectedDriver().registerForCompletionNotification(handle, totalWaitTime, listener); + try { + // We will wait for a few millis at a time until we reach max required wait time and also check the state + // each time we come out of the wait. + // This is done because the registerForCompletionNotification and query execution completion can happen + // parallely especailly in case of drivers like JDBC and in that case completion notification may not be + // received by this listener. So its better to break the wait into smaller ones. + long waitMillisPerCheck = totalWaitTime/10; + waitMillisPerCheck = (waitMillisPerCheck > 500) ? 500 : waitMillisPerCheck; // Lets keep max as 500 + long totalWaitMillisSoFar = 0; + synchronized (listener) { + while (totalWaitMillisSoFar < totalWaitTime + && !queryCtx.getStatus().executed() && !queryCtx.getStatus().finished()) { + listener.wait(waitMillisPerCheck); + totalWaitMillisSoFar += waitMillisPerCheck; } - } catch (InterruptedException e) { - log.info("Waiting thread interrupted"); } + } catch (InterruptedException e) { + log.info("{} query completion notification wait interrupted", queryCtx.getQueryHandleString()); } } - - // At this stage (since the listener waits only for driver completion and not server that may include result // formatting and persistence) the query status can be RUNNING or EXECUTED or FAILED or SUCCESSFUL LensResultSet resultSet = null; @@ -2385,6 +2395,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } + private boolean cancelQuery(@NonNull QueryHandle queryHandle) throws LensException { QueryContext ctx = allQueries.get(queryHandle); if (ctx == null) { http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index b6ec422..fdc8fe0 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -30,6 +30,7 @@ import static org.apache.lens.server.common.RestAPITestUtil.*; import static org.testng.Assert.*; import java.io.*; +import java.sql.*; import java.util.*; import javax.ws.rs.NotFoundException; @@ -128,13 +129,35 @@ public class TestQueryService extends LensJerseyTest { metricsSvc = LensServices.get().getService(MetricsService.NAME); Map<String, String> sessionconf = new HashMap<>(); sessionconf.put("test.session.key", "svalue"); - lensSessionId = queryService.openSession("foo@localhost", "bar", sessionconf); // @localhost should be removed - // automatically + // @localhost should be removed automatically + lensSessionId = queryService.openSession("foo@localhost", "bar", sessionconf); + + //Create Hive table and load data createTable(TEST_TABLE); loadData(TEST_TABLE, TestResourceFile.TEST_DATA2_FILE.getValue()); + + //Create HSQLDB table and load data + createHSQLTableAndLoadData(); + } + + private void createHSQLTableAndLoadData() throws SQLException { + Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:jdbcTestDB;MODE=MYSQL", "sa", ""); + String createTableCmd = "create table " + TEST_JDBC_TABLE + " (ID integer, IDSTR varchar(10))"; + String loadTableCmd = "Insert into " + TEST_JDBC_TABLE + " values " + + "(1, 'one'), (NULL, 'two'), (3, NULL), (NULL, NULL), (5, '')"; + Statement statement = conn.createStatement(); + int result = statement.executeUpdate(createTableCmd); + System.out.print(result); + conn.commit(); + result = statement.executeUpdate(loadTableCmd); + System.out.print(result); + statement.close(); + conn.commit(); + conn.close(); } /* + /* * (non-Javadoc) * * @see org.glassfish.jersey.test.JerseyTest#tearDown() @@ -166,6 +189,8 @@ public class TestQueryService extends LensJerseyTest { /** The test table. */ public static final String TEST_TABLE = "TEST_TABLE"; + public static final String TEST_JDBC_TABLE = "TEST_JDBC_TABLE"; + /** * Creates the table. * @@ -1336,12 +1361,24 @@ public class TestQueryService extends LensJerseyTest { */ @DataProvider public Object[][] executeWithTimeoutAndPreFetechAndServerPersistenceDP() { - //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, deferPersistenceByMillis + String query5RowsHive = "select ID, IDSTR from " + TEST_TABLE; + String query0RowsHive = "select ID, IDSTR from " + TEST_TABLE + " where ID=99"; + + String query5RowsJdbc = "select ID, IDSTR from " + TEST_JDBC_TABLE; + String query0RowsJdbc = "select ID, IDSTR from " + TEST_JDBC_TABLE + " where ID=99"; + + //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, deferPersistenceByMillis,query,rows in result return new Object[][] { - {30000, 5, true, 0}, //result has 5 rows & all 5 rows are requested to be pre-fetched - {30000, 10, true, 6000}, //result has 5 rows & 10 rows are requested to be pre-fetched. - {30000, 2, false, 4000}, //result has 5 rows & 2 rows are requested to be pre-fetched. Will not stream - {10, 5, false, 0}, //result has 5 rows & 5 rows requested. Timeout is less (10ms). Will not stream + {30000, 5, true, 0, query5RowsHive, 5}, //All 5 rows are requested to be pre-fetched + {30000, 10, true, 6000, query5RowsHive, 5}, //10 rows are requested to be pre-fetched. + {30000, 2, false, 4000, query5RowsHive, 5}, //2 rows are requested to be pre-fetched. Will not stream + {10, 5, false, 0, query5RowsHive, 5}, //5 rows requested. Timeout is less (10ms). Will not stream + {30000, 5, true, 0, query0RowsHive, 0}, //Result has no rows + {30000, 5, true, 0, query5RowsJdbc, 5}, //All 5 rows are requested to be pre-fetched + {30000, 10, true, 6000, query5RowsJdbc, 5}, //10 rows are requested to be pre-fetched. + {30000, 2, false, 4000, query5RowsJdbc, 5}, //2 rows are requested to be pre-fetched. Will not stream + {10, 5, false, 0, query5RowsJdbc, 5}, //5 rows requested. Timeout is less (10ms). Will not stream + {30000, 5, true, 0, query0RowsJdbc, 0}, //Result has no rows }; } @@ -1355,14 +1392,14 @@ public class TestQueryService extends LensJerseyTest { */ @Test(dataProvider = "executeWithTimeoutAndPreFetechAndServerPersistenceDP") public void testExecuteWithTimeoutAndPreFetchAndServerPersistence(long timeOutMillis, int preFetchRows, - boolean isStreamingResultAvailable, long deferPersistenceByMillis) throws Exception { + boolean isStreamingResultAvailable, long deferPersistenceByMillis, String query, int rowsInResult) + throws Exception { final WebTarget target = target().path("queryapi/queries"); final FormDataMultiPart mp = new FormDataMultiPart(); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, APPLICATION_XML_TYPE)); - mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " - + TEST_TABLE)); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), query)); mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); // Set a timeout value enough for tests mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), timeOutMillis + "")); @@ -1390,11 +1427,15 @@ public class TestQueryService extends LensJerseyTest { "Check if timeoutmillis need to be increased based on query status " + result.getStatus()); assertEquals(result.getResultMetadata().getColumns().size(), 2); assertNotNull(result.getResult()); - validateInmemoryResult((InMemoryQueryResult) result.getResult()); - } else if (timeOutMillis > 20000) { // timeout is sufficient for query to finish - assertTrue(result.getResult() instanceof PersistentQueryResult); + if (rowsInResult > 0) { + validateInmemoryResult((InMemoryQueryResult) result.getResult()); + } else { + assertEquals(((InMemoryQueryResult) result.getResult()).getRows().size(), 0); + } } else { - assertNull(result.getResult()); // Query execution not finished yet + // IF timeout is sufficient for query to finish , we should receive PersistentQueryResult + // Else we will get null result + assertTrue(result.getResult()==null || result.getResult() instanceof PersistentQueryResult); } waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, APPLICATION_XML_TYPE); http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml b/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml index 9ed0c87..5b8b43f 100644 --- a/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml +++ b/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml @@ -28,11 +28,11 @@ </property> <property> <name>lens.driver.jdbc.db.uri</name> - <value>jdbc:hsqldb:./target/db-storage.db;MODE=MYSQL</value> + <value>jdbc:hsqldb:mem:jdbcTestDB;MODE=MYSQL</value> </property> <property> <name>lens.driver.jdbc.db.user</name> - <value>SA</value> + <value>sa</value> </property> <property> <name>lens.cube.query.driver.supported.storages</name>
