Repository: tajo
Updated Branches:
  refs/heads/master 607fdea45 -> 4a9da73c6
TAJO-1394 Support reconnect on tsql.

closes #414

Signed-off-by: Hyunsik Choi <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4a9da73c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4a9da73c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4a9da73c

Branch: refs/heads/master
Commit: 4a9da73c6cc6ff670f867c13d60dd951bf8190bc
Parents: 607fdea
Author: navis.ryu <[email protected]>
Authored: Thu Mar 12 10:57:53 2015 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Thu Mar 12 02:46:09 2015 -0700

----------------------------------------------------------------------
 CHANGES | 3 +
 .../java/org/apache/tajo/cli/tsql/TajoCli.java | 61 +++++++++++---------
 .../apache/tajo/client/SessionConnection.java | 60 +++++++++++++++++++
 3 files changed, 97 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f330a0a..39f0fc4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
 IMPROVEMENT
 
+ TAJO-1394: Support reconnect on tsql.
+ (Contributed by navis, Committed by hyunsik)
+
 TAJO-527: Upgrade to Netty 4. (jihun)
 
 TAJO-1369: Some stack trace information is missed in error/fail logging. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 7d7d0bd..354f60d 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -41,10 +41,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; public class TajoCli { public static final String ERROR_PREFIX = "ERROR: "; @@ -60,6 +57,8 @@ public class TajoCli { private final PrintWriter sout; private TajoFileHistory history; + private final boolean reconnect; // reconnect on invalid session + // Current States private String currentDatabase; @@ -99,6 +98,7 @@ public class TajoCli { options.addOption("B", "background", false, "execute as background process"); options.addOption("conf", "conf", true, "configuration value"); options.addOption("param", "param", true, "parameter value in SQL file"); + options.addOption("reconnect", "reconnect", false, "reconnect on invalid session"); options.addOption("help", "help", false, "help"); } @@ -208,6 +208,8 @@ public class TajoCli { processConfVarCommand(cmd.getOptionValues("conf")); } + this.reconnect = cmd.hasOption("reconnect"); + // if there is no "-h" option, if(hostName == null) { if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) { @@ -467,13 +469,8 @@ public class TajoCli { try { invoked.invoke(arguments); - } catch (IllegalArgumentException ige) { - displayFormatter.printErrorMessage(sout, ige); - wasError = true; - return -1; } catch (Exception e) { - displayFormatter.printErrorMessage(sout, e); - wasError = 
true; + onError(null, e); return -1; } finally { context.getOutput().flush(); @@ -492,8 +489,7 @@ public class TajoCli { long startTime = System.currentTimeMillis(); ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json); if (response == null) { - displayFormatter.printErrorMessage(sout, "response is null"); - wasError = true; + onError("response is null", null); } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); @@ -508,8 +504,7 @@ public class TajoCli { } } else { if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); - wasError = true; + onError(response.getErrorMessage(), null); } } } @@ -520,17 +515,12 @@ public class TajoCli { ClientProtos.SubmitQueryResponse response = null; try{ response = client.executeQuery(statement); - } catch (ServiceException e){ - displayFormatter.printErrorMessage(sout, e.getMessage()); - wasError = true; } catch(Throwable te){ - displayFormatter.printErrorMessage(sout, te); - wasError = true; + onError(null, te); } if (response == null) { - displayFormatter.printErrorMessage(sout, "response is null"); - wasError = true; + onError("response is null", null); } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); @@ -544,8 +534,7 @@ public class TajoCli { } } else { if (response.hasErrorMessage()) { - displayFormatter.printErrorMessage(sout, response.getErrorMessage()); - wasError = true; + onError(response.getErrorMessage(), null); } } @@ -569,13 +558,13 @@ public class TajoCli { displayFormatter.printResult(sout, sin, desc, responseTime, res); } } catch (Throwable t) { - displayFormatter.printErrorMessage(sout, t); - wasError = true; + onError(null, t); } finally { if (res != null) { try { res.close(); } catch (SQLException e) { + // ignore } } } @@ -637,8 
+626,7 @@ public class TajoCli { } } } catch (Throwable t) { - displayFormatter.printErrorMessage(sout, t); - wasError = true; + onError(null, t); } finally { if (res != null) { try { @@ -668,6 +656,25 @@ public class TajoCli { sout.println("Invalid command " + command + ". Try \\? for help."); } + private void onError(String message, Throwable t) { + wasError = true; + if (t == null) { + displayFormatter.printErrorMessage(sout, message); + } else { + displayFormatter.printErrorMessage(sout, t); + } + if (reconnect && (t instanceof InvalidClientSessionException || + (message != null && message.startsWith("org.apache.tajo.session.InvalidSessionException")))) { + if (client instanceof SessionConnection) { + try { + ((SessionConnection)client).reconnect(); + } catch (Exception e) { + // ignore + } + } + } + } + @VisibleForTesting public void close() { //for testcase http://git-wip-us.apache.org/repos/asf/tajo/blob/4a9da73c/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index bcf6d8b..d05d3b1 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -21,6 +21,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; @@ -319,6 +320,65 @@ public class SessionConnection implements Closeable { } } + public boolean reconnect() throws Exception { + return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, 
true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); + builder.setUsername(userInfo.getUserName()).build(); + if (baseDatabase != null) { + builder.setBaseDatabaseName(baseDatabase); + } + + + // create new session + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); + if (response.getResultCode() != ResultCode.OK) { + return false; + } + + // Invalidate some session variables in client cache + sessionId = response.getSessionId(); + Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); + synchronized (sessionVarsCache) { + for (SessionVars var : UPDATE_ON_RECONNECT) { + String value = sessionVars.get(var.keyname()); + if (value != null) { + sessionVarsCache.put(var.keyname(), value); + } + } + } + + // Update the session variables in server side + try { + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(sessionVarsCache); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); + + if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) { + tajoMasterService.removeSession(null, sessionId); + return false; + } + LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); + return true; + } catch (ServiceException e) { + tajoMasterService.removeSession(null, sessionId); + return false; + } + } + }.withRetries(); + } + + /** + * Session variables which should be updated upon reconnecting + */ + private static final SessionVars[] UPDATE_ON_RECONNECT = new SessionVars[] { + SessionVars.SESSION_ID, SessionVars.SESSION_LAST_ACCESS_TIME, SessionVars.CLIENT_HOST + }; + 
ClientProtos.SessionedStringProto convertSessionedString(String str) { ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder(); builder.setSessionId(sessionId);
