Author: brock
Date: Mon Dec 2 16:11:23 2013
New Revision: 1547080
URL: http://svn.apache.org/r1547080
Log:
HIVE-5441 - Async query execution doesn't return resultset status (Prasad
Mujumdar via Thejas M Nair)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1547080&r1=1547079&r2=1547080&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Dec 2
16:11:23 2013
@@ -901,8 +901,19 @@ public class Driver implements CommandPr
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
}
- public CommandProcessorResponse run(String command) throws
CommandNeedRetryException {
- CommandProcessorResponse cpr = runInternal(command);
+ public CommandProcessorResponse run(String command)
+ throws CommandNeedRetryException {
+ return run(command, false);
+ }
+
+ public CommandProcessorResponse run()
+ throws CommandNeedRetryException {
+ return run(null, true);
+ }
+
+ public CommandProcessorResponse run(String command, boolean alreadyCompiled)
+ throws CommandNeedRetryException {
+ CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
if(cpr.getResponseCode() == 0) {
return cpr;
}
@@ -957,7 +968,25 @@ public class Driver implements CommandPr
}
return cpr;
}
- private CommandProcessorResponse runInternal(String command) throws
CommandNeedRetryException {
+
+ public CommandProcessorResponse compileAndRespond(String command) {
+ return new CommandProcessorResponse(compileInternal(command),
+ errorMessage, SQLState);
+ }
+
+ private int compileInternal(String command) {
+ int ret;
+ synchronized (compileMonitor) {
+ ret = compile(command);
+ }
+ if (ret != 0) {
+ releaseLocks(ctx.getHiveLocks());
+ }
+ return ret;
+ }
+
+ private CommandProcessorResponse runInternal(String command, boolean
alreadyCompiled)
+ throws CommandNeedRetryException {
errorMessage = null;
SQLState = null;
downstreamError = null;
@@ -990,12 +1019,11 @@ public class Driver implements CommandPr
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
int ret;
- synchronized (compileMonitor) {
- ret = compile(command);
- }
- if (ret != 0) {
- releaseLocks(ctx.getHiveLocks());
- return new CommandProcessorResponse(ret, errorMessage, SQLState);
+ if (!alreadyCompiled) {
+ ret = compileInternal(command);
+ if (ret != 0) {
+ return new CommandProcessorResponse(ret, errorMessage, SQLState);
+ }
}
boolean requireLock = false;
Modified:
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL:
http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1547080&r1=1547079&r2=1547080&view=diff
==============================================================================
---
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
(original)
+++
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
Mon Dec 2 16:11:23 2013
@@ -76,19 +76,13 @@ public class SQLOperation extends Execut
this.runAsync = runInBackground;
}
-
- public void prepare() throws HiveSQLException {
- }
-
- private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException {
+ /***
+ * Compile the query and extract metadata
+ * @param sqlOperationConf
+ * @throws HiveSQLException
+ */
+ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException {
setState(OperationState.RUNNING);
- String statement_trimmed = statement.trim();
- String[] tokens = statement_trimmed.split("\\s");
- String cmd_1 = statement_trimmed.substring(tokens[0].length()).trim();
-
- int ret = 0;
- String errorMessage = "";
- String SQLState = null;
try {
driver = new Driver(sqlOperationConf, getParentSession().getUserName());
@@ -98,10 +92,9 @@ public class SQLOperation extends Execut
driver.setTryCount(Integer.MAX_VALUE);
String subStatement = new
VariableSubstitution().substitute(sqlOperationConf, statement);
-
- response = driver.run(subStatement);
+ response = driver.compileAndRespond(subStatement);
if (0 != response.getResponseCode()) {
- throw new HiveSQLException("Error while processing statement: "
+ throw new HiveSQLException("Error while compiling statement: "
+ response.getErrorMessage(), response.getSQLState(),
response.getResponseCode());
}
@@ -112,7 +105,7 @@ public class SQLOperation extends Execut
if(driver.getPlan().getFetchTask() != null) {
//Schema has to be set
if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) {
- throw new HiveSQLException("Error running query: Schema and
FieldSchema " +
+ throw new HiveSQLException("Error compiling query: Schema and
FieldSchema " +
"should be set when query plan has a FetchTask");
}
resultSchema = new TableSchema(mResultSchema);
@@ -136,12 +129,35 @@ public class SQLOperation extends Execut
setState(OperationState.ERROR);
throw new HiveSQLException("Error running query: " + e.toString(), e);
}
+ }
+
+ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException {
+ try {
+ // In Hive server mode, we are not able to retry in the FetchTask
+ // case, when calling fetch queries since execute() has returned.
+ // For now, we disable the test attempts.
+ driver.setTryCount(Integer.MAX_VALUE);
+
+ response = driver.run();
+ if (0 != response.getResponseCode()) {
+ throw new HiveSQLException("Error while processing statement: "
+ + response.getErrorMessage(), response.getSQLState(),
response.getResponseCode());
+ }
+
+ } catch (HiveSQLException e) {
+ setState(OperationState.ERROR);
+ throw e;
+ } catch (Exception e) {
+ setState(OperationState.ERROR);
+ throw new HiveSQLException("Error running query: " + e.toString(), e);
+ }
setState(OperationState.FINISHED);
}
@Override
public void run() throws HiveSQLException {
setState(OperationState.PENDING);
+ prepare(getConfigForOperation());
if (!shouldRunAsync()) {
runInternal(getConfigForOperation());
} else {
Modified:
hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL:
http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1547080&r1=1547079&r2=1547080&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
(original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
Mon Dec 2 16:11:23 2013
@@ -21,6 +21,7 @@ package org.apache.hive.service.cli;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
@@ -155,7 +156,7 @@ public abstract class CLIServiceTest {
long pollTimeout = System.currentTimeMillis() + 100000;
assertNotNull(sessionHandle);
OperationState state = null;
- OperationHandle ophandle;
+ OperationHandle ophandle = null;
// Change lock manager, otherwise unit-test doesn't go through
String queryString = "SET hive.lock.manager=" +
@@ -171,8 +172,16 @@ public abstract class CLIServiceTest {
client.executeStatement(sessionHandle, queryString, confOverlay);
// Test async execution response when query is malformed
- String wrongQueryString = "SELECT NAME FROM TEST_EXEC";
- ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString,
confOverlay);
+ String wrongQuery = "SELECT NAME FROM TEST_EXEC";
+ try {
+ ophandle = client.executeStatementAsync(sessionHandle, wrongQuery,
confOverlay);
+ fail("Async syntax excution should fail");
+ } catch (HiveSQLException e) {
+ // expected error
+ }
+
+ wrongQuery = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location
'hdfs://fooNN:10000/a/b/c'";
+ ophandle = client.executeStatementAsync(sessionHandle, wrongQuery,
confOverlay);
int count = 0;
while (true) {
@@ -199,6 +208,7 @@ public abstract class CLIServiceTest {
ophandle =
client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ assertTrue(ophandle.hasResultSet());
count = 0;
while (true) {
// Break if polling times out