Repository: tajo Updated Branches: refs/heads/master c2aef7d36 -> 61d1d3304
TAJO-1700: Add better exception handling in TajoMasterClientService. Closes #644 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/61d1d330 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/61d1d330 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/61d1d330 Branch: refs/heads/master Commit: 61d1d33047f316cf032714e4dd9bd7e145cd9ade Parents: c2aef7d Author: Hyunsik Choi <[email protected]> Authored: Fri Jul 24 11:23:08 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jul 24 11:23:31 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/client/SessionConnection.java | 17 ++- .../main/proto/TajoMasterClientProtocol.proto | 2 +- .../tajo/master/TajoMasterClientService.java | 105 +++++++++++++++---- 4 files changed, 101 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/61d1d330/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1c79c0d..6a75506 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1700: Add better exception handling in TajoMasterClientService. + (hyunsik) + TAJO-1343: Improve the memory usage of physical executors. (jihoon) TAJO-1696: Resource calculator should consider the requested disk resource http://git-wip-us.apache.org/repos/asf/tajo/blob/61d1d330/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 788d193..f875335 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -36,7 +36,9 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.CommonTestingUtil; @@ -54,8 +56,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE; import static org.apache.tajo.exception.ReturnStateUtil.isError; import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; +import static org.apache.tajo.exception.ReturnStateUtil.isThisError; import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException; import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; @@ -278,9 +282,18 @@ public class SessionConnection implements Closeable { public Boolean existSessionVariable(final String varname) throws SQLException { - BlockingInterface stub = getTMStub(); try { - return isSuccess(stub.existSessionVariable(null, getSessionedString(varname))); + final BlockingInterface stub = getTMStub(); + ReturnState state = stub.existSessionVariable(null, getSessionedString(varname)); + + if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) { + return false; + } else if (isError(state)){ + throw SQLExceptionUtil.toSQLException(state); + } + + return isSuccess(state); + } catch (ServiceException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/61d1d330/tajo-client/src/main/proto/TajoMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 1dcf1ac..57bb2db 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -37,7 +37,7 @@ service TajoMasterClientProtocolService { rpc removeSession(SessionIdProto) returns (ReturnState); rpc updateSessionVariables(UpdateSessionVariableRequest) returns (SessionUpdateResponse); rpc existSessionVariable(SessionedStringProto) returns (ReturnState); - rpc getSessionVariable(SessionedStringProto) returns (StringProto); + rpc getSessionVariable(SessionedStringProto) returns (StringResponse); rpc getAllSessionVariables(SessionIdProto) returns (KeyValueSetResponse); // Query Submission and Result APIs http://git-wip-us.apache.org/repos/asf/tajo/blob/61d1d330/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 6fbe968..baf1320 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -41,6 +41,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos.TableResponse; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.ExceptionUtil; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -65,8 +68,12 @@ import java.net.InetSocketAddress; import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError; import static org.apache.tajo.exception.ReturnStateUtil.*; +/** + * It provides Client Remote API service for TajoMaster. + */ public class TajoMasterClientService extends AbstractService { private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class); private final MasterContext context; @@ -76,8 +83,6 @@ public class TajoMasterClientService extends AbstractService { private BlockingRpcServer server; private InetSocketAddress bindAddress; - private final BoolProto BOOL_TRUE = - BoolProto.newBuilder().setValue(true).build(); public TajoMasterClientService(MasterContext context) { super(TajoMasterClientService.class.getName()); @@ -140,6 +145,9 @@ public class TajoMasterClientService extends AbstractService { return builder.build(); } catch (Throwable t) { + + printStackTraceIfError(LOG, t); + CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder(); builder.setState(returnError(t)); return builder.build(); @@ -150,8 +158,13 @@ public class TajoMasterClientService extends AbstractService { public ReturnState removeSession(RpcController controller, TajoIdProtos.SessionIdProto request) throws ServiceException { - if (request != null) { - context.getSessionManager().removeSession(request.getId()); + try { + if (request != null) { + context.getSessionManager().removeSession(request.getId()); + } + } catch (Throwable t) { + printStackTraceIfError(LOG, t); + return ReturnStateUtil.returnError(t); } return OK; @@ -179,20 +192,29 @@ public class TajoMasterClientService extends AbstractService { return builder.build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); return builder.build(); } } @Override - public StringProto getSessionVariable(RpcController controller, SessionedStringProto request) + public StringResponse getSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException { try { - return ProtoUtil.convertString( - context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue())); + String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue()); + + return StringResponse.newBuilder() + .setState(OK) + .setValue(value) + .build(); + } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + return StringResponse.newBuilder() + .setState(returnError(t)) + .build(); } } @@ -207,6 +229,7 @@ public class TajoMasterClientService extends AbstractService { return errNoSessionVar(request.getValue()); } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -227,6 +250,7 @@ public class TajoMasterClientService extends AbstractService { .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return KeyValueSetResponse.newBuilder() .setState(returnError(t)) .build(); @@ -243,6 +267,7 @@ public class TajoMasterClientService extends AbstractService { .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringResponse.newBuilder() .setState(returnError(t)) .build(); @@ -262,6 +287,7 @@ public class TajoMasterClientService extends AbstractService { return errUndefinedDatabase(databaseName); } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -277,10 +303,11 @@ public class TajoMasterClientService extends AbstractService { return context.getGlobalEngine().executeQuery(session, request.getQuery(), request.getIsJson()); - } catch (Exception e) { + } catch (Throwable t) { + printStackTraceIfError(LOG, t); return ClientProtos.SubmitQueryResponse.newBuilder() - .setState(returnError(e)) + .setState(returnError(t)) .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()) .setIsForwarded(true) .setUserName(context.getConf().getVar(ConfVars.USERNAME)) @@ -300,6 +327,7 @@ public class TajoMasterClientService extends AbstractService { builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } return builder.build(); @@ -351,20 +379,22 @@ public class TajoMasterClientService extends AbstractService { builder.setState(errIncompleteQuery(queryId)); } - return builder.build(); } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override public GetQueryListResponse getRunningQueryList(RpcController controller, TajoIdProtos.SessionIdProto request) - throws ServiceException { + GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder(); + try { context.getSessionManager().touch(request.getId()); - GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder(); Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries()); queries.addAll(context.getQueryJobManager().getRunningQueries()); @@ -387,11 +417,14 @@ public class TajoMasterClientService extends AbstractService { builder.addQueryList(infoBuilder.build()); } - GetQueryListResponse result = builder.build(); - return result; + builder.setState(OK); + } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override @@ -424,7 +457,9 @@ public class TajoMasterClientService extends AbstractService { } builder.setState(OK); + } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -435,10 +470,11 @@ public class TajoMasterClientService extends AbstractService { public GetQueryStatusResponse getQueryStatus(RpcController controller, GetQueryStatusRequest request) throws ServiceException { + GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder(); + try { context.getSessionManager().touch(request.getSessionId().getId()); - GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder(); QueryId queryId = new QueryId(request.getQueryId()); builder.setQueryId(request.getQueryId()); @@ -485,11 +521,13 @@ public class TajoMasterClientService extends AbstractService { } } } - return builder.build(); } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); } + + return builder.build(); } @Override @@ -539,8 +577,10 @@ public class TajoMasterClientService extends AbstractService { request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows"); } catch (Throwable t) { - builder.setResultSet(resultSetBuilder.build()); // required field + printStackTraceIfError(LOG, t); + builder.setState(returnError(t)); + builder.setResultSet(resultSetBuilder.build()); // required field } return builder.build(); @@ -559,6 +599,7 @@ public class TajoMasterClientService extends AbstractService { return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -587,6 +628,7 @@ public class TajoMasterClientService extends AbstractService { builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -611,6 +653,7 @@ public class TajoMasterClientService extends AbstractService { return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -646,6 +689,7 @@ public class TajoMasterClientService extends AbstractService { builder.setState(OK); } catch (Throwable t) { + printStackTraceIfError(LOG, t); builder.setState(returnError(t)); } @@ -665,6 +709,7 @@ public class TajoMasterClientService extends AbstractService { } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -680,6 +725,7 @@ public class TajoMasterClientService extends AbstractService { } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -697,6 +743,7 @@ public class TajoMasterClientService extends AbstractService { } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -714,6 +761,7 @@ public class TajoMasterClientService extends AbstractService { .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringListResponse.newBuilder() .setState(returnError(t)) .build(); @@ -744,6 +792,7 @@ public class TajoMasterClientService extends AbstractService { } } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -767,6 +816,7 @@ public class TajoMasterClientService extends AbstractService { .build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return StringListResponse.newBuilder() .setState(returnError(t)) .build(); @@ -807,7 +857,10 @@ public class TajoMasterClientService extends AbstractService { .build(); } } catch (Throwable t) { - throw new ServiceException(t); + printStackTraceIfError(LOG, t); + return TableResponse.newBuilder() + .setState(returnError(t)) + .build(); } } @@ -850,6 +903,7 @@ public class TajoMasterClientService extends AbstractService { .setTable(desc.getProto()).build(); } catch (Throwable t) { + printStackTraceIfError(LOG, t); return TableResponse.newBuilder() .setState(returnError(t)) .build(); @@ -867,6 +921,7 @@ public class TajoMasterClientService extends AbstractService { return OK; } catch (Throwable t) { + printStackTraceIfError(LOG, t); return returnError(t); } } @@ -898,7 +953,11 @@ public class TajoMasterClientService extends AbstractService { .build(); } catch (Throwable t) { - return FunctionListResponse.newBuilder().setState(returnError(t)).build(); + printStackTraceIfError(LOG, t); + + return FunctionListResponse.newBuilder(). + setState(returnError(t)) + .build(); } } }
