Modified: hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1571600&r1=1571599&r2=1571600&view=diff ============================================================================== --- hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original) +++ hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Feb 25 07:58:52 2014 @@ -22,14 +22,13 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,7 +56,7 @@ public abstract class CLIServiceTest { } @Test - public void openSessionTest() throws Exception { + public void testOpenSession() throws Exception { SessionHandle sessionHandle = client.openSession( "tom", "password", Collections.<String, String>emptyMap()); assertNotNull(sessionHandle); @@ -69,7 +68,7 @@ public abstract class CLIServiceTest { } @Test - public void getFunctionsTest() throws Exception { + public void testGetFunctions() throws Exception { SessionHandle sessionHandle = client.openSession("tom", "password"); assertNotNull(sessionHandle); @@ -106,7 +105,7 @@ public abstract class CLIServiceTest { } @Test - public void getInfoTest() throws Exception { + public void testGetInfo() throws Exception { SessionHandle sessionHandle = client.openSession( "tom", "password", Collections.<String, String>emptyMap()); assertNotNull(sessionHandle); @@ -123,6 +122,10 @@ public abstract class CLIServiceTest { client.closeSession(sessionHandle); } + /** + * Test the blocking execution of a query + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { HashMap<String, String> confOverlay = new HashMap<String, String>(); @@ -161,113 +164,171 @@ public abstract class CLIServiceTest { client.closeSession(sessionHandle); } + /** + * Test async execution of a well-formed and a malformed query with different long polling durations + * - Test malformed query with default long polling timeout + * - Test well-formed query with default long polling timeout + * - Test well-formed query with long polling timeout set to 0 + * - Test well-formed query with long polling timeout set to 500 millis + * - Test well-formed query cancellation + * @throws Exception + */ @Test public void testExecuteStatementAsync() throws Exception { - HashMap<String, String> confOverlay = new HashMap<String, String>(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap<String, String>()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; + Map<String, String> confOverlay = new HashMap<String, String>(); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + String queryString; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); assertNotNull(sessionHandle); + OperationState state = null; OperationHandle opHandle; OperationStatus opStatus = null; // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + " = false"; opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); client.closeOperation(opHandle); - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); - client.closeOperation(opHandle); + // Set longPollingTimeout to a custom value for different test cases + long longPollingTimeout; - // Create a test table - queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); - client.closeOperation(opHandle); - - // Test async execution response when query is malformed - // Compile time error - // This query will error out during compilation (which is done synchronous as of now) - String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC"; + /** + * Execute a malformed async query with default config, + * to give a compile time error. + * (compilation is done synchronous as of now) + */ + longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; try { - opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - fail("Async syntax excution should fail"); - } catch (HiveSQLException e) { + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); + } + catch (HiveSQLException e) { // expected error } - - - // Runtime error - wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'"; - opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - - int count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - opStatus = client.getOperationStatus(opHandle); - state = opStatus.getState(); - System.out.println("Polling: " + opHandle + " count=" + (++count) - + " state=" + state); - if (state == OperationState.CANCELED || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Operation should be in error state", OperationState.ERROR, state); + /** + * Execute a malformed async query with default config, + * to give a runtime time error. + * Also check that the sqlState and errorCode should be set + */ + queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://localhost:10000/a/b/c'"; + opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); // sqlState, errorCode should be set assertEquals(opStatus.getOperationException().getSQLState(), "08S01"); assertEquals(opStatus.getOperationException().getErrorCode(), 1); - client.closeOperation(opHandle); - - // Test async execution when query is well formed - queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; + /** + * Execute an async query with default config + */ + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Execute an async query with long polling timeout set to 0 + */ + longPollingTimeout = 0; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Execute an async query with long polling timeout set to 500 millis + */ + longPollingTimeout = 500; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Cancellation test + */ + queryString = "SELECT ID FROM " + tableName; opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - assertTrue(opHandle.hasResultSet()); - - count = 0; + System.out.println("Cancelling " + opHandle); + client.cancelOperation(opHandle); + state = client.getOperationStatus(opHandle).getState(); + System.out.println(opHandle + " after cancelling, state= " + state); + assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); + } + + /** + * Sets up a test specific table with the given column definitions and config + * @param tableName + * @param columnDefinitions + * @param confOverlay + * @throws Exception + */ + private SessionHandle setupTestData(String tableName, String columnDefinitions, + Map<String, String> confOverlay) throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); + assertNotNull(sessionHandle); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Create a test table + queryString = "CREATE TABLE " + tableName + columnDefinitions; + client.executeStatement(sessionHandle, queryString, confOverlay); + + return sessionHandle; + } + + private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString, + Map<String, String> confOverlay, OperationState expectedState, + long longPollingTimeout) throws HiveSQLException { + // Timeout for the iteration in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + long longPollingTimeDelta; + OperationStatus opStatus = null; + OperationState state = null; + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + int count = 0; while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + // Break if iteration times out + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); opStatus = client.getOperationStatus(opHandle); state = opStatus.getState(); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (state == OperationState.CANCELED || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { + if (state == OperationState.CANCELED || + state == OperationState.CLOSED || + state == OperationState.FINISHED || + state == OperationState.ERROR) { break; + } else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); } - Thread.sleep(1000); } - assertEquals("Query should be finished", OperationState.FINISHED, state); - client.closeOperation(opHandle); - - // Cancellation test - opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + opHandle); - client.cancelOperation(opHandle); - state = client.getOperationStatus(opHandle).getState(); - System.out.println(opHandle + " after cancelling, state= " + state); - assertEquals("Query should be cancelled", OperationState.CANCELED, state); - - // Cleanup - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + assertEquals(expectedState, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); - client.closeSession(sessionHandle); + return opStatus; } /**
