Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Wed Dec 18 01:48:24 2013 @@ -26,6 +26,7 @@ import org.apache.hive.service.cli.Fetch import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.OperationType; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; @@ -41,6 +42,7 @@ public abstract class Operation { public static final Log LOG = LogFactory.getLog(Operation.class.getName()); public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; + protected volatile HiveSQLException operationException; protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -71,8 +73,8 @@ public abstract class Operation { return opHandle.getOperationType(); } - public OperationState getState() { - return state; + public OperationStatus getStatus() { + return new OperationStatus(state, operationException); } public boolean hasResultSet() { @@ -90,6 +92,10 @@ public abstract class Operation { return this.state; } + protected void setOperationException(HiveSQLException operationException) { + this.operationException = operationException; + } + protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); @@ -97,19 +103,19 @@ public abstract class Operation { } public boolean isRunning() { - return OperationState.RUNNING.equals(getState()); + return OperationState.RUNNING.equals(state); } public boolean isFinished() { - return OperationState.FINISHED.equals(getState()); + return OperationState.FINISHED.equals(state); } public boolean isCanceled() { - return OperationState.CANCELED.equals(getState()); + return OperationState.CANCELED.equals(state); } public boolean isFailed() { - return OperationState.ERROR.equals(getState()); + return OperationState.ERROR.equals(state); } public abstract void run() throws HiveSQLException;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Wed Dec 18 01:48:24 2013 @@ -27,7 +27,7 @@ import org.apache.hive.service.AbstractS import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; @@ -67,7 +67,7 @@ public class OperationManager extends Ab public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync) - throws HiveSQLException { + throws HiveSQLException { ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); addOperation(executeStatementOperation); @@ -140,8 +140,8 @@ public class OperationManager extends Ab return handleToOperation.remove(opHandle); } - public OperationState getOperationState(OperationHandle opHandle) throws HiveSQLException { - return getOperation(opHandle).getState(); + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + return getOperation(opHandle).getStatus(); } public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { @@ -167,7 +167,7 @@ public class OperationManager extends Ab public RowSet getOperationNextRowSet(OperationHandle opHandle, FetchOrientation orientation, long maxRows) - throws HiveSQLException { + throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } } Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Wed Dec 18 01:48:24 2013 @@ -50,6 +50,7 @@ import org.apache.hadoop.io.BytesWritabl import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; @@ -66,7 +67,7 @@ public class SQLOperation extends Execut private Schema mResultSchema = null; private SerDe serde = null; private final boolean runAsync; - private Future<?> backgroundHandle; + private volatile Future<?> backgroundHandle; private boolean fetchStarted = false; public SQLOperation(HiveSession parentSession, String statement, Map<String, @@ -158,7 +159,7 @@ public class SQLOperation extends Execut public void run() throws HiveSQLException { setState(OperationState.PENDING); prepare(getConfigForOperation()); - if (!shouldRunAsync()) { + if (!runAsync) { runInternal(getConfigForOperation()); } else { Runnable backgroundOperation = new Runnable() { @@ -169,16 +170,15 @@ public class SQLOperation extends Execut try { runInternal(getConfigForOperation()); } catch (HiveSQLException e) { + setOperationException(e); LOG.error("Error: ", e); - // TODO: Return a more detailed error to the client, - // currently the async thread only writes to the log and sets the OperationState } } }; try { // This submit blocks if no background threads are available to run this operation backgroundHandle = - getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); + getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); } catch (RejectedExecutionException rejected) { setState(OperationState.ERROR); throw new HiveSQLException("All the asynchronous threads are currently busy, " + @@ -189,7 +189,7 @@ public class SQLOperation extends Execut private void cleanup(OperationState state) throws HiveSQLException { setState(state); - if (shouldRunAsync()) { + if (runAsync) { if (backgroundHandle != null) { backgroundHandle.cancel(true); } @@ -335,10 +335,6 @@ public class SQLOperation extends Execut return serde; } - private boolean shouldRunAsync() { - return runAsync; - } - /** * If there are query specific settings to overlay, then create a copy of config * There are two cases we need to clone the session config that's being passed to hive driver @@ -351,7 +347,7 @@ public class SQLOperation extends Execut */ private HiveConf getConfigForOperation() throws HiveSQLException { HiveConf sqlOperationConf = getParentSession().getHiveConf(); - if (!getConfOverlay().isEmpty() || shouldRunAsync()) { + if (!getConfOverlay().isEmpty() || runAsync) { // clone the partent session config for this query sqlOperationConf = new HiveConf(sqlOperationConf); Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Wed Dec 18 01:48:24 2013 @@ -202,7 +202,11 @@ public class HiveSessionImpl implements opHandleSet.add(opHandle); return opHandle; } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); + // Cleanup opHandle in case the query is synchronous + // Async query needs to retain and pass back the opHandle for error reporting + if (!runAsync) { + operationManager.closeOperation(opHandle); + } throw e; } finally { release(); Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Wed Dec 18 01:48:24 2013 @@ -43,13 +43,10 @@ import org.apache.hive.service.cli.opera public class SessionManager extends CompositeService { private static final Log LOG = LogFactory.getLog(CompositeService.class); - private HiveConf hiveConf; - private final Map<SessionHandle, HiveSession> handleToSession = new ConcurrentHashMap<SessionHandle, HiveSession>(); private final OperationManager operationManager = new OperationManager(); - private ThreadPoolExecutor backgroundOperationPool; public SessionManager() { @@ -96,19 +93,19 @@ public class SessionManager extends Comp } public SessionHandle openSession(String username, String password, Map<String, String> sessionConf) - throws HiveSQLException { - return openSession(username, password, sessionConf, false, null); + throws HiveSQLException { + return openSession(username, password, sessionConf, false, null); } public SessionHandle openSession(String username, String password, Map<String, String> sessionConf, - boolean withImpersonation, String delegationToken) throws HiveSQLException { + boolean withImpersonation, String delegationToken) throws HiveSQLException { if (username == null) { username = threadLocalUserName.get(); } HiveSession session; if (withImpersonation) { HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, - sessionConf, delegationToken); + sessionConf, delegationToken); session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); hiveSessionUgi.setProxySession(session); } else { Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Wed Dec 18 01:48:24 2013 @@ -37,7 +37,7 @@ import org.apache.hive.service.cli.GetIn import org.apache.hive.service.cli.GetInfoValue; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; @@ -114,6 +114,7 @@ public abstract class ThriftCLIService e @Override public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { + LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { SessionHandle sessionHandle = getSessionHandle(req); @@ -210,8 +211,8 @@ public abstract class ThriftCLIService e resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { - LOG.warn("Error executing statement: ", e); - resp.setStatus(HiveSQLException.toTStatus(e)); + LOG.warn("Error executing statement: ", e); + resp.setStatus(HiveSQLException.toTStatus(e)); } return resp; } @@ -328,8 +329,15 @@ public abstract class ThriftCLIService e public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); try { - OperationState operationState = cliService.getOperationStatus(new OperationHandle(req.getOperationHandle())); - resp.setOperationState(operationState.toTOperationState()); + OperationStatus operationStatus = cliService.getOperationStatus( + new OperationHandle(req.getOperationHandle())); + resp.setOperationState(operationStatus.getState().toTOperationState()); + HiveSQLException opException = operationStatus.getOperationException(); + if (opException != null) { + resp.setSqlState(opException.getSQLState()); + resp.setErrorCode(opException.getErrorCode()); + resp.setErrorMessage(opException.getMessage()); + } resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting operation status: ", e); Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original) +++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Wed Dec 18 01:48:24 2013 @@ -28,6 +28,7 @@ import org.apache.hive.service.cli.GetIn import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; @@ -295,12 +296,18 @@ public class ThriftCLIServiceClient exte * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) */ @Override - public OperationState getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { try { TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle()); TGetOperationStatusResp resp = cliService.GetOperationStatus(req); + // Checks the status of the RPC call, throws an exception in case of error checkStatus(resp.getStatus()); - return OperationState.getOperationState(resp.getOperationState()); + OperationState opState = OperationState.getOperationState(resp.getOperationState()); + HiveSQLException opException = null; + if (opState == OperationState.ERROR) { + opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode()); + } + return new OperationStatus(opState, opException); } catch (HiveSQLException e) { throw e; } catch (Exception e) { Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original) +++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Wed Dec 18 01:48:24 2013 @@ -20,6 +20,7 @@ package org.apache.hive.service.cli; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assert.assertTrue; @@ -28,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,8 +58,8 @@ public abstract class CLIServiceTest { @Test public void openSessionTest() throws Exception { - SessionHandle sessionHandle = client - .openSession("tom", "password", Collections.<String, String>emptyMap()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", Collections.<String, String>emptyMap()); assertNotNull(sessionHandle); client.closeSession(sessionHandle); @@ -68,8 +70,9 @@ public abstract class CLIServiceTest { @Test public void getFunctionsTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap<String, String>()); + SessionHandle sessionHandle = client.openSession("tom", "password"); assertNotNull(sessionHandle); + OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); TableSchema schema = client.getResultSetMetadata(opHandle); @@ -97,13 +100,15 @@ public abstract class CLIServiceTest { assertEquals("SPECIFIC_NAME", columnDesc.getName()); assertEquals(Type.STRING_TYPE, columnDesc.getType()); + // Cleanup client.closeOperation(opHandle); client.closeSession(sessionHandle); } @Test public void getInfoTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap<String, String>()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", Collections.<String, String>emptyMap()); assertNotNull(sessionHandle); GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME); @@ -121,30 +126,39 @@ public abstract class CLIServiceTest { @Test public void testExecuteStatement() throws Exception { HashMap<String, String> confOverlay = new HashMap<String, String>(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap<String, String>()); + SessionHandle sessionHandle = client.openSession( + "tom", "password", new HashMap<String, String>()); assertNotNull(sessionHandle); - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); + OperationHandle opHandle; + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); - // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Create a test table queryString = "CREATE TABLE TEST_EXEC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Blocking execute queryString = "SELECT ID FROM TEST_EXEC"; - OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay); - + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); // Expect query to be completed now assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); + client.closeOperation(opHandle); + + // Cleanup + queryString = "DROP TABLE IF EXISTS TEST_EXEC"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); + client.closeSession(sessionHandle); } @Test @@ -156,32 +170,40 @@ public abstract class CLIServiceTest { long pollTimeout = System.currentTimeMillis() + 100000; assertNotNull(sessionHandle); OperationState state = null; - OperationHandle ophandle = null; + OperationHandle opHandle; + OperationStatus opStatus = null; // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Create a test table queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); // Test async execution response when query is malformed - String wrongQuery = "SELECT NAME FROM TEST_EXEC"; + // Compile time error + // This query will error out during compilation (which is done synchronous as of now) + String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC"; try { - ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay); + opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); fail("Async syntax excution should fail"); } catch (HiveSQLException e) { // expected error } + - wrongQuery = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'"; - ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay); + // Runtime error + wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'"; + opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); int count = 0; while (true) { @@ -190,25 +212,28 @@ public abstract class CLIServiceTest { System.out.println("Polling timed out"); break; } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) + opStatus = client.getOperationStatus(opHandle); + state = opStatus.getState(); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (OperationState.CANCELED == state || state == OperationState.CLOSED + if (state == OperationState.CANCELED || state == OperationState.CLOSED || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } Thread.sleep(1000); } - assertEquals("Query should return an error state", - OperationState.ERROR, client.getOperationStatus(ophandle)); - + assertEquals("Operation should be in error state", OperationState.ERROR, state); + // sqlState, errorCode should be set + assertEquals(opStatus.getOperationException().getSQLState(), "08S01"); + assertEquals(opStatus.getOperationException().getErrorCode(), 1); + client.closeOperation(opHandle); + // Test async execution when query is well formed queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); - - assertTrue(ophandle.hasResultSet()); + opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + assertTrue(opHandle.hasResultSet()); + count = 0; while (true) { // Break if polling times out @@ -216,26 +241,33 @@ public abstract class CLIServiceTest { System.out.println("Polling timed out"); break; } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) + opStatus = client.getOperationStatus(opHandle); + state = opStatus.getState(); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (OperationState.CANCELED == state || state == OperationState.CLOSED + if (state == OperationState.CANCELED || state == OperationState.CLOSED || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } Thread.sleep(1000); } - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); + assertEquals("Query should be finished", OperationState.FINISHED, state); + client.closeOperation(opHandle); // Cancellation test - ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + ophandle); - client.cancelOperation(ophandle); - state = client.getOperationStatus(ophandle); - System.out.println(ophandle + " after cancelling, state= " + state); + opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + System.out.println("cancelling " + opHandle); + client.cancelOperation(opHandle); + state = client.getOperationStatus(opHandle).getState(); + System.out.println(opHandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; + opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeOperation(opHandle); + client.closeSession(sessionHandle); } /** @@ -271,7 +303,7 @@ public abstract class CLIServiceTest { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); // select from the new table should pass @@ -280,10 +312,10 @@ public abstract class CLIServiceTest { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); + OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); - // the settings in confoverly should not be part of session config + // the settings in conf overlay should not be part of session config // another query referring that property with the conf overlay should fail selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}"; try { @@ -297,8 +329,6 @@ public abstract class CLIServiceTest { dropTable = "DROP TABLE IF EXISTS " + tabName; opHandle = client.executeStatement(sessionHandle, dropTable, null); client.closeOperation(opHandle); - - client.closeSession(sessionHandle); } } Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java?rev=1551801&r1=1551800&r2=1551801&view=diff ============================================================================== --- hive/trunk/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java (original) +++ hive/trunk/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java Wed Dec 18 01:48:24 2013 @@ -31,12 +31,12 @@ import javax.security.auth.login.LoginEx import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hive.service.Service; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationState; import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.session.SessionManager; @@ -67,8 +67,6 @@ public abstract class ThriftCLIServiceTe protected static String anonymousUser = "anonymous"; protected static String anonymousPasswd = "anonymous"; - - /** * @throws java.lang.Exception */ @@ -162,6 +160,10 @@ public abstract class ThriftCLIServiceTe client.CloseSession(closeReq); } + /** + * Test synchronous query execution + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { // Create a new request object @@ -172,19 +174,19 @@ public abstract class ThriftCLIServiceTe // Change lock manager to embedded mode String queryString = "SET hive.lock.manager=" + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); // Drop the table if it exists queryString = "DROP TABLE IF EXISTS TEST_EXEC_THRIFT"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); // Create a test table queryString = "CREATE TABLE TEST_EXEC_THRIFT(ID STRING)"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); - // Execute another query to test + // Execute another query queryString = "SELECT ID FROM TEST_EXEC_THRIFT"; - TExecuteStatementResp execResp = executeQuerySync(queryString, sessHandle); + TExecuteStatementResp execResp = executeQuery(queryString, sessHandle, false); TOperationHandle operationHandle = execResp.getOperationHandle(); assertNotNull(operationHandle); @@ -192,28 +194,133 @@ public abstract class ThriftCLIServiceTe opStatusReq.setOperationHandle(operationHandle); assertNotNull(opStatusReq); TGetOperationStatusResp opStatusResp = client.GetOperationStatus(opStatusReq); - + TOperationState state = opStatusResp.getOperationState(); // Expect query to be completed now - assertEquals("Query should be finished", - OperationState.FINISHED, OperationState.getOperationState(opStatusResp.getOperationState())); + assertEquals("Query should be finished", TOperationState.FINISHED_STATE, state); + // Cleanup queryString = "DROP TABLE TEST_EXEC_THRIFT"; - executeQuerySync(queryString, sessHandle); + executeQuery(queryString, sessHandle, false); + + // Close the session; ignore exception if any + TCloseSessionReq closeReq = new TCloseSessionReq(sessHandle); + client.CloseSession(closeReq); + } + + /** + * Test asynchronous query execution and error message reporting to the client + * @throws Exception + */ + @Test + public void testExecuteStatementAsync() throws Exception { + // Create a new request object + TOpenSessionReq openReq = new TOpenSessionReq(); + TSessionHandle sessHandle = client.OpenSession(openReq).getSessionHandle(); + assertNotNull(sessHandle); + + // Change lock manager to embedded mode + String queryString = "SET hive.lock.manager=" + + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; + executeQuery(queryString, sessHandle, false); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC_THRIFT"; + executeQuery(queryString, sessHandle, false); + + // Create a test table + queryString = "CREATE TABLE TEST_EXEC_ASYNC_THRIFT(ID STRING)"; + executeQuery(queryString, sessHandle, false); + + // Execute another query + queryString = "SELECT ID FROM TEST_EXEC_ASYNC_THRIFT"; + System.out.println("Will attempt to execute: " + queryString); + TExecuteStatementResp execResp = executeQuery(queryString, sessHandle, true); + TOperationHandle operationHandle = execResp.getOperationHandle(); + assertNotNull(operationHandle); + + // Poll on the operation status till the query is completed + boolean isQueryRunning = true; + TGetOperationStatusReq opStatusReq; + TGetOperationStatusResp opStatusResp = null; + TOperationState state = null; + long pollTimeout = System.currentTimeMillis() + 100000; + + while(isQueryRunning) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + System.out.println("Polling timed out"); + break; + } + opStatusReq = new TGetOperationStatusReq(); + opStatusReq.setOperationHandle(operationHandle); + assertNotNull(opStatusReq); + opStatusResp = client.GetOperationStatus(opStatusReq); + state = opStatusResp.getOperationState(); + System.out.println("Current state: " + state); + + if (state == TOperationState.CANCELED_STATE || state == TOperationState.CLOSED_STATE + || state == TOperationState.FINISHED_STATE || state == TOperationState.ERROR_STATE) { + isQueryRunning = false; + } + Thread.sleep(1000); + } + + // Expect query to be successfully completed now + assertEquals("Query should be finished", + TOperationState.FINISHED_STATE, state); + + // Execute a malformed query + // This query will give a runtime error + queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'"; + System.out.println("Will attempt to execute: " + queryString); + execResp = executeQuery(queryString, sessHandle, true); + operationHandle = execResp.getOperationHandle(); + assertNotNull(operationHandle); + isQueryRunning = true; + while(isQueryRunning) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + System.out.println("Polling timed out"); + break; + } + opStatusReq = new TGetOperationStatusReq(); + opStatusReq.setOperationHandle(operationHandle); + assertNotNull(opStatusReq); + opStatusResp = client.GetOperationStatus(opStatusReq); + state = opStatusResp.getOperationState(); + System.out.println("Current state: " + state); + + if (state == TOperationState.CANCELED_STATE || state == TOperationState.CLOSED_STATE + || state == TOperationState.FINISHED_STATE || state == TOperationState.ERROR_STATE) { + isQueryRunning = false; + } + Thread.sleep(1000); + } + + // Expect query to return an error state + assertEquals("Operation should be in error state", TOperationState.ERROR_STATE, state); + + // sqlState, errorCode should be set to appropriate values + assertEquals(opStatusResp.getSqlState(), "08S01"); + assertEquals(opStatusResp.getErrorCode(), 1); + + // Cleanup + queryString = "DROP TABLE TEST_EXEC_ASYNC_THRIFT"; + executeQuery(queryString, sessHandle, false); // Close the session; ignore exception if any TCloseSessionReq closeReq = new TCloseSessionReq(sessHandle); client.CloseSession(closeReq); } - private TExecuteStatementResp executeQuerySync(String queryString, TSessionHandle sessHandle) + private TExecuteStatementResp executeQuery(String queryString, TSessionHandle sessHandle, boolean runAsync) throws Exception { TExecuteStatementReq execReq = new TExecuteStatementReq(); execReq.setSessionHandle(sessHandle); execReq.setStatement(queryString); - execReq.setRunAsync(false); + execReq.setRunAsync(runAsync); TExecuteStatementResp execResp = client.ExecuteStatement(execReq); assertNotNull(execResp); - assertFalse(execResp.getStatus().getStatusCode() == TStatusCode.ERROR_STATUS); return execResp; }
