TAJO-1571: Merge TAJO-1497 and TAJO-1569 to 0.10.1. (jinho) Closes #544
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47008c58 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47008c58 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47008c58 Branch: refs/heads/branch-0.10.1 Commit: 47008c58ea866a9609f56405b09f968665c66d47 Parents: 5e1fa93 Author: Jinho Kim <[email protected]> Authored: Mon Apr 20 14:12:09 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Apr 20 14:12:09 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 84 ++++---- .../tajo/client/CatalogAdminClientImpl.java | 40 ++-- .../org/apache/tajo/client/QueryClientImpl.java | 50 ++--- .../apache/tajo/client/SessionConnection.java | 44 +++-- .../org/apache/tajo/master/QueryInProgress.java | 6 +- .../apache/tajo/master/TajoContainerProxy.java | 37 ++-- .../apache/tajo/querymaster/QueryMaster.java | 24 +-- .../tajo/worker/ExecutionBlockContext.java | 15 +- .../tajo/worker/TajoResourceAllocator.java | 19 +- .../tajo/worker/WorkerHeartbeatService.java | 16 +- .../ConnectivityCheckerRuleForTajoWorker.java | 26 +-- .../org/apache/tajo/rpc/AsyncRpcClient.java | 128 +++++------- .../org/apache/tajo/rpc/AsyncRpcServer.java | 82 ++++---- .../org/apache/tajo/rpc/BlockingRpcClient.java | 157 ++++++--------- .../org/apache/tajo/rpc/BlockingRpcServer.java | 85 ++++---- .../tajo/rpc/ConnectionCloseFutureListener.java | 35 ++++ .../org/apache/tajo/rpc/NettyClientBase.java | 190 +++++++++--------- .../tajo/rpc/ProtoChannelInitializer.java | 11 +- .../org/apache/tajo/rpc/RpcClientManager.java | 185 ++++++++++++++++++ .../org/apache/tajo/rpc/RpcConnectionPool.java | 194 ------------------- .../org/apache/tajo/rpc/ServerCallable.java | 36 ++-- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 90 +++++++-- .../org/apache/tajo/rpc/TestBlockingRpc.java | 149 ++++++++++---- .../apache/tajo/rpc/TestRpcClientManager.java | 97 ++++++++++ 25 files changed, 946 insertions(+), 856 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 806da48..e9b5886 100644 --- a/CHANGES +++ b/CHANGES @@ -83,6 +83,8 @@ Release 0.10.1 - unreleased TASKS + TAJO-1571: Merge TAJO-1497 and TAJO-1569 to 0.10.1. (jinho) + TAJO-1568: Apply UnpooledByteBufAllocator when a tajo.test.enabled is set to enable. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index d8350a3..bdb8c2c 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -30,7 +30,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; @@ -50,14 +50,14 @@ public abstract class AbstractCatalogClient implements CatalogService { private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class); protected ServiceTracker serviceTracker; - protected RpcConnectionPool pool; + protected RpcClientManager manager; protected InetSocketAddress catalogServerAddr; protected TajoConf conf; abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client); public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) { - this.pool = RpcConnectionPool.getPool(); + this.manager = RpcClientManager.getInstance(); this.catalogServerAddr = catalogServerAddr; this.serviceTracker = ServiceTrackerFactory.get(conf); this.conf = conf; @@ -79,7 +79,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -98,7 +98,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropTablespace(final String tablespaceName) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); @@ -113,7 +113,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existTablespace(final String tablespaceName) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue(); @@ -128,7 +128,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<String> getAllTablespaceNames() { try { - return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection<String> call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO); @@ -144,7 +144,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TablespaceProto> getAllTablespaces() { try { - return new ServerCallable<List<TablespaceProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<TablespaceProto> call(NettyClientBase client) throws Exception { @@ -162,7 +162,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public TablespaceProto getTablespace(final String tablespaceName) { try { - return new ServerCallable<TablespaceProto>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public TablespaceProto call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName)); @@ -177,7 +177,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.alterTablespace(null, alterTablespace).getValue(); @@ -192,7 +192,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -213,7 +213,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean dropDatabase(final String databaseName) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); @@ -228,7 +228,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Boolean existDatabase(final String databaseName) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue(); @@ -243,7 +243,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<String> getAllDatabaseNames() { try { - return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection<String> call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO); @@ -259,7 +259,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<DatabaseProto> getAllDatabases() { try { - return new ServerCallable<List<DatabaseProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<DatabaseProto> call(NettyClientBase client) throws Exception { @@ -277,7 +277,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final TableDesc getTableDesc(final String databaseName, final String tableName) { try { - return new ServerCallable<TableDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); builder.setDatabaseName(databaseName); @@ -302,7 +302,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TableDescriptorProto> getAllTables() { try { - return new ServerCallable<List<TableDescriptorProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<TableDescriptorProto> call(NettyClientBase client) throws Exception { @@ -320,7 +320,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TableOptionProto> getAllTableOptions() { try { - return new ServerCallable<List<TableOptionProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<TableOptionProto> call(NettyClientBase client) throws Exception { @@ -338,7 +338,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TableStatsProto> getAllTableStats() { try { - return new ServerCallable<List<TableStatsProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<TableStatsProto> call(NettyClientBase client) throws Exception { @@ -356,7 +356,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<ColumnProto> getAllColumns() { try { - return new ServerCallable<List<ColumnProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<ColumnProto> call(NettyClientBase client) throws Exception { @@ -374,7 +374,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public PartitionMethodDesc call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -382,7 +382,7 @@ public abstract class AbstractCatalogClient implements CatalogService { builder.setTableName(tableName); CatalogProtocolService.BlockingInterface stub = getStub(client); - return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); + return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); } }.withRetries(); } catch (ServiceException e) { @@ -394,7 +394,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -414,7 +414,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<TablePartitionProto> getAllPartitions() { try { - return new ServerCallable<List<TablePartitionProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<TablePartitionProto> call(NettyClientBase client) throws Exception { @@ -432,7 +432,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<String> getAllTableNames(final String databaseName) { try { - return new ServerCallable<Collection<String>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection<String> call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName)); @@ -448,7 +448,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final Collection<FunctionDesc> getFunctions() { try { - return new ServerCallable<Collection<FunctionDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException { List<FunctionDesc> list = new ArrayList<FunctionDesc>(); GetFunctionsResponse response; @@ -475,7 +475,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createTable(final TableDesc desc) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createTable(null, desc.getProto()).getValue(); @@ -494,7 +494,7 @@ public abstract class AbstractCatalogClient implements CatalogService { final String simpleName = splitted[1]; try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -518,7 +518,7 @@ public abstract class AbstractCatalogClient implements CatalogService { "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\""); } try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); @@ -543,7 +543,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createIndex(final IndexDesc index) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createIndex(null, index.getProto()).getValue(); @@ -558,7 +558,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean existIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); builder.setDatabaseName(databaseName); @@ -577,7 +577,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); @@ -597,7 +597,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final IndexDesc getIndexByName(final String databaseName, final String indexName) { try { - return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public IndexDesc call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); @@ -619,7 +619,7 @@ public abstract class AbstractCatalogClient implements CatalogService { final String tableName, final String columnName) { try { - return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public IndexDesc call(NettyClientBase client) throws ServiceException { GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); @@ -640,7 +640,7 @@ public abstract class AbstractCatalogClient implements CatalogService { public boolean dropIndex(final String databaseName, final String indexName) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { IndexNameProto.Builder builder = IndexNameProto.newBuilder(); @@ -660,7 +660,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public List<IndexProto> getAllIndexes() { try { - return new ServerCallable<List<IndexProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { @Override public List<IndexProto> call(NettyClientBase client) throws Exception { @@ -678,7 +678,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean createFunction(final FunctionDesc funcDesc) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.createFunction(null, funcDesc.getProto()).getValue(); @@ -693,7 +693,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean dropFunction(final String signature) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder(); builder.setSignature(signature); @@ -726,7 +726,7 @@ public abstract class AbstractCatalogClient implements CatalogService { FunctionDescProto descProto = null; try { - descProto = new ServerCallable<FunctionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public FunctionDescProto call(NettyClientBase client) throws ServiceException { try { CatalogProtocolService.BlockingInterface stub = getStub(client); @@ -776,7 +776,7 @@ public abstract class AbstractCatalogClient implements CatalogService { } try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.containFunction(null, builder.build()).getValue(); @@ -791,7 +791,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public final boolean alterTable(final AlterTableDesc desc) { try { - return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.alterTable(null, desc.getProto()).getValue(); @@ -806,7 +806,7 @@ public abstract class AbstractCatalogClient implements CatalogService { @Override public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) { try { - return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CatalogProtocolService.BlockingInterface stub = getStub(client); return stub.updateTableStats(null, updateTableStatsProto).getValue(); http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 6347ad1..9d0e427 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -48,8 +48,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean createDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -64,8 +64,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean existDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -80,8 +80,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean dropDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -96,8 +96,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List<String> getAllDatabaseNames() throws ServiceException { - return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List<String> call(NettyClientBase client) throws ServiceException { @@ -111,8 +111,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { public boolean existTable(final String tableName) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); @@ -133,8 +133,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { - return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { @@ -169,8 +169,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -190,8 +190,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { - return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List<String> call(NettyClientBase client) throws ServiceException { @@ -213,8 +213,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public TableDesc getTableDesc(final String tableName) throws ServiceException { - return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { @@ -238,8 +238,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { @Override public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { - return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.connPool, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 4444a31..99c58b6 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -19,7 +19,6 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.*; @@ -33,7 +32,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.util.ProtoUtil; @@ -115,8 +113,6 @@ public class QueryClientImpl implements QueryClient { tajoMaster.closeNonForwardQuery(null, builder.build()); } catch (Exception e) { LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connection.connPool.closeConnection(tmClient); } } @@ -158,8 +154,8 @@ public class QueryClientImpl implements QueryClient { @Override public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { - return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { @@ -184,8 +180,8 @@ public class QueryClientImpl implements QueryClient { @Override public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { @@ -321,8 +317,6 @@ public class QueryClientImpl implements QueryClient { } catch (Exception e) { throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); } return new QueryStatus(res); } @@ -367,8 +361,6 @@ public class QueryClientImpl implements QueryClient { } catch (Exception e) { throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); } } @@ -378,8 +370,8 @@ public class QueryClientImpl implements QueryClient { try { final ServerCallable<ClientProtos.SerializedResultSet> callable = - new ServerCallable<ClientProtos.SerializedResultSet>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { @@ -424,8 +416,8 @@ public class QueryClientImpl implements QueryClient { @Override public boolean updateQuery(final String sql) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -454,8 +446,8 @@ public class QueryClientImpl implements QueryClient { @Override public boolean updateQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { @@ -482,8 +474,8 @@ public class QueryClientImpl implements QueryClient { @Override public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException { - return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { @@ -502,8 +494,8 @@ public class QueryClientImpl implements QueryClient { @Override public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException { - return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { @@ -522,8 +514,8 @@ public class QueryClientImpl implements QueryClient { @Override public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException { - return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException { @@ -574,8 +566,6 @@ public class QueryClientImpl implements QueryClient { } catch(Exception e) { LOG.debug("Error when checking for application status", e); - } finally { - connection.connPool.releaseConnection(tmClient); } return status; } @@ -591,8 +581,8 @@ public class QueryClientImpl implements QueryClient { } public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { - return new ServerCallable<QueryInfoProto>(connection.connPool, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public QueryInfoProto call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); @@ -621,8 +611,8 @@ public class QueryClientImpl implements QueryClient { InetSocketAddress qmAddress = new InetSocketAddress( queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); - return new ServerCallable<QueryHistoryProto>(connection.connPool, qmAddress, - QueryMasterClientProtocol.class, false, true) { + return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress, + QueryMasterClientProtocol.class, false) { public QueryHistoryProto call(NettyClientBase client) throws ServiceException { connection.checkSessionAndGet(client); http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index d05d3b1..b0cc662 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -30,7 +30,7 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; @@ -55,7 +55,7 @@ public class SessionConnection implements Closeable { private final Log LOG = LogFactory.getLog(TajoClientImpl.class); - final RpcConnectionPool connPool; + final RpcClientManager manager; private final String baseDatabase; @@ -86,8 +86,8 @@ public class SessionConnection implements Closeable { this.properties = properties; - connPool = RpcConnectionPool.getPool(); - userInfo = UserRoleInfo.getCurrentUser(); + this.manager = RpcClientManager.getInstance(); + this.userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; this.serviceTracker = tracker; @@ -99,12 +99,12 @@ public class SessionConnection implements Closeable { public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); + return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); } public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - return connPool.getConnection(addr, protocolClass, asyncMode); + return manager.getClient(addr, protocolClass, asyncMode); } protected KeyValueSet getProperties() { @@ -127,8 +127,8 @@ public class SessionConnection implements Closeable { public boolean isConnected() { if(!closed.get()){ try { - return connPool.getConnection(serviceTracker.getClientServiceAddress(), - TajoMasterClientProtocol.class, false).isActive(); + return manager.getClient(serviceTracker.getClientServiceAddress(), + TajoMasterClientProtocol.class, false).isConnected(); } catch (Throwable e) { return false; } @@ -141,7 +141,7 @@ public class SessionConnection implements Closeable { } public String getCurrentDatabase() throws ServiceException { - return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public String call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -153,8 +153,8 @@ public class SessionConnection implements Closeable { } public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), + TajoMasterClientProtocol.class, false) { public Map<String, String> call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -179,7 +179,7 @@ public class SessionConnection implements Closeable { } public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Map<String, String> call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -209,7 +209,7 @@ public class SessionConnection implements Closeable { } public String getSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public String call(NettyClientBase client) throws ServiceException { @@ -229,7 +229,7 @@ public class SessionConnection implements Closeable { } public Boolean existSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -247,8 +247,8 @@ public class SessionConnection implements Closeable { } public Map<String, String> getAllSessionVariables() throws ServiceException { - return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, - false, true) { + return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, + false) { public Map<String, String> call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -260,7 +260,7 @@ public class SessionConnection implements Closeable { } public Boolean selectDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -278,13 +278,15 @@ public class SessionConnection implements Closeable { } // remove session + NettyClientBase client = null; try { - - NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); tajoMaster.removeSession(null, sessionId); - } catch (Throwable e) { + // ignore + } finally { + RpcClientManager.cleanup(client); } } @@ -321,7 +323,7 @@ public class SessionConnection implements Closeable { } public boolean reconnect() throws Exception { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { public Boolean call(NettyClientBase client) throws ServiceException { CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 668a770..d2286cf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -33,7 +33,7 @@ import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; @@ -112,7 +112,7 @@ public class QueryInProgress { masterContext.getResourceManager().releaseQueryMaster(queryId); if(queryMasterRpc != null) { - RpcConnectionPool.getPool().closeConnection(queryMasterRpc); + RpcClientManager.cleanup(queryMasterRpc); } try { @@ -157,7 +157,7 @@ public class QueryInProgress { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); queryMasterRpc = - RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true); + RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 6128df3..9d54ce1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -36,7 +36,7 @@ import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.worker.TajoWorker; @@ -83,14 +83,12 @@ public class TajoContainerProxy extends ContainerProxy { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); } catch (Throwable e) { /* Worker RPC failure */ context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -99,7 +97,7 @@ public class TajoContainerProxy extends ContainerProxy { try { InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); PlanProto.ShuffleType shuffleType = @@ -120,8 +118,6 @@ public class TajoContainerProxy extends ContainerProxy { tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -173,23 +169,18 @@ public class TajoContainerProxy extends ContainerProxy { containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId)); } - RpcConnectionPool connPool = RpcConnectionPool.getPool(); + RpcClientManager manager = RpcClientManager.getInstance(); NettyClientBase tmClient = null; - try { - ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - masterClientService.releaseWorkerResource(null, - QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .addAllContainerIds(containerIdProtos) - .build(), - NullCallback.get()); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } + ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); + tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + + QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + masterClientService.releaseWorkerResource(null, + QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() + .setExecutionBlockId(executionBlockId.getProto()) + .addAllContainerIds(containerIdProtos) + .build(), + NullCallback.get()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index bf23133..2b22955 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -41,7 +41,7 @@ import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.NetUtils; @@ -88,7 +88,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private TajoWorker.WorkerContext workerContext; - private RpcConnectionPool connPool; + private RpcClientManager manager; private ExecutorService eventExecutor; @@ -104,7 +104,7 @@ public class QueryMaster extends CompositeService implements EventHandler { } try { this.systemConf = (TajoConf)conf; - this.connPool = RpcConnectionPool.getPool(); + this.manager = RpcClientManager.getInstance(); querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); queryMasterContext = new QueryMasterContext(systemConf); @@ -190,7 +190,7 @@ public class QueryMaster extends CompositeService implements EventHandler { for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); @@ -200,8 +200,6 @@ public class QueryMaster extends CompositeService implements EventHandler { continue; } catch (Exception e) { continue; - } finally { - connPool.releaseConnection(rpc); } } } @@ -214,15 +212,13 @@ public class QueryMaster extends CompositeService implements EventHandler { for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(rpc); } } } @@ -237,7 +233,7 @@ public class QueryMaster extends CompositeService implements EventHandler { // update master address in worker context. ServiceTracker serviceTracker = workerContext.getServiceTracker(); - rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterService = rpc.getStub(); CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>(); @@ -248,8 +244,6 @@ public class QueryMaster extends CompositeService implements EventHandler { return workerResourcesRequest.getWorkerResourcesList(); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(rpc); } return new ArrayList<WorkerResourceProto>(); } @@ -345,7 +339,7 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase tmClient = null; try { - tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(), + tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); @@ -355,8 +349,6 @@ public class QueryMaster extends CompositeService implements EventHandler { //When tajo do stop cluster, tajo master maybe throw closed connection exception LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); } try { @@ -451,7 +443,7 @@ public class QueryMaster extends CompositeService implements EventHandler { try { ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), + tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index c2b63eb..9c3fbe4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -38,7 +38,7 @@ import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; @@ -76,7 +76,7 @@ public class ExecutionBlockContext { private ExecutionBlockSharedResource resource; private TajoQueryEngine queryEngine; - private RpcConnectionPool connPool; + private RpcClientManager connManager; private InetSocketAddress qmMasterAddr; private WorkerConnectionInfo queryMaster; private TajoConf systemConf; @@ -100,7 +100,7 @@ public class ExecutionBlockContext { PlanProto.ShuffleType shuffleType) throws Throwable { this.manager = manager; this.executionBlockId = executionBlockId; - this.connPool = RpcConnectionPool.getPool(); + this.connManager = RpcClientManager.getInstance(); this.queryMaster = queryMaster; this.systemConf = conf; this.reporter = new Reporter(); @@ -149,13 +149,8 @@ public class ExecutionBlockContext { public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - NettyClientBase clientBase = null; - try { - clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); - return clientBase.getStub(); - } finally { - connPool.releaseConnection(clientBase); - } + NettyClientBase clientBase = connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true); + return clientBase.getStub(); } public void stop(){ http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 47a9fda..49cb1e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -50,16 +50,14 @@ import org.apache.tajo.querymaster.StageState; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.ApplicationIdUtils; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public class TajoResourceAllocator extends AbstractResourceAllocator { @@ -193,14 +191,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort()); - tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get()); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } } @@ -278,17 +274,16 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) .build(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(); + NettyClientBase tmClient = null; try { ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); - tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + tmClient = RpcClientManager.getInstance(). + getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Throwable e) { LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); } WorkerResourceAllocationResponse response = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 5493b37..ad67f94 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -18,7 +18,6 @@ package org.apache.tajo.worker; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -26,14 +25,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DiskDeviceInfo; @@ -57,7 +55,7 @@ public class WorkerHeartbeatService extends AbstractService { private final TajoWorker.WorkerContext context; private TajoConf systemConf; - private RpcConnectionPool connectionPool; + private RpcClientManager connectionManager; private WorkerHeartbeatThread thread; private static final float HDFS_DATANODE_STORAGE_SIZE; @@ -72,10 +70,12 @@ public class WorkerHeartbeatService extends AbstractService { @Override public void serviceInit(Configuration conf) throws Exception { - Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance."); + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } this.systemConf = (TajoConf) conf; - connectionPool = RpcConnectionPool.getPool(); + this.connectionManager = RpcClientManager.getInstance(); super.serviceInit(conf); } @@ -184,7 +184,7 @@ public class WorkerHeartbeatService extends AbstractService { CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>(); ServiceTracker serviceTracker = context.getServiceTracker(); - rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(), + rmClient = connectionManager.getClient(serviceTracker.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); @@ -207,8 +207,6 @@ public class WorkerHeartbeatService extends AbstractService { LOG.warn("Heartbeat response is being delayed.", te); } catch (Exception e) { LOG.error(e.getMessage(), e); - } finally { - connectionPool.releaseConnection(rmClient); } try { http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 4b76c73..f94bd78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -19,20 +19,15 @@ package org.apache.tajo.worker.rule; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; -import java.net.InetSocketAddress; - /** * With this rule, Tajo worker will check the connectivity to tajo master server. */ @@ -42,20 +37,11 @@ import java.net.InetSocketAddress; public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(); - NettyClientBase masterClient = null; - InetSocketAddress masterAddress = null; - - try { - ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); - masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); - masterClient.getStub(); - } finally { - if (masterClient != null) { - pool.releaseConnection(masterClient); - } - } - + RpcClientManager manager = RpcClientManager.getInstance(); + + ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); + NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + masterClient.getStub(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 5845229..e6dbf2c 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -20,76 +20,53 @@ package org.apache.tajo.rpc; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; - import io.netty.channel.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.GenericFutureListener; - import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; +import java.util.concurrent.ConcurrentMap; public class AsyncRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - private final ChannelInitializer<Channel> initializer; - private final ProxyRpcChannel rpcChannel; - - private final AtomicInteger sequence = new AtomicInteger(0); - private final Map<Integer, ResponseCallback> requests = + private final ConcurrentMap<Integer, ResponseCallback> requests = new ConcurrentHashMap<Integer, ResponseCallback>(); - private final Class<?> protocol; private final Method stubMethod; - - private RpcConnectionKey key; + private final ProxyRpcChannel rpcChannel; + private final ClientChannelInboundHandler inboundHandler; /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. */ - AsyncRpcClient(final Class<?> protocol, - final InetSocketAddress addr, int retries) - throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { - - this.protocol = protocol; - String serviceClassName = protocol.getName() + "$" - + protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - stubMethod = serviceClass.getMethod("newStub", RpcChannel.class); - - initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), - RpcResponse.getDefaultInstance()); - super.init(addr, initializer, retries); - rpcChannel = new ProxyRpcChannel(); - this.key = new RpcConnectionKey(addr, protocol, true); + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws ClassNotFoundException, NoSuchMethodException { + this(rpcConnectionKey, retries, 0); } - @Override - public RpcConnectionKey getKey() { - return key; + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); + rpcChannel = new ProxyRpcChannel(); + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); } @Override public <T> T getStub() { - try { - return (T) stubMethod.invoke(null, rpcChannel); - } catch (Exception e) { - throw new RemoteException(e.getMessage(), e); - } - } - - public RpcChannel getRpcChannel() { - return this.rpcChannel; + return getStub(stubMethod, rpcChannel); } protected void sendExceptions(String message) { @@ -113,17 +90,6 @@ public class AsyncRpcClient extends NettyClientBase { } private class ProxyRpcChannel implements RpcChannel { - private final ClientChannelInboundHandler handler; - - public ProxyRpcChannel() { - this.handler = getChannel().pipeline() - .get(ClientChannelInboundHandler.class); - - if (handler == null) { - throw new IllegalArgumentException("Channel does not have " + - "proper handler"); - } - } public void callMethod(final MethodDescriptor method, final RpcController controller, @@ -135,7 +101,7 @@ public class AsyncRpcClient extends NettyClientBase { Message rpcRequest = buildRequest(nextSeqId, method, param); - handler.registerCallback(nextSeqId, + inboundHandler.registerCallback(nextSeqId, new ResponseCallback(controller, responseType, done)); ChannelPromise channelPromise = getChannel().newPromise(); @@ -144,7 +110,7 @@ public class AsyncRpcClient extends NettyClientBase { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - handler.exceptionCaught(null, new ServiceException(future.cause())); + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); } } }); @@ -160,7 +126,7 @@ public class AsyncRpcClient extends NettyClientBase { .setMethodName(method.getName()); if (param != null) { - requestBuilder.setRequestMessage(param.toByteString()); + requestBuilder.setRequestMessage(param.toByteString()); } return requestBuilder.build(); @@ -215,52 +181,56 @@ public class AsyncRpcClient extends NettyClientBase { } @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> { - synchronized void registerCallback(int seqId, ResponseCallback callback) { + void registerCallback(int seqId, ResponseCallback callback) { - if (requests.containsKey(seqId)) { + if (requests.putIfAbsent(seqId, callback) != null) { throw new RemoteException( getErrorMessage("Duplicate Sequence Id "+ seqId)); } - - requests.put(seqId, callback); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcResponse) { - try { - RpcResponse response = (RpcResponse) msg; - ResponseCallback callback = requests.remove(response.getId()); + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + ResponseCallback callback = requests.remove(response.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - callback.run(response); - } - } finally { - ReferenceCountUtil.release(msg); - } + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + callback.run(response); } } @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); sendExceptions(cause.getMessage()); - + if(LOG.isDebugEnabled()) { LOG.error(cause.getMessage(), cause); } else { LOG.error("RPC Exception:" + cause.getMessage()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + /* If all requests is done and event is triggered, channel will be closed. */ + if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { + ctx.close(); + LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index 3b5a747..e4109fe 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -18,17 +18,17 @@ package org.apache.tajo.rpc; -import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; - +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -57,7 +57,7 @@ public class AsyncRpcServer extends NettyServerBase { } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -78,55 +78,46 @@ public class AsyncRpcServer extends NettyServerBase { } @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcRequest) { - try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception { - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - final RpcController controller = new NettyRpcController(); + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } - RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } - public void run(Message returnValue) { + final RpcController controller = new NettyRpcController(); - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } + public void run(Message returnValue) { - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - ctx.writeAndFlush(builder.build()); - } - }; + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } - service.callMethod(methodDescriptor, controller, paramProto, callback); + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } - } finally { - ReferenceCountUtil.release(msg); + ctx.writeAndFlush(builder.build()); } - } + }; + + service.callMethod(methodDescriptor, controller, paramProto, callback); } @Override @@ -138,11 +129,6 @@ public class AsyncRpcServer extends NettyServerBase { } else { LOG.error(cause.getMessage()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } } - } } \ No newline at end of file
