Repository: lens
Updated Branches:
  refs/heads/master 61ee6bfc8 -> 9ef7ce736


LENS-1345: Fixing deadlock in jdbc query status update flow


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ef7ce73
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ef7ce73
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ef7ce73

Branch: refs/heads/master
Commit: 9ef7ce73693d039ab5a197b4227c5c09c2efcaa4
Parents: 61ee6bf
Author: Rajat Khandelwal <[email protected]>
Authored: Mon Oct 10 14:20:31 2016 +0530
Committer: Rajat Khandelwal <[email protected]>
Committed: Mon Oct 10 14:20:31 2016 +0530

----------------------------------------------------------------------
 .../org/apache/lens/driver/jdbc/JDBCDriver.java | 58 ++++++++------------
 .../apache/lens/driver/jdbc/TestJdbcDriver.java |  3 +
 .../server/api/driver/AbstractLensDriver.java   |  4 ++
 .../server/api/driver/DriverQueryStatus.java    |  7 ++-
 .../lens/server/api/query/QueryContext.java     | 20 ++++---
 .../server/query/QueryExecutionServiceImpl.java |  5 +-
 .../lens/server/common/RestAPITestUtil.java     |  6 +-
 .../lens/server/query/TestQueryService.java     | 22 ++++++--
 8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java 
b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
index f805ec6..e41077c 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
@@ -292,11 +292,11 @@ public class JDBCDriver extends AbstractLensDriver {
             if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
               return result;
             }
-            
queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
-            
queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
             if (isResultAvailable) {
               result.resultSet = stmt.getResultSet();
             }
+            
queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
+            
queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
           } catch (Exception e) {
             if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
               return result;
@@ -887,7 +887,6 @@ public class JDBCDriver extends AbstractLensDriver {
     queryContext.setPrepared(false);
     queryContext.setRewrittenQuery(rewrittenQuery);
     return new QueryCallable(queryContext, logSegregationContext).call();
-    // LOG.info("Execute " + context.getQueryHandle());
   }
 
   /**
@@ -930,51 +929,40 @@ public class JDBCDriver extends AbstractLensDriver {
       return;
     }
     if (ctx.getResultFuture().isCancelled()) {
-      context.getDriverStatus().setProgress(1.0);
-      context.getDriverStatus().setState(DriverQueryState.CANCELED);
-      context.getDriverStatus().setStatusMessage("Query Canceled");
+      if (!context.getDriverStatus().isCanceled()) {
+        context.getDriverStatus().setProgress(1.0);
+        context.getDriverStatus().setState(DriverQueryState.CANCELED);
+        context.getDriverStatus().setStatusMessage("Query Canceled");
+      }
     } else if (ctx.getResultFuture().isDone()) {
       context.getDriverStatus().setProgress(1.0);
       // Since future is already done, this call should not block
       if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) {
-        context.getDriverStatus().setState(DriverQueryState.FAILED);
-        context.getDriverStatus().setStatusMessage("Query execution failed!");
-        
context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+        if (!context.getDriverStatus().isFailed()) {
+          context.getDriverStatus().setState(DriverQueryState.FAILED);
+          context.getDriverStatus().setStatusMessage("Query execution 
failed!");
+          
context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
+        }
       } else {
-        context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
-        context.getDriverStatus().setStatusMessage(context.getQueryHandle() + 
" successful");
-        context.getDriverStatus().setResultSetAvailable(true);
+        if (!context.getDriverStatus().isFinished()) {
+          // assuming successful
+          context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
+          context.getDriverStatus().setStatusMessage(context.getQueryHandle() 
+ " successful");
+          context.getDriverStatus().setResultSetAvailable(true);
+        }
       }
     } else {
-      context.getDriverStatus().setState(DriverQueryState.RUNNING);
-      context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " 
is running");
+      if (!context.getDriverStatus().isRunning()) {
+        context.getDriverStatus().setState(DriverQueryState.RUNNING);
+        context.getDriverStatus().setStatusMessage(context.getQueryHandle() + 
" is running");
+      }
     }
   }
 
   @Override
   protected LensResultSet createResultSet(QueryContext ctx) throws 
LensException {
     checkConfigured();
-    return getDriverResult(ctx);
-  }
-
-  private LensResultSet getDriverResult(QueryContext context) throws 
LensException {
-    JdbcQueryContext ctx = getQueryContext(context.getQueryHandle());
-    if (ctx.getLensContext().getDriverStatus().isCanceled()) {
-      throw new LensException("Result set not available for canceled query " + 
context.getQueryHandle());
-    }
-
-    Future<QueryResult> future = ctx.getResultFuture();
-    QueryHandle queryHandle = context.getQueryHandle();
-
-    try {
-      return future.get().getLensResultSet(true);
-    } catch (InterruptedException e) {
-      throw new LensException("Interrupted while getting resultset for query " 
+ queryHandle.getHandleId(), e);
-    } catch (ExecutionException e) {
-      throw new LensException("Error while executing query " + 
queryHandle.getHandleId() + " in background", e);
-    } catch (CancellationException e) {
-      throw new LensException("Query was already canceled " + 
queryHandle.getHandleId(), e);
-    }
+    return 
getQueryContext(ctx.getQueryHandle()).getQueryResult().getLensResultSet(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
 
b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
index 6e9086f..2ad7f76 100644
--- 
a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
+++ 
b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
@@ -496,6 +496,9 @@ public class TestJdbcDriver {
     QueryContext context = createQueryContext(query, conf);
     context.setExecuteTimeoutMillis(executeTimeoutMillis);
     driver.executeAsync(context);
+    while (!context.getDriverStatus().isFinished()) {
+      Thread.sleep(1000);
+    }
     LensResultSet resultSet = driver.fetchResultSet(context);
     assertNotNull(resultSet);
 

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
index e498479..365a619 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
@@ -66,6 +66,10 @@ public abstract class AbstractLensDriver implements 
LensDriver {
   @Override
   public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
     log.info("FetchResultSet: {}", ctx.getQueryHandle());
+    if (!ctx.getDriverStatus().isSuccessful()) {
+      throw new LensException("Can't fetch results for a " + 
ctx.getQueryHandleString() + " because it's status is "
+        + ctx.getStatus());
+    }
     ctx.registerDriverResult(createResultSet(ctx)); // registerDriverResult 
makes sure registration happens ony once
     return ctx.getDriverResult();
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
index 033f677..fc24fc6 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java
@@ -218,5 +218,10 @@ public class DriverQueryStatus implements Serializable {
   public boolean isCanceled() {
     return state.equals(DriverQueryState.CANCELED);
   }
-
+  public boolean isFailed() {
+    return state.equals(DriverQueryState.FAILED);
+  }
+  public boolean isRunning() {
+    return state.equals(DriverQueryState.RUNNING);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index b584c6a..d0662f4 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -23,10 +23,7 @@ import static 
org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INME
 import static 
org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET;
 import static 
org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Future;
 
 import org.apache.lens.api.LensConf;
@@ -203,7 +200,7 @@ public class QueryContext extends AbstractQueryContext {
   @Getter
   @Setter
   private transient Future queryLauncher;
-  private List<QueryDriverStatusUpdateListener> driverStatusUpdateListener = 
Lists.newArrayList();
+  private final List<QueryDriverStatusUpdateListener> 
driverStatusUpdateListeners = Lists.newArrayList();
 
   /**
    * Creates context from query
@@ -469,6 +466,9 @@ public class QueryContext extends AbstractQueryContext {
   public boolean successful() {
     return this.status.successful();
   }
+  public boolean executed() {
+    return this.status.executed();
+  }
 
   public boolean launched() {
     return this.status.launched();
@@ -558,8 +558,10 @@ public class QueryContext extends AbstractQueryContext {
       getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " 
" + state.name().toLowerCase());
     }
     getDriverStatus().setState(state);
-    for (QueryDriverStatusUpdateListener listener: 
this.driverStatusUpdateListener) {
-      listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+    synchronized (this.driverStatusUpdateListeners) {
+      for (QueryDriverStatusUpdateListener listener : 
this.driverStatusUpdateListeners) {
+        listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
+      }
     }
   }
 
@@ -573,6 +575,8 @@ public class QueryContext extends AbstractQueryContext {
 
 
   public void registerStatusUpdateListener(QueryDriverStatusUpdateListener 
driverStatusUpdateListener) {
-    this.driverStatusUpdateListener.add(driverStatusUpdateListener);
+    synchronized (this.driverStatusUpdateListeners) {
+      this.driverStatusUpdateListeners.add(driverStatusUpdateListener);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 87d7cb0..cb5961f 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -582,12 +582,11 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
           driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx());
         } catch (Exception e) {
           log.error(
-              "Error while getting result set form driver {}. Driver result 
set based purging logic will be ignored",
-              ctx.getSelectedDriver(), e);
+            "Error while getting result set form driver {}. Driver result set 
based purging logic will be ignored",
+            ctx.getSelectedDriver(), e);
         }
       }
     }
-
     public boolean canBePurged() {
       try {
         if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && 
getCtx().getStatus().isResultSetAvailable()) {

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java 
b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
index 57786e6..02e2f8b 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/common/RestAPITestUtil.java
@@ -249,9 +249,13 @@ public class RestAPITestUtil {
 
   public static PersistentQueryResult getLensQueryResult(final WebTarget 
target,
     final LensSessionHandle lensSessionHandle, final QueryHandle handle, 
MediaType mt) throws InterruptedException {
+    return getLensQueryResult(target, lensSessionHandle, handle, 
PersistentQueryResult.class, mt);
+  }
+  public static <T> T getLensQueryResult(final WebTarget target, final 
LensSessionHandle lensSessionHandle,
+    final QueryHandle handle, Class<T> clazz, MediaType mt) throws 
InterruptedException {
     waitForQueryToFinish(target, lensSessionHandle, handle, 
QueryStatus.Status.SUCCESSFUL, mt);
     return 
target.path("queryapi/queries").path(handle.toString()).path("resultset")
-      .queryParam("sessionid", 
lensSessionHandle).request(mt).get(PersistentQueryResult.class);
+      .queryParam("sessionid", lensSessionHandle).request(mt).get(clazz);
   }
 
   public static Response getLensQueryHttpResult(final WebTarget target, final 
LensSessionHandle lensSessionHandle,

http://git-wip-us.apache.org/repos/asf/lens/blob/9ef7ce73/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index 3f71aef..440c30b 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -66,6 +66,7 @@ import org.apache.lens.server.api.query.QueryContext;
 import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.session.SessionService;
 import org.apache.lens.server.common.ErrorResponseExpectedData;
+import org.apache.lens.server.common.RestAPITestUtil;
 import org.apache.lens.server.common.TestDataUtils;
 import org.apache.lens.server.common.TestResourceFile;
 import org.apache.lens.server.error.GenericExceptionMapper;
@@ -1426,6 +1427,17 @@ public class TestQueryService extends LensJerseyTest {
     };
   }
 
+  @Test
+  public void testExecuteAsyncJDBCQuery() throws InterruptedException {
+    String query = "select ID, IDSTR from " + TEST_JDBC_TABLE;
+    QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), 
Optional.of(lensSessionId), Optional.of(query),
+      Optional.of(getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, 
false)), APPLICATION_XML_TYPE);
+    // fetch results so that it can be purged
+    InMemoryQueryResult queryResult = 
RestAPITestUtil.getLensQueryResult(target(), lensSessionId, handle,
+      InMemoryQueryResult.class, APPLICATION_XML_TYPE);
+    assertEquals(queryResult.getRows().size(), 5);
+  }
+
   /**
    * @param timeOutMillis : wait time for execute with timeout api
    * @param preFetchRows : number of rows to pre-fetch in case of 
InMemoryResultSet
@@ -1457,7 +1469,7 @@ public class TestQueryService extends LensJerseyTest {
     conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // 
property used for test only
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(),
 conf,
         APPLICATION_XML_TYPE));
-    QueryHandleWithResultSet result =target.request(APPLICATION_XML_TYPE)
+    QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
             .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
                 new GenericType<LensAPIResult<QueryHandleWithResultSet>>() 
{}).getData();
     QueryHandle handle = result.getQueryHandle();
@@ -1945,16 +1957,16 @@ public class TestQueryService extends LensJerseyTest {
     WebTarget target = target().path("queryapi/queries");
     final FormDataMultiPart mp = new FormDataMultiPart();
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), 
lensSessionId,
-      MediaType.APPLICATION_XML_TYPE));
+      APPLICATION_XML_TYPE));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, 
IDSTR from "
       + TEST_TABLE));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("operation").build(), 
"execute_with_timeout"));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), 
"300000"));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), 
queryName.toString()));
     mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(),
 new LensConf(),
-      MediaType.APPLICATION_XML_TYPE));
+      APPLICATION_XML_TYPE));
 
-    QueryHandleWithResultSet result = 
target.request(MediaType.APPLICATION_XML_TYPE)
+    QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE)
       .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
         new GenericType<LensAPIResult<QueryHandleWithResultSet>>() 
{}).getData();
     assertNotNull(result.getQueryHandle());
@@ -1963,7 +1975,7 @@ public class TestQueryService extends LensJerseyTest {
     target = target().path("queryapi/queries/detail");
     List<LensQuery> results = target.queryParam("queryName", queryName)
       .queryParam("sessionid", lensSessionId)
-      .request(MediaType.APPLICATION_XML_TYPE)
+      .request(APPLICATION_XML_TYPE)
       .get(new GenericType<List<LensQuery>>(){});
     Assert.assertNotNull(results);
     Assert.assertEquals(1, results.size());

Reply via email to