Repository: lens Updated Branches: refs/heads/master abf44abab -> c2100fa2d
LENS-1243: Support Asynchronous status updates from drivers Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/c2100fa2 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/c2100fa2 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/c2100fa2 Branch: refs/heads/master Commit: c2100fa2d7f98ed79762ff0679cdb257a12b060e Parents: abf44ab Author: Rajat Khandelwal <[email protected]> Authored: Thu Aug 25 13:09:17 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Thu Aug 25 13:09:17 2016 +0530 ---------------------------------------------------------------------- .../org/apache/lens/driver/es/ESDriver.java | 34 +--- .../org/apache/lens/driver/hive/HiveDriver.java | 98 ++++++----- .../lens/driver/hive/TestRemoteHiveDriver.java | 11 +- .../src/test/resources/hive-site.xml | 5 + .../org/apache/lens/driver/jdbc/JDBCDriver.java | 169 ++++++------------- .../driver/jdbc/TestColumnarSQLRewriter.java | 22 ++- .../apache/lens/driver/jdbc/TestJdbcDriver.java | 20 +-- .../src/test/resources/hive-site.xml | 5 + .../server/api/driver/AbstractLensDriver.java | 11 ++ .../server/api/driver/DriverQueryStatus.java | 43 ++--- .../lens/server/api/driver/LensDriver.java | 17 +- .../api/driver/QueryCompletionListener.java | 23 ++- .../driver/QueryDriverStatusUpdateListener.java | 28 +++ .../server/api/driver/StatusUpdateMethod.java | 31 ++++ .../lens/server/api/query/QueryContext.java | 67 ++++++-- .../lens/server/api/driver/MockDriver.java | 7 +- .../org/apache/lens/server/LensServerConf.java | 5 +- .../server/query/QueryExecutionServiceImpl.java | 72 ++++---- .../server/metastore/TestMetastoreService.java | 5 +- lens-server/src/test/resources/hive-site.xml | 5 + 20 files changed, 377 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java index 8a4f410..fceabea 100644 --- a/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java +++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/ESDriver.java @@ -86,7 +86,7 @@ public class ESDriver extends AbstractLensDriver { */ private final Map<String, ESQuery> rewrittenQueriesCache = Maps.newConcurrentMap(); private final Map<QueryHandle, Future<LensResultSet>> resultSetMap = Maps.newConcurrentMap(); - private final Map<QueryHandle, QueryCompletionListener> handleListenerMap = Maps.newConcurrentMap(); + private final Map<QueryHandle, QueryContext> handleContextMap = Maps.newConcurrentMap(); @Override public Configuration getConf() { @@ -138,27 +138,23 @@ public class ESDriver extends AbstractLensDriver { @Override public LensResultSet execute(QueryContext context) throws LensException { + handleContextMap.put(context.getQueryHandle(), context); final ESQuery esQuery = rewrite(context); - final QueryHandle queryHandle = context.getQueryHandle(); final ESResultSet resultSet = esClient.execute(esQuery); - notifyComplIfRegistered(queryHandle); + context.setDriverStatus(DriverQueryStatus.DriverQueryState.SUCCESSFUL); + handleContextMap.remove(context.getQueryHandle()); return resultSet; } @Override public void executeAsync(final QueryContext context) { + handleContextMap.put(context.getQueryHandle(), context); final Future<LensResultSet> futureResult = asyncQueryPool.submit(new ESQueryExecuteCallable(context, SessionState.get())); resultSetMap.put(context.getQueryHandle(), futureResult); } @Override - public void registerForCompletionNotification(QueryHandle handle, long timeoutMillis, - QueryCompletionListener listener) { - handleListenerMap.put(handle, listener); - } - - @Override public void updateStatus(QueryContext context) { final QueryHandle queryHandle = context.getQueryHandle(); final Future<LensResultSet> lensResultSetFuture = resultSetMap.get(queryHandle); @@ -205,7 +201,7 @@ public class ESDriver extends AbstractLensDriver { try { boolean cancelled = resultSetMap.get(handle).cancel(true); if (cancelled) { - notifyQueryCancellation(handle); + handleContextMap.get(handle).setDriverStatus(DriverQueryStatus.DriverQueryState.CANCELED); } return cancelled; } catch (NullPointerException e) { @@ -217,7 +213,7 @@ public class ESDriver extends AbstractLensDriver { public void closeQuery(QueryHandle handle) throws LensException { cancelQuery(handle); closeResultSet(handle); - handleListenerMap.remove(handle); + handleContextMap.remove(handle); } @Override @@ -246,22 +242,6 @@ public class ESDriver extends AbstractLensDriver { return ImmutableSet.copyOf(Sets.<WaitingQueriesSelectionPolicy>newHashSet()); } - private void notifyComplIfRegistered(QueryHandle queryHandle) { - try { - handleListenerMap.get(queryHandle).onCompletion(queryHandle); - } catch (NullPointerException e) { - log.debug("There are no subscriptions for notification. Skipping for {}", queryHandle.getHandleIdString(), e); - } - } - - private void notifyQueryCancellation(QueryHandle handle) { - try { - handleListenerMap.get(handle).onError(handle, handle + " cancelled"); - } catch (NullPointerException e) { - log.debug("There are no subscriptions for notification. Skipping for {}", handle.getHandleIdString(), e); - } - } - private ESQuery rewrite(AbstractQueryContext context) throws LensException { final String key = keyFor(context); if (rewrittenQueriesCache.containsKey(key)) { http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index 1326611..0218be3 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -582,6 +582,52 @@ public class HiveDriver extends AbstractLensDriver { throw new LensException(DRIVER_ERROR.getLensErrorInfo(), ex, ex.getMessage()); } + private DriverQueryStatus updateDriverStateFromOperationStatus(OperationHandle handle, DriverQueryStatus status) + throws LensException, HiveSQLException { + if (status == null) { + status = new DriverQueryStatus(); + } + OperationStatus opStatus = getClient().getOperationStatus(handle); + log.debug("GetStatus on hiveHandle: {} returned state:", handle, opStatus.getState().name()); + switch (opStatus.getState()) { + case CANCELED: + status.setState(DriverQueryState.CANCELED); + status.setStatusMessage("Query has been cancelled!"); + break; + case CLOSED: + status.setState(DriverQueryState.CLOSED); + status.setStatusMessage("Query has been closed!"); + break; + case ERROR: + status.setState(DriverQueryState.FAILED); + status.setStatusMessage("Query execution failed!"); + status.setErrorMessage( + "Query failed with errorCode:" + opStatus.getOperationException().getErrorCode() + " with errorMessage: " + + opStatus.getOperationException().getMessage()); + break; + case FINISHED: + status.setState(DriverQueryState.SUCCESSFUL); + status.setStatusMessage("Query is successful!"); + status.setResultSetAvailable(handle.hasResultSet()); + break; + case INITIALIZED: + status.setState(DriverQueryState.INITIALIZED); + status.setStatusMessage("Query is initiazed in HiveServer!"); + break; + case RUNNING: + status.setState(DriverQueryState.RUNNING); + status.setStatusMessage("Query is running in HiveServer!"); + break; + case PENDING: + status.setState(DriverQueryState.PENDING); + status.setStatusMessage("Query is pending in HiveServer"); + break; + case UNKNOWN: + default: + throw new LensException("Query is in unknown state at HiveServer"); + } + return status; + } /* * (non-Javadoc) * @@ -600,45 +646,7 @@ public class HiveDriver extends AbstractLensDriver { log.debug("GetStatus hiveHandle: {}", hiveHandle); fetchLogs(hiveHandle); OperationStatus opStatus = getClient().getOperationStatus(hiveHandle); - log.debug("GetStatus on hiveHandle: {} returned state:", hiveHandle, opStatus.getState().name()); - switch (opStatus.getState()) { - case CANCELED: - context.getDriverStatus().setState(DriverQueryState.CANCELED); - context.getDriverStatus().setStatusMessage("Query has been cancelled!"); - break; - case CLOSED: - context.getDriverStatus().setState(DriverQueryState.CLOSED); - context.getDriverStatus().setStatusMessage("Query has been closed!"); - break; - case ERROR: - context.getDriverStatus().setState(DriverQueryState.FAILED); - context.getDriverStatus().setStatusMessage("Query execution failed!"); - context.getDriverStatus().setErrorMessage( - "Query failed with errorCode:" + opStatus.getOperationException().getErrorCode() + " with errorMessage: " - + opStatus.getOperationException().getMessage()); - break; - case FINISHED: - context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL); - context.getDriverStatus().setStatusMessage("Query is successful!"); - context.getDriverStatus().setResultSetAvailable(hiveHandle.hasResultSet()); - break; - case INITIALIZED: - context.getDriverStatus().setState(DriverQueryState.INITIALIZED); - context.getDriverStatus().setStatusMessage("Query is initiazed in HiveServer!"); - break; - case RUNNING: - context.getDriverStatus().setState(DriverQueryState.RUNNING); - context.getDriverStatus().setStatusMessage("Query is running in HiveServer!"); - break; - case PENDING: - context.getDriverStatus().setState(DriverQueryState.PENDING); - context.getDriverStatus().setStatusMessage("Query is pending in HiveServer"); - break; - case UNKNOWN: - default: - throw new LensException("Query is in unknown state at HiveServer"); - } - + updateDriverStateFromOperationStatus(hiveHandle, context.getDriverStatus()); float progress = 0f; String jsonTaskStatus = opStatus.getTaskStatus(); String errorMsg = null; @@ -1047,8 +1055,7 @@ public class HiveDriver extends AbstractLensDriver { * @param listener the listener * @throws LensException the lens exception */ - QueryCompletionNotifier(QueryHandle handle, long timeoutMillis, QueryCompletionListener listener) - throws LensException { + QueryCompletionNotifier(QueryHandle handle, long timeoutMillis, QueryCompletionListener listener) { this.handle = handle; this.timeoutMillis = timeoutMillis; this.listener = listener; @@ -1073,7 +1080,7 @@ public class HiveDriver extends AbstractLensDriver { try { hiveHandle = getHiveHandle(handle); if (isFinished(hiveHandle)) { - listener.onCompletion(handle); + listener.onDriverStatusUpdated(handle, updateDriverStateFromOperationStatus(hiveHandle, null)); return; } } catch (LensException e) { @@ -1117,13 +1124,12 @@ public class HiveDriver extends AbstractLensDriver { * * @see * org.apache.lens.server.api.driver.LensDriver#registerForCompletionNotification - * (org.apache.lens.api.query.QueryHandle, long, org.apache.lens.server.api.driver.QueryCompletionListener) + * (org.apache.lens.api.query.QueryHandle, long, org.apache.lens.server.api.driver.QueryDriverStatusUpdateListener) */ @Override public void registerForCompletionNotification( - QueryHandle handle, long timeoutMillis, QueryCompletionListener listener) - throws LensException { - Thread th = new Thread(new QueryCompletionNotifier(handle, timeoutMillis, listener)); + QueryContext context, long timeoutMillis, QueryCompletionListener listener) { + Thread th = new Thread(new QueryCompletionNotifier(context.getQueryHandle(), timeoutMillis, listener)); th.start(); } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java index 6dff173..8a776a8 100644 --- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java +++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java @@ -21,6 +21,7 @@ package org.apache.lens.driver.hive; import static org.testng.Assert.assertEquals; import java.io.*; +import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -104,8 +105,14 @@ public class TestRemoteHiveDriver extends TestHiveDriver { hiveConf.addResource(remoteConf); server.init(hiveConf); server.start(); - // TODO figure out a better way to wait for thrift service to start - Thread.sleep(7000); + while (true) { + try { + new Socket(HS2_HOST, HS2_PORT); + break; + } catch (Throwable th) { + Thread.sleep(1000); + } + } } /** http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-hive/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/resources/hive-site.xml b/lens-driver-hive/src/test/resources/hive-site.xml index 2f4076a..48ec90f 100644 --- a/lens-driver-hive/src/test/resources/hive-site.xml +++ b/lens-driver-hive/src/test/resources/hive-site.xml @@ -59,4 +59,9 @@ <value>true</value> </property> + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java index bebb9ae..3bf5e8f 100644 --- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java +++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java @@ -70,9 +70,7 @@ import org.apache.hadoop.hive.ql.parse.HiveParser; import com.google.common.collect.ImmutableSet; -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; +import lombok.*; import lombok.extern.slf4j.Slf4j; /** @@ -116,90 +114,27 @@ public class JDBCDriver extends AbstractLensDriver { /** * Data related to a query submitted to JDBCDriver. */ + @Data protected class JdbcQueryContext { /** The lens context. */ - @Getter private final QueryContext lensContext; /** The result future. */ - @Getter - @Setter private Future<QueryResult> resultFuture; /** The rewritten query. */ - @Getter - @Setter private String rewrittenQuery; /** The is prepared. */ - @Getter - @Setter private boolean isPrepared; - /** The is cancelled. */ - @Getter - @Setter - private boolean isCancelled; - /** The is closed. */ - @Getter private boolean isClosed; - /** The listener. */ - @Getter - @Setter - private QueryCompletionListener listener; - /** The query result. */ - @Getter - @Setter private QueryResult queryResult; - /** The start time. */ - @Getter - @Setter - private long startTime; - - /** The end time. */ - @Getter - @Setter - private long endTime; - - private final LogSegregationContext logSegregationContext; - - /** - * Instantiates a new jdbc query context. - * - * @param context the context - */ - public JdbcQueryContext(QueryContext context, @NonNull final LogSegregationContext logSegregationContext) { - this.logSegregationContext = logSegregationContext; - this.lensContext = context; - } - - /** - * Notify error. - * - * @param th the th - */ - public void notifyError(Throwable th) { - // If query is closed in another thread while the callable is still waiting for result - // set, then it throws an SQLException in the callable. We don't want to send that exception - if (listener != null && !isClosed) { - listener.onError(lensContext.getQueryHandle(), th.getMessage()); - } - } - - /** - * Notify complete. - */ - public void notifyComplete() { - if (listener != null) { - listener.onCompletion(lensContext.getQueryHandle()); - } - } - /** * Close result. */ @@ -325,7 +260,7 @@ public class JDBCDriver extends AbstractLensDriver { public QueryCallable(JdbcQueryContext queryContext, @NonNull LogSegregationContext logSegregationContext) { this.queryContext = queryContext; this.logSegregationContext = logSegregationContext; - queryContext.setStartTime(System.currentTimeMillis()); + queryContext.getLensContext().setDriverStatus(DriverQueryState.INITIALIZED); } /* @@ -335,15 +270,13 @@ public class JDBCDriver extends AbstractLensDriver { */ @Override public QueryResult call() { - logSegregationContext.setLogSegragationAndQueryId(this.queryContext.getQueryHandleString()); - + queryContext.getLensContext().setDriverStatus(DriverQueryState.RUNNING); Statement stmt; Connection conn = null; QueryResult result = new QueryResult(); try { queryContext.setQueryResult(result); - try { conn = getConnection(); result.conn = conn; @@ -351,33 +284,42 @@ public class JDBCDriver extends AbstractLensDriver { log.error("Error obtaining connection: ", e); result.error = e; } - if (conn != null) { try { stmt = createStatement(conn); result.stmt = stmt; Boolean isResultAvailable = stmt.execute(queryContext.getRewrittenQuery()); + if (queryContext.getLensContext().getDriverStatus().isCanceled()) { + return result; + } + queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable); + queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL); if (isResultAvailable) { result.resultSet = stmt.getResultSet(); } - queryContext.notifyComplete(); - } catch (SQLException sqlEx) { + } catch (Exception e) { + if (queryContext.getLensContext().getDriverStatus().isCanceled()) { + return result; + } if (queryContext.isClosed()) { log.info("Ignored exception on already closed query : {} - {}", - queryContext.getLensContext().getQueryHandle(), sqlEx.getMessage()); + queryContext.getLensContext().getQueryHandle(), e.getMessage()); } else { log.error("Error executing SQL query: {} reason: {}", queryContext.getLensContext().getQueryHandle(), - sqlEx.getMessage(), sqlEx); - result.error = sqlEx; + e.getMessage(), e); + result.error = e; + queryContext.getLensContext().setDriverStatus(DriverQueryState.FAILED, e.getMessage()); // Close connection in case of failed queries. For successful queries, connection is closed // When result set is closed or driver.closeQuery is called result.close(); - queryContext.notifyError(sqlEx); } } } } finally { - queryContext.setEndTime(System.currentTimeMillis()); + Long endTime = queryContext.getLensContext().getDriverStatus().getDriverFinishTime(); + if (endTime == null || endTime <= 0) { + queryContext.getLensContext().getDriverStatus().setDriverFinishTime(System.currentTimeMillis()); + } } return result; } @@ -688,7 +630,7 @@ public class JDBCDriver extends AbstractLensDriver { /** * Validate query using prepare * - * @param pContext + * @param pContext context to validate * @throws LensException */ public void validate(AbstractQueryContext pContext) throws LensException { @@ -698,7 +640,7 @@ public class JDBCDriver extends AbstractLensDriver { boolean validateThroughPrepare = pContext.getDriverConf(this).getBoolean(JDBC_VALIDATE_THROUGH_PREPARE, DEFAULT_JDBC_VALIDATE_THROUGH_PREPARE); if (validateThroughPrepare) { - PreparedStatement stmt = null; + PreparedStatement stmt; // Estimate queries need to get connection from estimate pool to make sure // we are not blocked by data queries. stmt = prepareInternal(pContext, true, true, "validate-"); @@ -771,8 +713,8 @@ public class JDBCDriver extends AbstractLensDriver { /** * Internally prepare the query * - * @param pContext - * @return + * @param pContext prepare context + * @return prepared statement of the query * @throws LensException */ private PreparedStatement prepareInternal(AbstractQueryContext pContext) throws LensException { @@ -940,7 +882,7 @@ public class JDBCDriver extends AbstractLensDriver { */ private QueryResult executeInternal(QueryContext context, String rewrittenQuery) throws LensException { - JdbcQueryContext queryContext = new JdbcQueryContext(context, logSegregationContext); + JdbcQueryContext queryContext = new JdbcQueryContext(context); queryContext.setPrepared(false); queryContext.setRewrittenQuery(rewrittenQuery); return new QueryCallable(queryContext, logSegregationContext).call(); @@ -959,7 +901,7 @@ public class JDBCDriver extends AbstractLensDriver { // Always use the driver rewritten query not user query. Since the // conf we are passing here is query context conf, we need to add jdbc xml in resource path String rewrittenQuery = rewriteQuery(context); - JdbcQueryContext jdbcCtx = new JdbcQueryContext(context, logSegregationContext); + JdbcQueryContext jdbcCtx = new JdbcQueryContext(context); jdbcCtx.setRewrittenQuery(rewrittenQuery); try { Future<QueryResult> future = asyncQueryPool.submit(new QueryCallable(jdbcCtx, logSegregationContext)); @@ -973,21 +915,6 @@ public class JDBCDriver extends AbstractLensDriver { } /** - * Register for query completion notification. - * - * @param handle the handle - * @param timeoutMillis the timeout millis - * @param listener the listener - * @throws LensException the lens exception - */ - @Override - public void registerForCompletionNotification(QueryHandle handle, long timeoutMillis, - QueryCompletionListener listener) throws LensException { - checkConfigured(); - getQueryContext(handle).setListener(listener); - } - - /** * Get status of the query, specified by the handle. * * @param context The query handle @@ -997,15 +924,18 @@ public class JDBCDriver extends AbstractLensDriver { public void updateStatus(QueryContext context) throws LensException { checkConfigured(); JdbcQueryContext ctx = getQueryContext(context.getQueryHandle()); - context.getDriverStatus().setDriverStartTime(ctx.getStartTime()); - if (ctx.getResultFuture().isDone()) { - // Since future is already done, this call should not block + if (ctx.getLensContext().getDriverStatus().isFinished()) { + // terminal state. No updates can be done. + return; + } + if (ctx.getResultFuture().isCancelled()) { context.getDriverStatus().setProgress(1.0); - context.getDriverStatus().setDriverFinishTime(ctx.getEndTime()); - if (ctx.isCancelled()) { - context.getDriverStatus().setState(DriverQueryState.CANCELED); - context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " cancelled"); - } else if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) { + context.getDriverStatus().setState(DriverQueryState.CANCELED); + context.getDriverStatus().setStatusMessage("Query Canceled"); + } else if (ctx.getResultFuture().isDone()) { + context.getDriverStatus().setProgress(1.0); + // Since future is already done, this call should not block + if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) { context.getDriverStatus().setState(DriverQueryState.FAILED); context.getDriverStatus().setStatusMessage("Query execution failed!"); context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage()); @@ -1015,7 +945,6 @@ public class JDBCDriver extends AbstractLensDriver { context.getDriverStatus().setResultSetAvailable(true); } } else { - context.getDriverStatus().setProgress(0.0); context.getDriverStatus().setState(DriverQueryState.RUNNING); context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running"); } @@ -1029,8 +958,8 @@ public class JDBCDriver extends AbstractLensDriver { private LensResultSet getDriverResult(QueryContext context) throws LensException { JdbcQueryContext ctx = getQueryContext(context.getQueryHandle()); - if (ctx.isCancelled()) { - throw new LensException("Result set not available for cancelled query " + context.getQueryHandle()); + if (ctx.getLensContext().getDriverStatus().isCanceled()) { + throw new LensException("Result set not available for canceled query " + context.getQueryHandle()); } Future<QueryResult> future = ctx.getResultFuture(); @@ -1043,7 +972,7 @@ public class JDBCDriver extends AbstractLensDriver { } catch (ExecutionException e) { throw new LensException("Error while executing query " + queryHandle.getHandleId() + " in background", e); } catch (CancellationException e) { - throw new LensException("Query was already cancelled " + queryHandle.getHandleId(), e); + throw new LensException("Query was already canceled " + queryHandle.getHandleId(), e); } } @@ -1073,14 +1002,9 @@ public class JDBCDriver extends AbstractLensDriver { log.info("{} cancel request on query {}", getFullyQualifiedName(), handle); boolean cancelResult = context.cancel(); if (cancelResult) { - context.setCancelled(true); - // this is required because future.cancel does not guarantee - // that finally block is always called. - if (context.getEndTime() == 0) { - context.setEndTime(System.currentTimeMillis()); - } + context.getLensContext().setDriverStatus(DriverQueryState.CANCELED); context.closeResult(); - log.info("{} Cancelled query : {}", getFullyQualifiedName(), handle); + log.info("{} Canceled query : {}", getFullyQualifiedName(), handle); } return cancelResult; } @@ -1113,7 +1037,7 @@ public class JDBCDriver extends AbstractLensDriver { public void close() throws LensException { checkConfigured(); try { - for (QueryHandle query : new ArrayList<QueryHandle>(queryContextMap.keySet())) { + for (QueryHandle query : new ArrayList<>(queryContextMap.keySet())) { try { closeQuery(query); } catch (LensException e) { @@ -1176,4 +1100,9 @@ public class JDBCDriver extends AbstractLensDriver { public DriverQueryHook getQueryHook() { return queryHook; } + + @Override + public StatusUpdateMethod getStatusUpdateMethod() { + return StatusUpdateMethod.PUSH; + } } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java index 12fa6f0..8e042d1 100644 --- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java +++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestColumnarSQLRewriter.java @@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -1082,12 +1083,15 @@ public class TestColumnarSQLRewriter { Database database = new Database(); database.setName("mydb"); - Hive.get(hconf).createDatabase(database); - SessionState.get().setCurrentDatabase("mydb"); - createTable(hconf, "mydb", "mytable", "testDB", "testTable_1"); - createTable(hconf, "mydb", "mytable_2", "testDB", "testTable_2"); - createTable(hconf, "default", "mytable_3", "testDB", "testTable_3"); - + try { + Hive.get(hconf).createDatabase(database); + SessionState.get().setCurrentDatabase("mydb"); + createTable(hconf, "mydb", "mytable", "testDB", "testTable_1"); + createTable(hconf, "mydb", "mytable_2", "testDB", "testTable_2"); + createTable(hconf, "default", "mytable_3", "testDB", "testTable_3"); + } catch (AlreadyExistsException e) { + //pass + } String query = "SELECT * FROM mydb.mytable t1 JOIN mytable_2 t2 ON t1.t2id = t2.id " + " left outer join default.mytable_3 t3 on t2.t3id = t3.id " + "WHERE A = 100"; @@ -1176,7 +1180,11 @@ public class TestColumnarSQLRewriter { Database database = new Database(); database.setName(testDB); - Hive.get(hconf).createDatabase(database); + try { + Hive.get(hconf).createDatabase(database); + } catch(AlreadyExistsException ignored) { + //ignore + } try { SessionState.get().setCurrentDatabase(testDB); Map<String, String> columnMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java index e7636d2..6e9086f 100644 --- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java +++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java @@ -457,7 +457,7 @@ public class TestJdbcDriver { /** * Data provider for test case {@link #testExecuteWithPreFetch(int, boolean, int, boolean, long)} ()} - * @return + * @return data */ @DataProvider public Object[][] executeWithPreFetchDP() { @@ -470,7 +470,7 @@ public class TestJdbcDriver { }; } - /** + /**Testjdbcdri * @param rowsToPreFecth : requested number of rows to be pre-fetched * @param isComplteleyFetched : whether the wrapped in memory result has been completely accessed due to pre fetch * @param rowsPreFetched : actual rows pre-fetched @@ -689,7 +689,7 @@ public class TestJdbcDriver { executeAsync(context); QueryHandle handle = context.getQueryHandle(); - driver.registerForCompletionNotification(handle, 0, listener); + driver.registerForCompletionNotification(context, 0, listener); while (true) { driver.updateStatus(context); @@ -881,7 +881,6 @@ public class TestJdbcDriver { driver.updateStatus(context); assertTrue(isCancelled); assertEquals(context.getDriverStatus().getState(), DriverQueryState.CANCELED); - assertTrue(context.getDriverStatus().getDriverStartTime() > 0); assertTrue(context.getDriverStatus().getDriverFinishTime() > 0); driver.closeQuery(handle); @@ -915,22 +914,21 @@ public class TestJdbcDriver { public void onCompletion(QueryHandle handle) { fail("Was expecting this query to fail " + handle); } + }; executeAsync(ctx); QueryHandle handle = ctx.getQueryHandle(); - driver.registerForCompletionNotification(handle, 0, listener); + driver.registerForCompletionNotification(ctx, 0, listener); - while (true) { + while (!ctx.getDriverStatus().isFinished()) { driver.updateStatus(ctx); System.out.println("Query: " + handle + " Status: " + ctx.getDriverStatus()); - if (ctx.getDriverStatus().isFinished()) { - assertEquals(ctx.getDriverStatus().getState(), DriverQueryState.FAILED); - assertEquals(ctx.getDriverStatus().getProgress(), 1.0); - break; - } Thread.sleep(500); } + assertEquals(ctx.getDriverStatus().getState(), DriverQueryState.FAILED); + assertEquals(ctx.getDriverStatus().getProgress(), 1.0); + assertTrue(ctx.getDriverStatus().getDriverStartTime() > 0); assertTrue(ctx.getDriverStatus().getDriverFinishTime() > 0); http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-driver-jdbc/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/resources/hive-site.xml b/lens-driver-jdbc/src/test/resources/hive-site.xml index b497ca1..ef41681 100644 --- a/lens-driver-jdbc/src/test/resources/hive-site.xml +++ b/lens-driver-jdbc/src/test/resources/hive-site.xml @@ -49,4 +49,9 @@ <value>false</value> </property> + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index f1d844a..03079e2 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -106,6 +106,17 @@ public abstract class AbstractLensDriver implements LensDriver { } @Override + public StatusUpdateMethod getStatusUpdateMethod() { + return StatusUpdateMethod.PULL; + } + + @Override + public void registerForCompletionNotification(QueryContext context, long timeoutMillis, + QueryCompletionListener listener) { + context.registerStatusUpdateListener(listener); + } + + @Override public String toString() { return getFullyQualifiedName(); } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java index 2374c1e..033f677 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/DriverQueryStatus.java @@ -35,6 +35,8 @@ public class DriverQueryStatus implements Serializable { */ private static final long serialVersionUID = 1L; + + /** * The Enum DriverQueryState. */ @@ -43,42 +45,52 @@ public class DriverQueryStatus implements Serializable { /** * The new. */ - NEW, + NEW(0), /** * The initialized. */ - INITIALIZED, + INITIALIZED(1), /** * The pending. */ - PENDING, + PENDING(2), /** * The running. */ - RUNNING, + RUNNING(3), /** * The successful. */ - SUCCESSFUL, + SUCCESSFUL(4), /** * The failed. */ - FAILED, + FAILED(4), /** * The canceled. */ - CANCELED, + CANCELED(4), /** * The closed. */ - CLOSED + CLOSED(5); + + private int order; + + DriverQueryState(int order) { + this.order = order; + } + + public int getOrder() { + return order; + } } /** @@ -171,18 +183,6 @@ public class DriverQueryStatus implements Serializable { errorMessage, null); } - /** - * Creates the query status. - * - * @param state the state - * @param dstatus the dstatus - * @return the query status - */ - public static QueryStatus createQueryStatus(QueryStatus.Status state, DriverQueryStatus dstatus) { - return new QueryStatus(dstatus.progress, null, state, dstatus.statusMessage, - dstatus.isResultSetAvailable, dstatus.progressMessage, dstatus.errorMessage, null); - } - /* * (non-Javadoc) * @@ -215,5 +215,8 @@ public class DriverQueryStatus implements Serializable { public boolean isSuccessful() { return state.equals(DriverQueryState.SUCCESSFUL); } + public boolean isCanceled() { + return state.equals(DriverQueryState.CANCELED); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java index 95ea360..e472de0 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java @@ -125,12 +125,14 @@ public interface LensDriver extends Externalizable { /** * Register for query completion notification. * - * @param handle the handle + * @param context the context * @param timeoutMillis the timeout millis - * @param listener the listener + * @param listener the listener. Only query completions are guaranteed to be notified. + * Notably: SUCCESS and FAILURE * @throws LensException the lens exception */ - void registerForCompletionNotification(QueryHandle handle, long timeoutMillis, QueryCompletionListener listener) + void registerForCompletionNotification(QueryContext context, long timeoutMillis, + QueryCompletionListener listener) throws LensException; /** @@ -215,8 +217,7 @@ public interface LensDriver extends Externalizable { /** * decide priority based on query's cost. The cost should be already computed by estimate call, but it's * not guaranteed to be pre-computed. It's up to the driver to do an on-demand computation of cost. - * @see AbstractQueryContext#decidePriority(LensDriver, QueryPriorityDecider) that handles this on-demand computation. - * @param queryContext + * @param queryContext Query context whose priority is to be decided */ Priority decidePriority(AbstractQueryContext queryContext); @@ -225,4 +226,10 @@ public interface LensDriver extends Externalizable { * @see DriverQueryHook for more details. */ DriverQueryHook getQueryHook(); + + /** + * + * @return The method of status update supported by this driver. + */ + StatusUpdateMethod getStatusUpdateMethod(); } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryCompletionListener.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryCompletionListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryCompletionListener.java index 3713b51..d0da3ac 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryCompletionListener.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryCompletionListener.java @@ -27,14 +27,13 @@ import org.apache.lens.api.query.QueryHandle; * the queryCompletion event occurs, that object's appropriate * method is invoked. */ -public interface QueryCompletionListener { - +public abstract class QueryCompletionListener implements QueryDriverStatusUpdateListener { /** * On completion. * * @param handle the handle */ - void onCompletion(QueryHandle handle); + public abstract void onCompletion(QueryHandle handle); /** * On error. @@ -42,6 +41,22 @@ public interface QueryCompletionListener { * @param handle the handle * @param error the error */ - void onError(QueryHandle handle, String error); + public abstract void onError(QueryHandle handle, String error); + @Override + public void onDriverStatusUpdated(QueryHandle handle, DriverQueryStatus status) { + switch (status.getState()) { + case SUCCESSFUL: + onCompletion(handle); + break; + case FAILED: + onError(handle, status.getErrorMessage()); + break; + case CANCELED: + onError(handle, "Query cancelled"); + break; + default: + break; + } + } } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryDriverStatusUpdateListener.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryDriverStatusUpdateListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryDriverStatusUpdateListener.java new file mode 100644 index 0000000..dcddd1b --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/QueryDriverStatusUpdateListener.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.driver; + +import org.apache.lens.api.query.QueryHandle; + +/** + * This listener has callback method for status update events + */ +public interface QueryDriverStatusUpdateListener { + void onDriverStatusUpdated(QueryHandle handle, DriverQueryStatus status); +} http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/driver/StatusUpdateMethod.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/StatusUpdateMethod.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/StatusUpdateMethod.java new file mode 100644 index 0000000..0366bd1 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/StatusUpdateMethod.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.driver; + +public enum StatusUpdateMethod { + /** + * Async status update: driver supports taking a callback and updating status through that. Should be used for + * fast drivers e.g. JDBC, ES, Druid + */ + PUSH, + /** + * Driver has to be polled for status updates. Should be used for slow drivers e.g. Hive, Spark. + */ + PULL +} http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index 2641b60..b584c6a 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -24,6 +24,7 @@ import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RES import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Future; @@ -36,13 +37,8 @@ import org.apache.lens.api.query.QueryStatus.Status; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.common.BackOffRetryHandler; import org.apache.lens.server.api.common.FailureContext; -import org.apache.lens.server.api.driver.DriverQueryStatus; -import org.apache.lens.server.api.driver.InMemoryResultSet; -import org.apache.lens.server.api.driver.LensDriver; -import org.apache.lens.server.api.driver.LensResultSet; -import org.apache.lens.server.api.driver.PartiallyFetchedInMemoryResultSet; +import org.apache.lens.server.api.driver.*; import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; import org.apache.lens.server.api.util.LensUtil; @@ -207,6 +203,8 @@ public class QueryContext extends AbstractQueryContext { @Getter @Setter private transient Future queryLauncher; + private List<QueryDriverStatusUpdateListener> driverStatusUpdateListener = Lists.newArrayList(); + /** * Creates context from query * @@ -446,8 +444,7 @@ public class QueryContext extends AbstractQueryContext { /** - * Get query handle string - * @return + * @return query handle string */ @Override public String getLogHandle() { @@ -489,10 +486,6 @@ public class QueryContext extends AbstractQueryContext { return getSelectedDriver().getQueryConstraints(); } - public ImmutableSet<WaitingQueriesSelectionPolicy> getSelectedDriverSelectionPolicies() { - return getSelectedDriver().getWaitingQuerySelectionPolicies(); - } - public synchronized void registerDriverResult(LensResultSet result) throws LensException { if (isDriverResultRegistered) { return; //already registered @@ -528,8 +521,58 @@ public class QueryContext extends AbstractQueryContext { } this.driverResult = result; } + public void setDriverStatus(DriverQueryStatus.DriverQueryState state, String message) { + if (getDriverStatus().getState().getOrder() > state.getOrder()) { + log.info("current driver status: {}, ignoring transition request to {}", getDriverStatus().getState(), state); + return; + } + switch (state) { + case NEW: + case INITIALIZED: + case PENDING: + getDriverStatus().setProgress(0.0); + case RUNNING: + if (getDriverStatus().getDriverStartTime() == null || getDriverStatus().getDriverStartTime() <= 0) { + getDriverStatus().setDriverStartTime(System.currentTimeMillis()); + } + break; + case SUCCESSFUL: + case FAILED: + case CANCELED: + getDriverStatus().setProgress(1.0); + if (getDriverStatus().getDriverFinishTime() == null || getDriverStatus().getDriverFinishTime() <= 0) { + getDriverStatus().setDriverFinishTime(System.currentTimeMillis()); + } + break; + default: + break; + } + if (message != null) { + if (state == DriverQueryStatus.DriverQueryState.FAILED) { + getDriverStatus().setErrorMessage(message); + } else { + getDriverStatus().setStatusMessage(message); + } + } + if (getDriverStatus().getStatusMessage() == null) { + getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " " + state.name().toLowerCase()); + } + getDriverStatus().setState(state); + for (QueryDriverStatusUpdateListener listener: this.driverStatusUpdateListener) { + listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus()); + } + } public String toString() { return queryHandle + ":" + this.status; } + + public void setDriverStatus(DriverQueryStatus.DriverQueryState state) { + setDriverStatus(state, null); + } + + + public void registerStatusUpdateListener(QueryDriverStatusUpdateListener driverStatusUpdateListener) { + this.driverStatusUpdateListener.add(driverStatusUpdateListener); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java index 59f8569..168b3cc 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java @@ -388,12 +388,11 @@ public class MockDriver extends AbstractLensDriver { * * @see * org.apache.lens.server.api.driver.LensDriver#registerForCompletionNotification - * (org.apache.lens.api.query.QueryHandle, long, org.apache.lens.server.api.driver.QueryCompletionListener) + * (org.apache.lens.api.query.QueryHandle, long, org.apache.lens.server.api.driver.QueryDriverStatusUpdateListener) */ @Override - public void registerForCompletionNotification(QueryHandle handle, - long timeoutMillis, QueryCompletionListener listener) - throws LensException { + public void registerForCompletionNotification(QueryContext context, + long timeoutMillis, QueryCompletionListener listener) { // TODO Auto-generated method stub } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server/src/main/java/org/apache/lens/server/LensServerConf.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/LensServerConf.java b/lens-server/src/main/java/org/apache/lens/server/LensServerConf.java index e977ebd..8d2a3fc 100644 --- a/lens-server/src/main/java/org/apache/lens/server/LensServerConf.java +++ b/lens-server/src/main/java/org/apache/lens/server/LensServerConf.java @@ -18,7 +18,6 @@ */ package org.apache.lens.server; -import java.util.Iterator; import java.util.Map; import org.apache.lens.server.api.LensConfConstants; @@ -45,9 +44,7 @@ public final class LensServerConf { HIVE_CONF.addResource("lens-site.xml"); Configuration conf = new Configuration(false); conf.addResource("lens-site.xml"); - Iterator<Map.Entry<String, String>> confItr = conf.iterator(); - while (confItr.hasNext()) { - Map.Entry<String, String> prop = confItr.next(); + for (Map.Entry<String, String> prop : conf) { if (!prop.getKey().startsWith(LensConfConstants.SERVER_PFX)) { OVERRIDING_CONF_FOR_DRIVER.set(prop.getKey(), prop.getValue()); } http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 78d0b8a..2f27fc2 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -196,6 +196,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ protected Thread querySubmitter; + private final AsyncStatusUpdater asyncStatusUpdater = new AsyncStatusUpdater(); /** * The status poller. */ @@ -754,9 +755,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE logSegregationContext.setLogSegragationAndQueryId(query.getQueryHandleString()); // acquire session before launching query. acquire(query.getLensSessionIdentifier()); - if (query.getStatus().cancelled()) { - return; - } else { + if (!query.getStatus().cancelled()) { launchQuery(query); } } catch (Exception e) { @@ -784,6 +783,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE checkEstimatedQueriesState(query); query.getSelectedDriver().getQueryHook().preLaunch(query); QueryStatus oldStatus = query.getStatus(); + // If driver supports async updates. + if (query.getSelectedDriver().getStatusUpdateMethod() == StatusUpdateMethod.PUSH) { + query.registerStatusUpdateListener(asyncStatusUpdater); + } QueryStatus newStatus = new QueryStatus(query.getStatus().getProgress(), null, QueryStatus.Status.LAUNCHED, "Query is launched on driver", false, null, null, null); query.validateTransition(newStatus); @@ -793,7 +796,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE query.setStatusSkippingTransitionTest(newStatus); query.setLaunchTime(System.currentTimeMillis()); query.clearTransientStateAfterLaunch(); - log.info("Added to launched queries. QueryId:{}", query.getQueryHandleString()); fireStatusChangeEvent(query, newStatus, oldStatus); } @@ -814,6 +816,17 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE querySubmitterRunnable.pausedForTest = pause; } + private class AsyncStatusUpdater implements QueryDriverStatusUpdateListener { + + @Override + public void onDriverStatusUpdated(QueryHandle handle, DriverQueryStatus status) { + try { + updateStatus(handle, false); + } catch (LensException e) { + log.error("Unable to update status from driver status for query {}", handle, e); + } + } + } /** * The Class StatusPoller. */ @@ -843,7 +856,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE if (ctx.isLaunching()) { continue; } - logSegregationContext.setLogSegragationAndQueryId(ctx.getQueryHandleString()); log.debug("Polling status for {}", ctx.getQueryHandle()); try { @@ -941,20 +953,25 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @throws LensException the lens exception */ private void updateStatus(final QueryHandle handle) throws LensException { + updateStatus(handle, true); + } + private void updateStatus(final QueryHandle handle, boolean updateDriverStatus) throws LensException { QueryContext ctx = allQueries.get(handle); if (ctx != null) { logSegregationContext.setLogSegragationAndQueryId(ctx.getLogHandle()); log.info("Updating status for {}", ctx.getQueryHandle()); synchronized (ctx) { QueryStatus before = ctx.getStatus(); - if (!ctx.queued() && !ctx.finished() && !ctx.getDriverStatus().isFinished()) { - try { - ctx.updateDriverStatus(statusUpdateRetryHandler); - } catch (LensException exc) { - // Status update from driver failed - setFailedStatus(ctx, "Status update failed", exc); - log.error("Status update failed for {}", handle, exc); - return; + if (!ctx.queued() && !ctx.finished()) { + if (updateDriverStatus) { + try { + ctx.updateDriverStatus(statusUpdateRetryHandler); + } catch (LensException exc) { + // Status update from driver failed + setFailedStatus(ctx, "Status update failed", exc); + log.error("Status update failed for {}", handle, exc); + return; + } } ctx.setStatus(ctx.getDriverStatus().toQueryStatus()); // query is successfully executed by driver and @@ -2284,7 +2301,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE if (totalWaitTime > 0 && !queryCtx.getStatus().executed() && !queryCtx.getStatus().finished()) { log.info("Registering for query {} completion notification", ctx.getQueryHandleString()); - queryCtx.getSelectedDriver().registerForCompletionNotification(handle, totalWaitTime, listener); + queryCtx.getSelectedDriver().registerForCompletionNotification(ctx, totalWaitTime, listener); try { // We will wait for a few millis at a time until we reach max required wait time and also check the state // each time we come out of the wait. @@ -2385,7 +2402,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The Class QueryCompletionListenerImpl. */ - class QueryCompletionListenerImpl implements QueryCompletionListener { + @Data + class QueryCompletionListenerImpl extends QueryCompletionListener { /** * The succeeded. @@ -2395,23 +2413,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The handle. */ - QueryHandle handle; - - /** - * Instantiates a new query completion listener impl. - * - * @param handle the handle - */ - QueryCompletionListenerImpl(QueryHandle handle) { - this.handle = handle; - } + final QueryHandle handle; - /* - * (non-Javadoc) - * - * @see - * org.apache.lens.server.api.driver.QueryCompletionListener#onCompletion(org.apache.lens.api.query.QueryHandle) - */ @Override public void onCompletion(QueryHandle handle) { synchronized (this) { @@ -2421,12 +2424,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } - /* - * (non-Javadoc) - * - * @see org.apache.lens.server.api.driver.QueryCompletionListener#onError(org.apache.lens.api.query.QueryHandle, - * java.lang.String) - */ @Override public void onError(QueryHandle handle, String error) { synchronized (this) { @@ -2435,7 +2432,6 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE this.notify(); } } - } /* http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server/src/test/java/org/apache/lens/server/metastore/TestMetastoreService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/metastore/TestMetastoreService.java b/lens-server/src/test/java/org/apache/lens/server/metastore/TestMetastoreService.java index 9384e1c..0e6a4a1 100644 --- a/lens-server/src/test/java/org/apache/lens/server/metastore/TestMetastoreService.java +++ b/lens-server/src/test/java/org/apache/lens/server/metastore/TestMetastoreService.java @@ -1046,10 +1046,13 @@ public class TestMetastoreService extends LensJerseyTest { XDimension dimension = cubeObjectFactory.createXDimension(); dimension.setName(dimName); + dimension.setAttributes(new XDimAttributes()); dimension.setExpressions(new XExpressions()); dimension.setJoinChains(new XJoinChains()); - dimension.setProperties(new XProperties()); + dimension.setProperties(new XProperties().withProperty( + new XProperty().withName(MetastoreUtil.getDimTimedDimensionKey(dimName)).withValue("dt")) + ); XDimAttribute xd1 = cubeObjectFactory.createXDimAttribute(); xd1.setName("col1"); xd1.setType("STRING"); http://git-wip-us.apache.org/repos/asf/lens/blob/c2100fa2/lens-server/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/test/resources/hive-site.xml b/lens-server/src/test/resources/hive-site.xml index 94c5012..8bf6fe0 100644 --- a/lens-server/src/test/resources/hive-site.xml +++ b/lens-server/src/test/resources/hive-site.xml @@ -70,4 +70,9 @@ <value>false</value> </property> + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + </configuration>
