TAJO-1583: Remove ServerCallable in RPC client. (jinho) Closes #556
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47554105 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47554105 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47554105 Branch: refs/heads/index_support Commit: 475541057891518e08e5a18ebbbf916c1ad60c10 Parents: 9540f16 Author: Jinho Kim <[email protected]> Authored: Thu Apr 30 16:51:56 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Apr 30 16:51:56 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 569 ++++++------------- .../org/apache/tajo/catalog/CatalogClient.java | 49 +- .../org/apache/tajo/catalog/CatalogServer.java | 8 +- .../tajo/catalog/LocalCatalogWrapper.java | 20 +- .../tajo/client/CatalogAdminClientImpl.java | 236 +++----- .../org/apache/tajo/client/QueryClientImpl.java | 328 +++++------ .../apache/tajo/client/SessionConnection.java | 275 ++++----- .../cli/tsql/TestDefaultCliOutputFormatter.java | 4 - .../apache/tajo/querymaster/TestKillQuery.java | 63 +- .../org/apache/tajo/rpc/NettyClientBase.java | 3 +- .../tajo/rpc/RetriesExhaustedException.java | 104 ---- .../org/apache/tajo/rpc/ServerCallable.java | 148 ----- .../org/apache/tajo/rpc/TestBlockingRpc.java | 39 -- 14 files changed, 618 insertions(+), 1230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8bda2bd..952f852 100644 --- a/CHANGES +++ b/CHANGES @@ -214,6 +214,8 @@ Release 0.11.0 - unreleased TASKS + TAJO-1583: Remove ServerCallable in RPC client. (jinho) + TAJO-1587: Upgrade java version to 1.7 for Travis CI. (jihoon) TAJO-1559: Fix data model description (tinyint, smallint). http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 49be29a..766f6c2 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -29,16 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; -import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.ProtoUtil; -import java.net.InetSocketAddress; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -46,50 +42,27 @@ import java.util.List; /** * CatalogClient provides a client API to access the catalog server. */ -public abstract class AbstractCatalogClient implements CatalogService { - private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class); +public abstract class AbstractCatalogClient implements CatalogService, Closeable { + protected final Log LOG = LogFactory.getLog(AbstractCatalogClient.class); - protected ServiceTracker serviceTracker; - protected RpcClientManager manager; - protected InetSocketAddress catalogServerAddr; protected TajoConf conf; - abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client); - - public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) { - this.manager = RpcClientManager.getInstance(); - this.catalogServerAddr = catalogServerAddr; - this.serviceTracker = ServiceTrackerFactory.get(conf); + public AbstractCatalogClient(TajoConf conf) { this.conf = conf; } - private InetSocketAddress getCatalogServerAddr() { - if (catalogServerAddr == null) { - return null; - } else { - - if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - return catalogServerAddr; - } else { - return serviceTracker.getCatalogAddress(); - } - } - } + abstract CatalogProtocolService.BlockingInterface getStub() throws ServiceException; @Override public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); + CatalogProtocolService.BlockingInterface stub = getStub(); - CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder(); - builder.setTablespaceName(tablespaceName); - builder.setTablespaceUri(tablespaceUri); - return stub.createTablespace(null, builder.build()).getValue(); - } - }.withRetries(); - } catch (ServiceException e) { + CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder(); + builder.setTablespaceName(tablespaceName); + builder.setTablespaceUri(tablespaceUri); + return stub.createTablespace(null, builder.build()).getValue(); + } catch (Exception e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; } @@ -98,12 +71,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropTablespace(final String tablespaceName) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; @@ -113,12 +82,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existTablespace(final String tablespaceName) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; @@ -128,46 +93,32 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<String> getAllTablespaceNames() { try { - return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Collection<String> call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO); - return ProtoUtil.convertStrings(response); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO); + return ProtoUtil.convertStrings(response); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<String>(); } } @Override public List<TablespaceProto> getAllTablespaces() { try { - return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<TablespaceProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO); - return response.getTablespaceList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO); + return response.getTablespaceList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<TablespaceProto>(); } } @Override public TablespaceProto getTablespace(final String tablespaceName) { try { - return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public TablespaceProto call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName)); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName)); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -177,12 +128,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.alterTablespace(null, alterTablespace).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.alterTablespace(null, alterTablespace).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -192,18 +139,14 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); + CatalogProtocolService.BlockingInterface stub = getStub(); - CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder(); - builder.setDatabaseName(databaseName); - if (tablespaceName != null) { - builder.setTablespaceName(tablespaceName); - } - return stub.createDatabase(null, builder.build()).getValue(); - } - }.withRetries(); + CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder(); + builder.setDatabaseName(databaseName); + if (tablespaceName != null) { + builder.setTablespaceName(tablespaceName); + } + return stub.createDatabase(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; @@ -213,12 +156,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropDatabase(final String databaseName) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; @@ -228,12 +167,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existDatabase(final String databaseName) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return Boolean.FALSE; @@ -243,50 +178,36 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<String> getAllDatabaseNames() { try { - return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Collection<String> call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO); - return ProtoUtil.convertStrings(response); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO); + return ProtoUtil.convertStrings(response); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<String>(); } } @Override public List<DatabaseProto> getAllDatabases() { try { - return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<DatabaseProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO); - return response.getDatabaseList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO); + return response.getDatabaseList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<DatabaseProto>(); } } @Override public final TableDesc getTableDesc(final String databaseName, final String tableName) { try { - return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public TableDesc call(NettyClientBase client) throws ServiceException { - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build())); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -302,89 +223,60 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TableDescriptorProto> getAllTables() { try { - return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<TableDescriptorProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO); - return response.getTableList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO); + return response.getTableList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<TableDescriptorProto>(); } } @Override public List<TableOptionProto> getAllTableOptions() { try { - return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<TableOptionProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO); - return response.getTableOptionList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO); + return response.getTableOptionList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<TableOptionProto>(); } } @Override public List<TableStatsProto> getAllTableStats() { try { - return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<TableStatsProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO); - return response.getStatList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO); + return response.getStatList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<TableStatsProto>(); } } @Override public List<ColumnProto> getAllColumns() { try { - return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<ColumnProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO); - return response.getColumnList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO); + return response.getColumnList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<ColumnProto>(); } } @Override public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public PartitionMethodDesc call(NettyClientBase client) throws ServiceException { - - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -394,17 +286,12 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existPartitionMethod(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existPartitionMethod(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -415,18 +302,13 @@ public abstract class AbstractCatalogClient implements CatalogService { public final PartitionDescProto getPartition(final String databaseName, final String tableName, final String partitionName) { try { - return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public PartitionDescProto call(NettyClientBase client) throws ServiceException { - - PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); - builder.setPartitionName(partitionName); + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); + builder.setPartitionName(partitionName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.getPartitionByPartitionName(null, builder.build()); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.getPartitionByPartitionName(null, builder.build()); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -436,94 +318,70 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) { try { - return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, - false) { - public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException { + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); - - CatalogProtocolService.BlockingInterface stub = getStub(client); - PartitionsProto response = stub.getPartitionsByTableName(null, builder.build()); - return response.getPartitionList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + PartitionsProto response = stub.getPartitionsByTableName(null, builder.build()); + return response.getPartitionList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<PartitionDescProto>(); } } @Override public List<TablePartitionProto> getAllPartitions() { try { - return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<TablePartitionProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO); - return response.getPartList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO); + return response.getPartList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<TablePartitionProto>(); } } @Override public final Collection<String> getAllTableNames(final String databaseName) { try { - return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Collection<String> call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName)); - return ProtoUtil.convertStrings(response); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName)); + return ProtoUtil.convertStrings(response); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<String>(); } } @Override public final Collection<FunctionDesc> getFunctions() { - try { - return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException { - List<FunctionDesc> list = new ArrayList<FunctionDesc>(); - GetFunctionsResponse response; - CatalogProtocolService.BlockingInterface stub = getStub(client); - response = stub.getFunctions(null, NullProto.newBuilder().build()); - int size = response.getFunctionDescCount(); - for (int i = 0; i < size; i++) { - try { - list.add(new FunctionDesc(response.getFunctionDesc(i))); - } catch (ClassNotFoundException e) { - LOG.error(e, e); - return null; - } - } + List<FunctionDesc> list = new ArrayList<FunctionDesc>(); + try { + GetFunctionsResponse response; + CatalogProtocolService.BlockingInterface stub = getStub(); + response = stub.getFunctions(null, NullProto.newBuilder().build()); + int size = response.getFunctionDescCount(); + for (int i = 0; i < size; i++) { + try { + list.add(new FunctionDesc(response.getFunctionDesc(i))); + } catch (ClassNotFoundException e) { + LOG.error(e, e); return list; } - }.withRetries(); + } + return list; } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return list; } } @Override public final boolean createTable(final TableDesc desc) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.createTable(null, desc.getProto()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.createTable(null, desc.getProto()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -537,17 +395,12 @@ public abstract class AbstractCatalogClient implements CatalogService { final String simpleName = splitted[1]; try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(simpleName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(simpleName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.dropTable(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.dropTable(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -561,17 +414,12 @@ public abstract class AbstractCatalogClient implements CatalogService { "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\""); } try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existsTable(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existsTable(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -586,12 +434,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createIndex(final IndexDesc index) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.createIndex(null, index.getProto()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.createIndex(null, index.getProto()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -601,16 +445,12 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - IndexNameProto.Builder builder = IndexNameProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setIndexName(indexName); + IndexNameProto.Builder builder = IndexNameProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setIndexName(indexName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existIndexByName(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existIndexByName(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -620,17 +460,13 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); - builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); + GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); + builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); + builder.setColumnName(columnName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existIndexByColumn(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.existIndexByColumn(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -640,17 +476,12 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final IndexDesc getIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public IndexDesc call(NettyClientBase client) throws ServiceException { - - IndexNameProto.Builder builder = IndexNameProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setIndexName(indexName); + IndexNameProto.Builder builder = IndexNameProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setIndexName(indexName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return new IndexDesc(stub.getIndexByName(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return new IndexDesc(stub.getIndexByName(null, builder.build())); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -662,17 +493,12 @@ public abstract class AbstractCatalogClient implements CatalogService { final String tableName, final String columnName) { try { - return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public IndexDesc call(NettyClientBase client) throws ServiceException { + GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); + builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); + builder.setColumnName(columnName); - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); - builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); - - CatalogProtocolService.BlockingInterface stub = getStub(client); - return new IndexDesc(stub.getIndexByColumn(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return new IndexDesc(stub.getIndexByColumn(null, builder.build())); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -683,17 +509,12 @@ public abstract class AbstractCatalogClient implements CatalogService { public boolean dropIndex(final String databaseName, final String indexName) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { + IndexNameProto.Builder builder = IndexNameProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setIndexName(indexName); - IndexNameProto.Builder builder = IndexNameProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setIndexName(indexName); - - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.dropIndex(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.dropIndex(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -703,30 +524,20 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<IndexProto> getAllIndexes() { try { - return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<IndexProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); - return response.getIndexList(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); + return response.getIndexList(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return null; + return new ArrayList<IndexProto>(); } } @Override public final boolean createFunction(final FunctionDesc funcDesc) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.createFunction(null, funcDesc.getProto()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.createFunction(null, funcDesc.getProto()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -736,15 +547,11 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean dropFunction(final String signature) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder(); - builder.setSignature(signature); + UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder(); + builder.setSignature(signature); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.dropFunction(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.dropFunction(null, builder.build()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -769,24 +576,12 @@ public abstract class AbstractCatalogClient implements CatalogService { FunctionDescProto descProto = null; try { - descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public FunctionDescProto call(NettyClientBase client) throws ServiceException { - try { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.getFunctionMeta(null, builder.build()); - } catch (NoSuchFunctionException e) { - abort(); - throw e; - } - } - }.withRetries(); - } catch(ServiceException e) { - // this is not good. we need to define user massage exception - if(e.getCause() instanceof NoSuchFunctionException){ - LOG.debug(e.getMessage()); - } else { - LOG.error(e.getMessage(), e); - } + CatalogProtocolService.BlockingInterface stub = getStub(); + descProto = stub.getFunctionMeta(null, builder.build()); + } catch (NoSuchFunctionException e) { + LOG.debug(e.getMessage()); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); } if (descProto == null) { @@ -819,27 +614,21 @@ public abstract class AbstractCatalogClient implements CatalogService { } try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.containFunction(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.containFunction(null, builder.build()).getValue(); + } catch (InvalidOperationException e) { + LOG.error(e.getMessage()); } catch (ServiceException e) { LOG.error(e.getMessage(), e); - return false; } + return false; } @Override public final boolean alterTable(final AlterTableDesc desc) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.alterTable(null, desc.getProto()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.alterTable(null, desc.getProto()).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -849,12 +638,8 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) { try { - return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.updateTableStats(null, updateTableStatsProto).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return stub.updateTableStats(null, updateTableStatsProto).getValue(); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java index 7666a97..80ded4a 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java @@ -18,35 +18,72 @@ package org.apache.tajo.catalog; +import com.google.protobuf.ServiceException; import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; /** * CatalogClient provides a client API to access the catalog server. */ public class CatalogClient extends AbstractCatalogClient { + protected NettyClientBase client; + protected ServiceTracker serviceTracker; + protected InetSocketAddress catalogServerAddr; /** * @throws java.io.IOException * */ public CatalogClient(final TajoConf conf) throws IOException { - super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS))); + super(conf); + this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)); + this.serviceTracker = ServiceTrackerFactory.get(conf); } - public CatalogClient(TajoConf conf, String host, int port) throws IOException { - super(conf, NetUtils.createSocketAddr(host, port)); - } @Override - BlockingInterface getStub(NettyClientBase client) { - return client.getStub(); + BlockingInterface getStub() throws ServiceException { + return getCatalogConnection().getStub(); + } + + private InetSocketAddress getCatalogServerAddr() { + if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + return catalogServerAddr; + } else { + return serviceTracker.getCatalogAddress(); + } } + public synchronized NettyClientBase getCatalogConnection() throws ServiceException { + if (client != null && client.isConnected()) return client; + else { + try { + if (client != null && client.isConnected()) return client; + RpcClientManager.cleanup(client); + + int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES); + // Client do not closed on idle state for support high available + this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false, + retry, 0, TimeUnit.SECONDS, false); + } catch (Exception e) { + throw new ServiceException(e); + } + return client; + } + } + + @Override public void close() { + RpcClientManager.cleanup(client); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index e9fb177..f2e9795 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -33,7 +33,6 @@ import org.apache.tajo.annotation.ThreadSafe; import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.*; -import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.store.CatalogStore; import org.apache.tajo.catalog.store.DerbyStore; @@ -61,7 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand; import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; /** * This class provides the catalog service. The catalog service enables clients @@ -1192,7 +1190,7 @@ public class CatalogServer extends AbstractService { if (functions.containsKey(funcDesc.getSignature())) { FunctionDescProto found = findFunctionStrictType(funcDesc, true); if (found != null) { - throw new AlreadyExistsFunctionException(signature.toString()); + throw new ServiceException(new AlreadyExistsFunctionException(signature.toString())); } } @@ -1209,7 +1207,7 @@ public class CatalogServer extends AbstractService { throws ServiceException { if (!containFunction(request.getSignature())) { - throw new NoSuchFunctionException(request.getSignature(), new DataType[] {}); + throw new ServiceException(new NoSuchFunctionException(request.getSignature(), new DataType[]{})); } functions.remove(request.getSignature()); @@ -1231,7 +1229,7 @@ public class CatalogServer extends AbstractService { } if (function == null) { - throw new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList()); + throw new ServiceException(new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList())); } else { return function; } http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java index df9bd2c..35e9e2e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java @@ -22,9 +22,6 @@ package org.apache.tajo.catalog; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.rpc.NettyClientBase; - -import java.io.IOException; /** * This class provides a catalog service interface in @@ -34,20 +31,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient { private CatalogServer catalog; private CatalogProtocol.CatalogProtocolService.BlockingInterface stub; - public LocalCatalogWrapper(final TajoConf conf) throws IOException { - super(conf, null); - this.catalog = new CatalogServer(); - this.catalog.init(conf); - this.catalog.start(); - this.stub = catalog.getHandler(); - } - public LocalCatalogWrapper(final CatalogServer server) { this(server, server.getConf()); } public LocalCatalogWrapper(final CatalogServer server, final TajoConf conf) { - super(conf, null); + super(conf); this.catalog = server; this.stub = server.getHandler(); } @@ -57,7 +46,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient { } @Override - CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) { + CatalogProtocol.CatalogProtocolService.BlockingInterface getStub() { return stub; } + + @Override + public void close() { + //nothing to do + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 9d0e427..9397fcf 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -27,10 +27,8 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.SQLStates; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.ServerCallable; import java.io.IOException; import java.net.URI; @@ -48,79 +46,45 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean createDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMaster = client.getStub(); - return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMaster = client.getStub(); + return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue(); } @Override public boolean existDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMaster = client.getStub(); - return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMaster = client.getStub(); + return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue(); } @Override public boolean dropDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue(); } @Override public List<String> getAllDatabaseNames() throws ServiceException { - return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public List<String> call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList(); } public boolean existTable(final String tableName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue(); } @Override @@ -133,32 +97,25 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { - return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setName(tableName); - builder.setSchema(schema.getProto()); - builder.setMeta(meta.getProto()); - builder.setPath(path.toString()); - if (partitionMethodDesc != null) { - builder.setPartition(partitionMethodDesc.getProto()); - } - ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setName(tableName); + builder.setSchema(schema.getProto()); + builder.setMeta(meta.getProto()); + builder.setPath(path.toString()); + if (partitionMethodDesc != null) { + builder.setPartition(partitionMethodDesc.getProto()); + } + ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { + throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + } } @Override @@ -169,94 +126,67 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setName(tableName); - builder.setPurge(purge); - return tajoMasterService.dropTable(null, builder.build()).getValue(); - } - - }.withRetries(); + ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setName(tableName); + builder.setPurge(purge); + return tajoMasterService.dropTable(null, builder.build()).getValue(); } @Override public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { - return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public List<String> call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - if (databaseName != null) { - builder.setDatabaseName(databaseName); - } - ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build()); - return res.getTablesList(); - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + if (databaseName != null) { + builder.setDatabaseName(databaseName); + } + ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build()); + return res.getTablesList(); } @Override public TableDesc getTableDesc(final String tableName) throws ServiceException { - return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setTableName(tableName); - ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { + throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState())); + } } @Override public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { - return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - String paramFunctionName = functionName == null ? "" : functionName; - ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, - connection.convertSessionedString(paramFunctionName)); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return res.getFunctionsList(); - } else { - throw new SQLException(res.getErrorMessage()); - } - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + String paramFunctionName = functionName == null ? "" : functionName; + ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, + connection.convertSessionedString(paramFunctionName)); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return res.getFunctionsList(); + } else { + throw new ServiceException(new SQLException(res.getErrorMessage())); + } } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 99c58b6..53889fe 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -28,11 +28,10 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.util.ProtoUtil; import java.io.IOException; @@ -40,6 +39,7 @@ import java.net.InetSocketAddress; import java.sql.ResultSet; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.tajo.ipc.ClientProtos.*; import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; @@ -102,7 +102,7 @@ public class QueryClientImpl implements QueryClient { public void closeNonForwardQuery(QueryId queryId) { NettyClientBase tmClient = null; try { - tmClient = connection.getTajoMasterConnection(false); + tmClient = connection.getTajoMasterConnection(); TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub(); connection.checkSessionAndGet(tmClient); @@ -153,50 +153,37 @@ public class QueryClientImpl implements QueryClient { @Override public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); - return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + final QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); - - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - - SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - } - return response; - } - }.withRetries(); + SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); + if (response.getResultCode() == ResultCode.OK) { + connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + } + return response; } @Override public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(json); - builder.setIsJson(true); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + final QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(json); + builder.setIsJson(true); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.submitQuery(null, builder.build()); - } - }.withRetries(); + return tajoMasterService.submitQuery(null, builder.build()); } @Override @@ -308,7 +295,7 @@ public class QueryClientImpl implements QueryClient { NettyClientBase tmClient = null; try { - tmClient = connection.getTajoMasterConnection(false); + tmClient = connection.getTajoMasterConnection(); connection.checkSessionAndGet(tmClient); builder.setSessionId(connection.sessionId); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); @@ -348,7 +335,7 @@ public class QueryClientImpl implements QueryClient { try { - tmClient = connection.getTajoMasterConnection(false); + tmClient = connection.getTajoMasterConnection(); connection.checkSessionAndGet(tmClient); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); @@ -369,42 +356,26 @@ public class QueryClientImpl implements QueryClient { throws ServiceException { try { - final ServerCallable<ClientProtos.SerializedResultSet> callable = - new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - builder.setFetchRowNum(fetchRowNum); - try { - GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - abort(); - throw new ServiceException(response.getErrorMessage()); - } - - return response.getResultSet(); - } catch (ServiceException e) { - abort(); - throw e; - } catch (Throwable t) { - throw new ServiceException(t.getMessage(), t); - } - } - }; - - ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + builder.setFetchRowNum(fetchRowNum); + + GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); + if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getErrorMessage()); + } + + ClientProtos.SerializedResultSet resultSet = response.getResultSet(); return new TajoMemoryResultSet(queryId, - new Schema(serializedResultSet.getSchema()), - serializedResultSet.getSerializedTuplesList(), - serializedResultSet.getSerializedTuplesCount(), + new Schema(resultSet.getSchema()), + resultSet.getSerializedTuplesList(), + resultSet.getSerializedTuplesCount(), getClientSideSessionVars()); } catch (ServiceException e) { throw e; @@ -416,119 +387,86 @@ public class QueryClientImpl implements QueryClient { @Override public boolean updateQuery(final String sql) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - public Boolean call(NettyClientBase client) throws ServiceException { + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - - if (response.getResultCode() == ClientProtos.ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return true; - } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); - } - return false; - } + if (response.getResultCode() == ClientProtos.ResultCode.OK) { + connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return true; + } else { + if (response.hasErrorMessage()) { + LOG.error("ERROR: " + response.getErrorMessage()); } - }.withRetries(); + return false; + } } @Override public boolean updateQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(json); - builder.setIsJson(true); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { - return true; - } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); - } - return false; - } + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(json); + builder.setIsJson(true); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); + if (response.getResultCode() == ClientProtos.ResultCode.OK) { + return true; + } else { + if (response.hasErrorMessage()) { + LOG.error("ERROR: " + response.getErrorMessage()); } - }.withRetries(); + return false; + } } @Override public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException { - return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); - return res.getQueryListList(); - - } - }.withRetries(); + ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); + return res.getQueryListList(); } @Override public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException { - return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); - return res.getQueryListList(); - - } - }.withRetries(); + ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); + return res.getQueryListList(); } @Override public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException { - return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build()); - return res.getWorkerListList(); - } - - }.withRetries(); + ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build()); + return res.getWorkerListList(); } @Override @@ -540,7 +478,7 @@ public class QueryClientImpl implements QueryClient { NettyClientBase tmClient = null; try { /* send a kill to the TM */ - tmClient = connection.getTajoMasterConnection(false); + tmClient = connection.getTajoMasterConnection(); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); connection.checkSessionAndGet(tmClient); @@ -581,25 +519,20 @@ public class QueryClientImpl implements QueryClient { } public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { - return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - public QueryInfoProto call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return res.getQueryInfo(); - } else { - abort(); - throw new ServiceException(res.getErrorMessage()); - } - } - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); + if (res.getResultCode() == ResultCode.OK) { + return res.getQueryInfo(); + } else { + throw new ServiceException(res.getErrorMessage()); + } } public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { @@ -611,24 +544,31 @@ public class QueryClientImpl implements QueryClient { InetSocketAddress qmAddress = new InetSocketAddress( queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); - return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress, - QueryMasterClientProtocol.class, false) { - public QueryHistoryProto call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); + RpcClientManager manager = RpcClientManager.getInstance(); + NettyClientBase queryMasterClient; + try { + queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false, + manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false); + } catch (Exception e) { + throw new ServiceException(e); + } - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); + try { + connection.checkSessionAndGet(connection.getTajoMasterConnection()); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); - GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return res.getQueryHistory(); - } else { - abort(); - throw new ServiceException(res.getErrorMessage()); - } + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + + QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub(); + GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build()); + if (res.getResultCode() == ResultCode.OK) { + return res.getQueryHistory(); + } else { + throw new ServiceException(res.getErrorMessage()); } - }.withRetries(); + } finally { + queryMasterClient.close(); + } } }
