This is an automated email from the ASF dual-hosted git repository. dmollitor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 26945ee HIVE-23601: Hive Statement Clear Statement Handle on Error (David Mollitor, reviewed by Peter Vary) 26945ee is described below commit 26945ee16e3caa1bed1e25fab3a07b5190b441ac Author: belugabehr <12578579+belugab...@users.noreply.github.com> AuthorDate: Thu Jun 11 10:30:06 2020 -0400 HIVE-23601: Hive Statement Clear Statement Handle on Error (David Mollitor, reviewed by Peter Vary) --- .../java/org/apache/hive/jdbc/HiveStatement.java | 64 ++++++++++++---------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index ae60c32..6c90cef 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -73,7 +73,7 @@ public class HiveStatement implements java.sql.Statement { private final HiveConnection connection; private TCLIService.Iface client; - private TOperationHandle stmtHandle = null; + private Optional<TOperationHandle> stmtHandle; private final TSessionHandle sessHandle; Map<String, String> sessConf = new HashMap<>(); private int fetchSize; @@ -145,6 +145,7 @@ public class HiveStatement implements java.sql.Statement { this.defaultFetchSize = defaultFetchSize; this.fetchSize = (initFetchSize == 0) ? defaultFetchSize : initFetchSize; this.inPlaceUpdateStream = Optional.empty(); + this.stmtHandle = Optional.empty(); } @Override @@ -160,8 +161,8 @@ public class HiveStatement implements java.sql.Statement { } try { - if (stmtHandle != null) { - TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle); + if (stmtHandle.isPresent()) { + TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle.get()); TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); Utils.verifySuccessWithInfo(cancelResp.getStatus()); } @@ -189,11 +190,10 @@ public class HiveStatement implements java.sql.Statement { */ private void closeStatementIfNeeded() throws SQLException { try { - if (stmtHandle != null) { - TCloseOperationReq closeReq = new TCloseOperationReq(stmtHandle); + if (stmtHandle.isPresent()) { + TCloseOperationReq closeReq = new TCloseOperationReq(stmtHandle.get()); TCloseOperationResp closeResp = client.CloseOperation(closeReq); Utils.verifySuccessWithInfo(closeResp.getStatus()); - stmtHandle = null; } } catch (SQLException e) { throw e; @@ -207,13 +207,17 @@ public class HiveStatement implements java.sql.Statement { throw new SQLException(errorMsg, "08S01", tae); } catch (Exception e) { throw new SQLException("Failed to close statement", "08S01", e); + } finally { + stmtHandle = Optional.empty(); } } void closeClientOperation() throws SQLException { - closeStatementIfNeeded(); - isQueryClosed = true; - stmtHandle = null; + try { + closeStatementIfNeeded(); + } finally { + isQueryClosed = true; + } } void closeOnResultSetCompletion() throws SQLException { @@ -248,11 +252,11 @@ public class HiveStatement implements java.sql.Statement { TGetOperationStatusResp status = waitForOperationToComplete(); // The query should be completed by now - if (!status.isHasResultSet() && !stmtHandle.isHasResultSet()) { + if (!status.isHasResultSet() && stmtHandle.isPresent() && !stmtHandle.get().isHasResultSet()) { return false; } resultSet = new HiveQueryResultSet.Builder(this).setClient(client) - .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) + .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows).setFetchSize(fetchSize) .setScrollable(isScrollableResultset) .build(); return true; @@ -281,7 +285,7 @@ public class HiveStatement implements java.sql.Statement { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client) - .setStmtHandle(stmtHandle).setMaxRows(maxRows) + .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows) .setFetchSize(fetchSize).setScrollable(isScrollableResultset) .build(); return true; @@ -305,7 +309,7 @@ public class HiveStatement implements java.sql.Statement { try { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); - stmtHandle = execResp.getOperationHandle(); + stmtHandle = Optional.of(execResp.getOperationHandle()); } catch (SQLException eS) { isLogBeingGenerated = false; throw eS; @@ -321,7 +325,7 @@ public class HiveStatement implements java.sql.Statement { * @throws SQLException */ private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { - TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle.get()); TGetOperationStatusResp statusResp = null; while (statusResp == null || !statusResp.isSetHasResultSet()) { @@ -339,7 +343,7 @@ public class HiveStatement implements java.sql.Statement { TGetOperationStatusResp waitForOperationToComplete() throws SQLException { TGetOperationStatusResp statusResp = null; - final TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + final TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle.get()); statusReq.setGetProgressUpdate(inPlaceUpdateStream.isPresent()); // Progress bar is completed if there is nothing to request @@ -413,11 +417,14 @@ public class HiveStatement implements java.sql.Statement { * @throws SQLException */ private void reInitState() throws SQLException { - closeStatementIfNeeded(); - isCancelled = false; - isQueryClosed = false; - isLogBeingGenerated = true; - isOperationComplete = false; + try { + closeStatementIfNeeded(); + } finally { + isCancelled = false; + isQueryClosed = false; + isLogBeingGenerated = true; + isOperationComplete = false; + } } @Override @@ -694,8 +701,8 @@ public class HiveStatement implements java.sql.Statement { TFetchResultsResp tFetchResultsResp = null; try { - if (stmtHandle != null) { - TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle, + if (stmtHandle.isPresent()) { + TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle.get(), getFetchOrientation(incremental), fetchSize); tFetchResultsReq.setFetchType((short)1); tFetchResultsResp = client.FetchResults(tFetchResultsReq); @@ -738,12 +745,11 @@ public class HiveStatement implements java.sql.Statement { * @return Yarn ATS GUID or null if it hasn't been created yet. */ public String getYarnATSGuid() { - if (stmtHandle != null) { - // Set on the server side. - // @see org.apache.hive.service.cli.operation.SQLOperation#prepare - return Base64.getUrlEncoder().encodeToString(stmtHandle.getOperationId().getGuid()).trim(); - } - return null; + // Set on the server side. + // @see org.apache.hive.service.cli.operation.SQLOperation#prepare + return (stmtHandle.isPresent()) + ? Base64.getUrlEncoder().encodeToString(stmtHandle.get().getOperationId().getGuid()).trim() + : null; } /** @@ -766,7 +772,7 @@ public class HiveStatement implements java.sql.Statement { public String getQueryId() throws SQLException { // Storing it in temp variable as this method is not thread-safe and concurrent thread can // close this handle and set it to null after checking for null. - TOperationHandle stmtHandleTmp = stmtHandle; + TOperationHandle stmtHandleTmp = stmtHandle.orElse(null); if (stmtHandleTmp == null) { // If query is not running or already closed. return null;