Repository: lens
Updated Branches:
  refs/heads/master c7a39bc93 -> bed8e9a83


Execute with timeout fails to read metadata when user requests streamed results


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/bed8e9a8
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/bed8e9a8
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/bed8e9a8

Branch: refs/heads/master
Commit: bed8e9a83cbdd35b1878a4ba3b83cfcf8d871ae9
Parents: c7a39bc
Author: Puneet Gupta <puneet.k.gupta@gmaillcom>
Authored: Fri Jun 3 18:04:39 2016 +0530
Committer: Rajat Khandelwal <[email protected]>
Committed: Fri Jun 3 18:04:39 2016 +0530

----------------------------------------------------------------------
 .../server/api/driver/AbstractLensDriver.java   |  6 +-
 .../PartiallyFetchedInMemoryResultSet.java      |  9 ++-
 .../lens/server/api/query/QueryContext.java     |  5 ++
 .../server/query/QueryExecutionServiceImpl.java | 37 +++++++----
 .../lens/server/query/TestQueryService.java     | 69 ++++++++++++++++----
 .../drivers/jdbc/jdbc1/jdbcdriver-site.xml      |  4 +-
 6 files changed, 95 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
index 959a5b2..f1d844a 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
@@ -65,11 +65,7 @@ public abstract class AbstractLensDriver implements 
LensDriver {
   @Override
   public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
     log.info("FetchResultSet: {}", ctx.getQueryHandle());
-    synchronized (ctx) {
-      if (!ctx.isDriverResultRegistered()) {
-        ctx.registerDriverResult(createResultSet(ctx));
-      }
-    }
+    ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult 
makes sure registration happens only once
     return ctx.getDriverResult();
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
index 59918bd..0b136e2 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
@@ -75,6 +75,12 @@ public class PartiallyFetchedInMemoryResultSet extends 
InMemoryResultSet {
   private boolean preFetchedRowsConsumed;
 
   /**
+   * This is the metadata that is returned by underlying resultset. We cache 
it to make sure it's available even
+   * after the underlying resultset has been closed.
+   */
+  private LensResultSetMetadata cachedResultSetMetadata;
+
+  /**
    * Constructor
    * @param inMemoryRS : Underlying in-memory result set
    * @param reqPreFetchSize : requested number of rows to be pre-fetched and 
cached.
@@ -85,6 +91,7 @@ public class PartiallyFetchedInMemoryResultSet extends 
InMemoryResultSet {
     if (reqPreFetchSize <= 0) {
       throw new IllegalArgumentException("Invalid pre fetch size " + 
reqPreFetchSize);
     }
+    cachedResultSetMetadata = inMemoryRS.getMetadata();
     preFetchRows(reqPreFetchSize);
     log.info("Pre-Fetched {} rows of result and isComplteleyFetched = {} and 
doNotPurgeUntilTimeMillis ={}",
         numOfPreFetchedRows, isComplteleyFetched);
@@ -151,7 +158,7 @@ public class PartiallyFetchedInMemoryResultSet extends 
InMemoryResultSet {
 
   @Override
   public LensResultSetMetadata getMetadata() throws LensException {
-    return inMemoryRS.getMetadata();
+    return cachedResultSetMetadata;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index 379d532..aebb395 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -485,7 +485,9 @@ public class QueryContext extends AbstractQueryContext {
     if (isDriverResultRegistered) {
       return; //already registered
     }
+    log.info("Registering driver resultset for query {}", 
getQueryHandleString());
     this.isDriverResultRegistered = true;
+
     /*
      *  Check if results needs to be streamed to client in which case driver 
result needs to be wrapped in
      *  PartiallyFetchedInMemoryResultSet
@@ -506,6 +508,9 @@ public class QueryContext extends AbstractQueryContext {
         if (System.currentTimeMillis() < executeTimeOutTime) {
           this.driverResult = new 
PartiallyFetchedInMemoryResultSet((InMemoryResultSet) result, rowsToPreFetch);
           return;
+        } else {
+          log.info("Skipping creation of PartiallyFetchedInMemoryResultSet as 
the query {} has already timed out",
+            getQueryHandleString());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 1b3a7c0..8fe02aa 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -911,10 +911,10 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     QueryContext ctx = allQueries.get(handle);
     if (ctx != null) {
       logSegregationContext.setLogSegragationAndQueryId(ctx.getLogHandle());
+      log.info("Updating status for {}", ctx.getQueryHandle());
       synchronized (ctx) {
         QueryStatus before = ctx.getStatus();
         if (!ctx.queued() && !ctx.finished() && 
!ctx.getDriverStatus().isFinished()) {
-          log.debug("Updating status for {}", ctx.getQueryHandle());
           try {
             ctx.updateDriverStatus(statusUpdateRetryHandler);
           } catch (LensException exc) {
@@ -2136,7 +2136,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   private QueryHandleWithResultSet executeTimeoutInternal(LensSessionHandle 
sessionHandle, QueryContext ctx,
     long timeoutMillis, Configuration conf) throws LensException {
     QueryHandle handle = submitQuery(ctx);
-    long timeOutTime = System.currentTimeMillis() + timeoutMillis;
+    long timeOutTime = ctx.getSubmissionTime() + timeoutMillis;
     QueryHandleWithResultSet result = new QueryHandleWithResultSet(handle);
 
     boolean isQueued = true;
@@ -2161,22 +2161,32 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
 
     QueryCompletionListenerImpl listener = new 
QueryCompletionListenerImpl(handle);
-    long waitTime = timeOutTime - System.currentTimeMillis();
-    if (waitTime > 0 && !queryCtx.getStatus().finished()) {
-      synchronized (queryCtx) {
-        queryCtx.getSelectedDriver().registerForCompletionNotification(handle, 
waitTime, listener);
-        try {
-          synchronized (listener) {
-            listener.wait(waitTime);
+    long totalWaitTime = timeOutTime - System.currentTimeMillis();
+
+    if (totalWaitTime > 0 && !queryCtx.getStatus().executed() && 
!queryCtx.getStatus().finished()) {
+      log.info("Registering for query {} completion notification", 
ctx.getQueryHandleString());
+      queryCtx.getSelectedDriver().registerForCompletionNotification(handle, 
totalWaitTime, listener);
+      try {
+        // We will wait for a few millis at a time until we reach max required 
wait time and also check the state
+        // each time we come out of the wait.
+        // This is done because the registerForCompletionNotification and 
query execution completion can happen
+        // in parallel, especially in case of drivers like JDBC, and in that case 
completion notification may not be
+        //  received by this listener. So it's better to break the wait into 
smaller ones.
+        long waitMillisPerCheck = totalWaitTime/10;
+        waitMillisPerCheck = (waitMillisPerCheck > 500) ? 500 : 
waitMillisPerCheck; // Let's keep max as 500
+        long totalWaitMillisSoFar = 0;
+        synchronized (listener) {
+          while (totalWaitMillisSoFar < totalWaitTime
+            && !queryCtx.getStatus().executed() && 
!queryCtx.getStatus().finished()) {
+            listener.wait(waitMillisPerCheck);
+            totalWaitMillisSoFar += waitMillisPerCheck;
           }
-        } catch (InterruptedException e) {
-          log.info("Waiting thread interrupted");
         }
+      } catch (InterruptedException e) {
+        log.info("{} query completion notification wait interrupted", 
queryCtx.getQueryHandleString());
       }
     }
 
-
-
     // At this stage (since the listener waits only for driver completion and 
not server that may include result
     // formatting and persistence) the query status can be RUNNING or EXECUTED 
or FAILED or SUCCESSFUL
     LensResultSet resultSet = null;
@@ -2385,6 +2395,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
   }
 
+
   private boolean cancelQuery(@NonNull QueryHandle queryHandle) throws 
LensException {
     QueryContext ctx =  allQueries.get(queryHandle);
     if (ctx == null) {

http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index b6ec422..fdc8fe0 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -30,6 +30,7 @@ import static org.apache.lens.server.common.RestAPITestUtil.*;
 import static org.testng.Assert.*;
 
 import java.io.*;
+import java.sql.*;
 import java.util.*;
 
 import javax.ws.rs.NotFoundException;
@@ -128,13 +129,35 @@ public class TestQueryService extends LensJerseyTest {
     metricsSvc = LensServices.get().getService(MetricsService.NAME);
     Map<String, String> sessionconf = new HashMap<>();
     sessionconf.put("test.session.key", "svalue");
-    lensSessionId = queryService.openSession("foo@localhost", "bar", 
sessionconf); // @localhost should be removed
-    // automatically
+    // @localhost should be removed automatically
+    lensSessionId = queryService.openSession("foo@localhost", "bar", 
sessionconf);
+
+    //Create Hive table and load data
     createTable(TEST_TABLE);
     loadData(TEST_TABLE, TestResourceFile.TEST_DATA2_FILE.getValue());
+
+    //Create HSQLDB table and load data
+    createHSQLTableAndLoadData();
+  }
+
+  private void createHSQLTableAndLoadData() throws SQLException {
+    Connection conn = 
DriverManager.getConnection("jdbc:hsqldb:mem:jdbcTestDB;MODE=MYSQL", "sa", "");
+    String createTableCmd = "create table " + TEST_JDBC_TABLE + " (ID integer, 
IDSTR varchar(10))";
+    String loadTableCmd = "Insert into " + TEST_JDBC_TABLE + " values "
+      + "(1, 'one'), (NULL, 'two'), (3, NULL), (NULL, NULL), (5, '')";
+    Statement statement = conn.createStatement();
+    int result = statement.executeUpdate(createTableCmd);
+    System.out.print(result);
+    conn.commit();
+    result = statement.executeUpdate(loadTableCmd);
+    System.out.print(result);
+    statement.close();
+    conn.commit();
+    conn.close();
   }
 
   /*
+  /*
    * (non-Javadoc)
    *
    * @see org.glassfish.jersey.test.JerseyTest#tearDown()
@@ -166,6 +189,8 @@ public class TestQueryService extends LensJerseyTest {
   /** The test table. */
   public static final String TEST_TABLE = "TEST_TABLE";
 
+  public static final String TEST_JDBC_TABLE = "TEST_JDBC_TABLE";
+
   /**
    * Creates the table.
    *
@@ -1336,12 +1361,24 @@ public class TestQueryService extends LensJerseyTest {
    */
   @DataProvider
   public Object[][] executeWithTimeoutAndPreFetechAndServerPersistenceDP() {
-    //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, 
deferPersistenceByMillis
+    String query5RowsHive = "select ID, IDSTR from " + TEST_TABLE;
+    String query0RowsHive = "select ID, IDSTR from " + TEST_TABLE + " where 
ID=99";
+
+    String query5RowsJdbc = "select ID, IDSTR from " + TEST_JDBC_TABLE;
+    String query0RowsJdbc = "select ID, IDSTR from " + TEST_JDBC_TABLE + " 
where ID=99";
+
+    //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, 
deferPersistenceByMillis,query,rows in result
     return new Object[][] {
-      {30000, 5, true, 0}, //result has 5 rows & all 5 rows are requested to 
be pre-fetched
-      {30000, 10, true, 6000}, //result has 5 rows & 10 rows are requested to 
be pre-fetched.
-      {30000, 2, false, 4000}, //result has 5 rows & 2 rows are requested to 
be pre-fetched. Will not stream
-      {10, 5, false, 0}, //result has 5 rows & 5 rows requested. Timeout is 
less (10ms). Will not stream
+      {30000, 5, true, 0, query5RowsHive, 5}, //All 5 rows are requested to be 
pre-fetched
+      {30000, 10, true, 6000, query5RowsHive, 5}, //10 rows are requested to 
be pre-fetched.
+      {30000, 2, false, 4000, query5RowsHive, 5}, //2 rows are requested to be 
pre-fetched. Will not stream
+      {10, 5, false, 0, query5RowsHive, 5}, //5 rows requested. Timeout is 
less (10ms). Will not stream
+      {30000, 5, true, 0, query0RowsHive, 0}, //Result has no rows
+      {30000, 5, true, 0, query5RowsJdbc, 5}, //All 5 rows are requested to be 
pre-fetched
+      {30000, 10, true, 6000, query5RowsJdbc, 5}, //10 rows are requested to 
be pre-fetched.
+      {30000, 2, false, 4000, query5RowsJdbc, 5}, //2 rows are requested to be 
pre-fetched. Will not stream
+      {10, 5, false, 0, query5RowsJdbc, 5}, //5 rows requested. Timeout is 
less (10ms). Will not stream
+      {30000, 5, true, 0, query0RowsJdbc, 0}, //Result has no rows
     };
   }
 
@@ -1355,14 +1392,14 @@ public class TestQueryService extends LensJerseyTest {
    */
   @Test(dataProvider = "executeWithTimeoutAndPreFetechAndServerPersistenceDP")
   public void testExecuteWithTimeoutAndPreFetchAndServerPersistence(long 
timeOutMillis, int preFetchRows,
-      boolean isStreamingResultAvailable, long deferPersistenceByMillis) 
throws Exception {
+      boolean isStreamingResultAvailable, long deferPersistenceByMillis, 
String query, int rowsInResult)
+    throws Exception {
     final WebTarget target = target().path("queryapi/queries");
 
     final FormDataMultiPart mp = new FormDataMultiPart();
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), 
lensSessionId,
         APPLICATION_XML_TYPE));
-    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, 
IDSTR from "
-        + TEST_TABLE));
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("query").build(), query));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("operation").build(), 
"execute_with_timeout"));
     // Set a timeout value enough for tests
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), 
timeOutMillis + ""));
@@ -1390,11 +1427,15 @@ public class TestQueryService extends LensJerseyTest {
           "Check if timeoutmillis need to be increased based on query status " 
+ result.getStatus());
       assertEquals(result.getResultMetadata().getColumns().size(), 2);
       assertNotNull(result.getResult());
-      validateInmemoryResult((InMemoryQueryResult) result.getResult());
-    } else if (timeOutMillis > 20000) { // timeout is sufficient for query to 
finish
-      assertTrue(result.getResult() instanceof PersistentQueryResult);
+      if (rowsInResult > 0) {
+        validateInmemoryResult((InMemoryQueryResult) result.getResult());
+      } else {
+        assertEquals(((InMemoryQueryResult) 
result.getResult()).getRows().size(), 0);
+      }
     } else {
-      assertNull(result.getResult()); // Query execution not finished yet
+      // If timeout is sufficient for query to finish, we should receive 
PersistentQueryResult
+      // Else we will get null result
+      assertTrue(result.getResult()==null || result.getResult() instanceof 
PersistentQueryResult);
     }
 
     waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, 
APPLICATION_XML_TYPE);

http://git-wip-us.apache.org/repos/asf/lens/blob/bed8e9a8/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml 
b/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml
index 9ed0c87..5b8b43f 100644
--- a/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml
+++ b/lens-server/src/test/resources/drivers/jdbc/jdbc1/jdbcdriver-site.xml
@@ -28,11 +28,11 @@
   </property>
   <property>
     <name>lens.driver.jdbc.db.uri</name>
-    <value>jdbc:hsqldb:./target/db-storage.db;MODE=MYSQL</value>
+    <value>jdbc:hsqldb:mem:jdbcTestDB;MODE=MYSQL</value>
   </property>
   <property>
     <name>lens.driver.jdbc.db.user</name>
-    <value>SA</value>
+    <value>sa</value>
   </property>
   <property>
     <name>lens.cube.query.driver.supported.storages</name>

Reply via email to