Repository: tajo
Updated Branches:
  refs/heads/master 011de8bab -> d926247dc
TAJO-1633: Cleanup TajoMasterClientService. Closes #594 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d926247d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d926247d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d926247d Branch: refs/heads/master Commit: d926247dcaba0af8d810c54a7c668272c3c86ea5 Parents: 011de8b Author: Hyunsik Choi <[email protected]> Authored: Mon Jun 15 19:33:33 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Mon Jun 15 19:33:33 2015 -0700 ---------------------------------------------------------------------- .../tajo/client/CatalogAdminClientImpl.java | 14 +++-- .../org/apache/tajo/client/QueryClientImpl.java | 8 +-- tajo-client/src/main/proto/ClientProtos.proto | 16 +----- .../main/proto/TajoMasterClientProtocol.proto | 8 +-- .../tajo/master/TajoMasterClientService.java | 60 ++++++++++---------- 5 files changed, 47 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/d926247d/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 9397fcf..1fe856a 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -27,8 +27,10 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.SessionedStringProto; import org.apache.tajo.jdbc.SQLStates; import org.apache.tajo.rpc.NettyClientBase; +import 
org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import java.io.IOException; import java.net.URI; @@ -145,13 +147,13 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { connection.checkSessionAndGet(client); BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder(); + SessionedStringProto.Builder builder = SessionedStringProto.newBuilder(); builder.setSessionId(connection.sessionId); if (databaseName != null) { - builder.setDatabaseName(databaseName); + builder.setValue(databaseName); } - ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build()); - return res.getTablesList(); + PrimitiveProtos.StringListProto res = tajoMasterService.getTableList(null, builder.build()); + return res.getValuesList(); } @Override @@ -161,9 +163,9 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { connection.checkSessionAndGet(client); BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); + SessionedStringProto.Builder builder = SessionedStringProto.newBuilder(); builder.setSessionId(connection.sessionId); - builder.setTableName(tableName); + builder.setValue(tableName); ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); if (res.getResultCode() == ClientProtos.ResultCode.OK) { return CatalogUtil.newTableDesc(res.getTableDesc()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d926247d/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 53889fe..ac25933 100644 --- 
a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -437,8 +437,8 @@ public class QueryClientImpl implements QueryClient { connection.checkSessionAndGet(client); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); - builder.setSessionId(connection.sessionId); + TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder(); + builder.setId(connection.sessionId.getId()); ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); return res.getQueryListList(); } @@ -450,8 +450,8 @@ public class QueryClientImpl implements QueryClient { connection.checkSessionAndGet(client); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); - builder.setSessionId(connection.sessionId); + TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder(); + builder.setId(connection.sessionId.getId()); ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); return res.getQueryListList(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d926247d/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index ecb136e..5497faa 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -57,7 +57,7 @@ message SessionUpdateResponse { message SessionedStringProto { optional SessionIdProto sessionId = 1; - required string value = 2; + optional string value = 2; } message 
ExplainQueryResponse { @@ -196,20 +196,6 @@ message GetClusterInfoResponse { repeated WorkerResourceInfo workerList = 1; } -message GetTableListRequest { - optional SessionIdProto sessionId = 1; - optional string databaseName = 2; -} - -message GetTableListResponse { - repeated string tables = 1; -} - -message GetTableDescRequest { - optional SessionIdProto sessionId = 1; - required string tableName = 2; -} - message CreateTableRequest { optional SessionIdProto sessionId = 1; required string name = 2; http://git-wip-us.apache.org/repos/asf/tajo/blob/d926247d/tajo-client/src/main/proto/TajoMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 10ca268..468a998 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -46,8 +46,8 @@ service TajoMasterClientProtocolService { // Query And Resource Management APIs rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse); - rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse); - rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse); + rpc getRunningQueryList(SessionIdProto) returns (GetQueryListResponse); + rpc getFinishedQueryList(SessionIdProto) returns (GetQueryListResponse); rpc killQuery(QueryIdRequest) returns (BoolProto); rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto); @@ -65,7 +65,7 @@ service TajoMasterClientProtocolService { rpc createExternalTable(CreateTableRequest) returns (TableResponse); rpc existTable(SessionedStringProto) returns (BoolProto); rpc dropTable(DropTableRequest) returns (BoolProto); - rpc getTableList(GetTableListRequest) returns (GetTableListResponse); - rpc 
getTableDesc(GetTableDescRequest) returns (TableResponse); + rpc getTableList(SessionedStringProto) returns (StringListProto); + rpc getTableDesc(SessionedStringProto) returns (TableResponse); rpc getFunctionList(SessionedStringProto) returns (FunctionResponse); } http://git-wip-us.apache.org/repos/asf/tajo/blob/d926247d/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 4fcdc88..2602d7d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -47,19 +47,20 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.PartitionedTableScanNode; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.querymaster.QueryJobEvent; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; -import org.apache.tajo.session.InvalidSessionException; -import org.apache.tajo.session.NoSuchSessionVariableException; -import org.apache.tajo.session.Session; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; +import 
org.apache.tajo.session.InvalidSessionException; +import org.apache.tajo.session.NoSuchSessionVariableException; +import org.apache.tajo.session.Session; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -307,11 +308,6 @@ public class TajoMasterClientService extends AbstractService { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); QueryContext queryContext = new QueryContext(conf, session); - if (queryContext.getCurrentDatabase() == null) { - for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) { - System.out.println(e.getKey() + "=" + e.getValue()); - } - } UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder(); try { @@ -379,12 +375,12 @@ public class TajoMasterClientService extends AbstractService { } @Override - public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request) + public GetQueryListResponse getRunningQueryList(RpcController controller, TajoIdProtos.SessionIdProto request) throws ServiceException { try { - context.getSessionManager().touch(request.getSessionId().getId()); + context.getSessionManager().touch(request.getId()); GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder(); Collection<QueryInProgress> queries = new ArrayList<QueryInProgress>(context.getQueryJobManager().getSubmittedQueries()); @@ -416,11 +412,11 @@ public class TajoMasterClientService extends AbstractService { } @Override - public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request) + public GetQueryListResponse getFinishedQueryList(RpcController controller, TajoIdProtos.SessionIdProto request) throws ServiceException { try { - context.getSessionManager().touch(request.getSessionId().getId()); + context.getSessionManager().touch(request.getId()); GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder(); 
Collection<QueryInfo> queries @@ -723,7 +719,7 @@ public class TajoMasterClientService extends AbstractService { } @Override - public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto + public StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto request) throws ServiceException { try { context.getSessionManager().touch(request.getId()); @@ -749,10 +745,6 @@ public class TajoMasterClientService extends AbstractService { tableName = request.getValue(); } - if (databaseName == null) { - System.out.println("A"); - } - if (catalog.existsTable(databaseName, tableName)) { return BOOL_TRUE; } else { @@ -764,19 +756,19 @@ public class TajoMasterClientService extends AbstractService { } @Override - public GetTableListResponse getTableList(RpcController controller, - GetTableListRequest request) throws ServiceException { + public StringListProto getTableList(RpcController controller, + SessionedStringProto request) throws ServiceException { try { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); String databaseName; - if (request.hasDatabaseName()) { - databaseName = request.getDatabaseName(); + if (request.hasValue()) { + databaseName = request.getValue(); } else { databaseName = session.getCurrentDatabase(); } Collection<String> tableNames = catalog.getAllTableNames(databaseName); - GetTableListResponse.Builder builder = GetTableListResponse.newBuilder(); - builder.addAllTables(tableNames); + StringListProto.Builder builder = StringListProto.newBuilder(); + builder.addAllValues(tableNames); return builder.build(); } catch (Throwable t) { throw new ServiceException(t); @@ -784,19 +776,27 @@ public class TajoMasterClientService extends AbstractService { } @Override - public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException { + public TableResponse getTableDesc(RpcController controller, 
SessionedStringProto request) throws ServiceException { try { + + if (!request.hasValue()) { + return TableResponse.newBuilder() + .setResultCode(ResultCode.ERROR) + .setErrorMessage("table name is required.") + .build(); + } + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); String databaseName; String tableName; - if (CatalogUtil.isFQTableName(request.getTableName())) { - String [] splitted = CatalogUtil.splitFQTableName(request.getTableName()); + if (CatalogUtil.isFQTableName(request.getValue())) { + String [] splitted = CatalogUtil.splitFQTableName(request.getValue()); databaseName = splitted[0]; tableName = splitted[1]; } else { databaseName = session.getCurrentDatabase(); - tableName = request.getTableName(); + tableName = request.getValue(); } if (catalog.existsTable(databaseName, tableName)) { @@ -807,7 +807,7 @@ public class TajoMasterClientService extends AbstractService { } else { return TableResponse.newBuilder() .setResultCode(ResultCode.ERROR) - .setErrorMessage("ERROR: no such a table: " + request.getTableName()) + .setErrorMessage("ERROR: no such a table: " + request.getValue()) .build(); } } catch (Throwable t) {
