This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch client_manager in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0953f6a72ad54349fd3aced1b4e492c18d31743d Author: LebronAl <[email protected]> AuthorDate: Tue Apr 19 17:42:28 2022 +0800 make interface more generic --- .../iotdb/confignode/client/ClientPoolFactory.java | 8 +-- .../iotdb/consensus/ratis/RatisClientFactory.java | 67 ++++++++++++++++++++++ .../iotdb/consensus/ratis/RatisConsensus.java | 56 +++++++++++++----- node-commons/pom.xml | 4 ++ .../commons/client/AsyncBaseClientFactory.java | 4 +- .../iotdb/commons/client/BaseClientFactory.java | 12 ++-- .../apache/iotdb/commons/client/ClientManager.java | 19 +++--- .../commons/client/ClientManagerProperty.java | 26 ++++----- .../iotdb/commons/client/IClientManager.java | 11 ++-- .../iotdb/commons/client/IClientPoolFactory.java | 7 +-- .../async/AsyncDataNodeInternalServiceClient.java | 6 +- .../sync/SyncDataNodeInternalServiceClient.java | 6 +- .../apache/iotdb/db/client/ClientPoolFactory.java | 16 +++--- .../async/AsyncConfigNodeIServiceClient.java | 6 +- .../async/AsyncDataNodeDataBlockServiceClient.java | 6 +- .../client/sync/SyncConfigNodeIServiceClient.java | 6 +- .../sync/SyncDataNodeDataBlockServiceClient.java | 6 +- 17 files changed, 177 insertions(+), 89 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java index 73bb856065..d8731cb952 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java @@ -36,10 +36,10 @@ public class ClientPoolFactory { private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf(); public static class SyncDataNodeInternalServiceClientPoolFactory - implements IClientPoolFactory<SyncDataNodeInternalServiceClient> { + implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> { @Override public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool( - ClientManager<SyncDataNodeInternalServiceClient> manager) { + ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) { ClientManagerProperty<SyncDataNodeInternalServiceClient> property = new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) @@ -51,10 +51,10 @@ public class ClientPoolFactory { } public static class AsyncDataNodeInternalServiceClientPoolFactory - implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> { + implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> { @Override public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool( - ClientManager<AsyncDataNodeInternalServiceClient> manager) { + ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) { ClientManagerProperty<AsyncDataNodeInternalServiceClient> property = new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java new file mode 100644 index 0000000000..27d5f7a640 --- /dev/null +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.ratis; + +import org.apache.iotdb.commons.client.BaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; + +public class RatisClientFactory extends BaseClientFactory<RaftGroup, RaftClient> { + + private final RaftProperties raftProperties; + private final RaftClientRpc clientRpc; + + public RatisClientFactory( + ClientManager<RaftGroup, RaftClient> clientManager, + ClientManagerProperty<RaftClient> clientManagerProperty, + RaftProperties raftProperties, + RaftClientRpc clientRpc) { + super(clientManager, clientManagerProperty); + this.raftProperties = raftProperties; + this.clientRpc = clientRpc; + } + + @Override + public void destroyObject(RaftGroup key, PooledObject<RaftClient> pooledObject) throws Exception { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<RaftClient> makeObject(RaftGroup group) throws Exception { + return new DefaultPooledObject<>( + RaftClient.newBuilder() + .setProperties(raftProperties) + .setRaftGroup(group) + .setClientRpc(clientRpc) + .build()); + } + + @Override + public boolean validateObject(RaftGroup key, PooledObject<RaftClient> pooledObject) { + return true; + } +} diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index a87cafd2fe..cbf3f713f6 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -21,6 +21,9 @@ package org.apache.iotdb.consensus.ratis; import org.apache.iotdb.common.rpc.thrift.EndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.commons.client.IClientPoolFactory; import org.apache.iotdb.commons.cluster.Endpoint; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.IConsensus; @@ -37,7 +40,10 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.consensus.exception.RatisRequestFailedException; import org.apache.iotdb.consensus.statemachine.IStateMachine; +import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; @@ -74,41 +80,40 @@ import java.util.stream.Collectors; * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details. */ class RatisConsensus implements IConsensus { + + private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class); + // the unique net communication endpoint private final RaftPeer myself; - private final RaftServer server; - private final Map<RaftGroupId, RaftClient> clientMap; - private final Map<RaftGroupId, RaftGroup> raftGroupMap; + private final RaftProperties properties = new RaftProperties();; + private final RaftClientRpc clientRpc; + + private final ClientManager<RaftGroup, RaftClient> clientManager = + new ClientManager<>(new RatisClientPoolFactory()); + private final Map<RaftGroupId, RaftGroup> raftGroupMap = new ConcurrentHashMap<>(); + private final Map<RaftGroupId, RaftClient> clientMap = new ConcurrentHashMap<>(); - private ClientId localFakeId; - private AtomicLong localFakeCallId; + private final ClientId localFakeId = ClientId.randomId(); + private final AtomicLong localFakeCallId = new AtomicLong(0); private static final int DEFAULT_PRIORITY = 0; private static final int LEADER_PRIORITY = 1; - private Logger logger = LoggerFactory.getLogger(RatisConsensus.class); - public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry) throws IOException { - this.clientMap = new ConcurrentHashMap<>(); - this.raftGroupMap = new ConcurrentHashMap<>(); - this.localFakeId = ClientId.randomId(); - this.localFakeCallId = new AtomicLong(0); - // create a RaftPeer as endpoint of comm String address = Utils.IPAddress(endpoint); myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY); - RaftProperties properties = new RaftProperties(); - RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir)); // set the port which server listen to in RaftProperty object final int port = NetUtils.createSocketAddr(address).getPort(); GrpcConfigKeys.Server.setPort(properties, port); + clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties); server = RaftServer.newBuilder() @@ -128,6 +133,7 @@ class RatisConsensus implements IConsensus { @Override public void stop() throws IOException { + clientManager.close(); server.close(); } @@ -443,7 +449,7 @@ class RatisConsensus implements IConsensus { isLeader = server.getDivision(raftGroupId).getInfo().isLeader(); } catch (IOException exception) { // if the query fails, simply return not leader - logger.warn("isLeader request failed with exception: ", exception); + logger.info("isLeader request failed with exception: ", exception); isLeader = false; } return isLeader; @@ -516,6 +522,15 @@ class RatisConsensus implements IConsensus { return client; } + private RaftClient getRaftClient(RaftGroup group) { + try { + return clientManager.borrowClient(group); + } catch (IOException e) { + logger.error(String.format("Borrow client from pool for group %s failed.", group), e); + return null; + } + } + private void closeRaftClient(RaftGroupId groupId) { RaftClient client = clientMap.get(groupId); if (client != null) { @@ -563,4 +578,15 @@ class RatisConsensus implements IConsensus { } return reply; } + + private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RaftClient> { + @Override + public KeyedObjectPool<RaftGroup, RaftClient> createClientPool( + ClientManager<RaftGroup, RaftClient> manager) { + ClientManagerProperty<RaftClient> property = + new ClientManagerProperty.Builder<RaftClient>().build(); + return new GenericKeyedObjectPool<>( + new RatisClientFactory(manager, property, properties, clientRpc), property.getConfig()); + } + } } diff --git a/node-commons/pom.xml b/node-commons/pom.xml index f4e3587e38..a88c22bbf3 100644 --- a/node-commons/pom.xml +++ b/node-commons/pom.xml @@ -53,6 +53,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> <build> <plugins> diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java index b710f5482c..b822b0043b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java @@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> { +public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> { private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class); protected TAsyncClientManager[] tManagers; protected AtomicInteger clientCnt = new AtomicInteger(); protected AsyncBaseClientFactory( - ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) { + ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) { super(clientManager, clientManagerProperty); tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()]; for (int i = 0; i < tManagers.length; i++) { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java index f628692152..1f85805691 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java @@ -22,20 +22,20 @@ package org.apache.iotdb.commons.client; import org.apache.commons.pool2.KeyedPooledObjectFactory; import org.apache.commons.pool2.PooledObject; -public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> { +public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> { - protected ClientManager<T> clientManager; - protected ClientManagerProperty<T> clientManagerProperty; + protected ClientManager<K, V> clientManager; + protected ClientManagerProperty<V> clientManagerProperty; public BaseClientFactory( - ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) { + ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) { this.clientManager = clientManager; this.clientManagerProperty = clientManagerProperty; } @Override - public void activateObject(K node, PooledObject<T> pooledObject) {} + public void activateObject(K node, PooledObject<V> pooledObject) {} @Override - public void passivateObject(K node, PooledObject<T> pooledObject) {} + public void passivateObject(K node, PooledObject<V> pooledObject) {} } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 0513af1a5d..de3b8947cb 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -19,31 +19,28 @@ package org.apache.iotdb.commons.client; -import org.apache.iotdb.common.rpc.thrift.EndPoint; - import org.apache.commons.pool2.KeyedObjectPool; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Optional; -public class ClientManager<E> implements IClientManager<E> { +public class ClientManager<K, V> implements IClientManager<K, V> { private static final Logger logger = LoggerFactory.getLogger(ClientManager.class); - private final KeyedObjectPool<EndPoint, E> pool; + private final KeyedObjectPool<K, V> pool; - public ClientManager(IClientPoolFactory<E> factory) { + public ClientManager(IClientPoolFactory<K, V> factory) { pool = factory.createClientPool(this); } @Override - public Optional<E> borrowClient(EndPoint node) throws IOException { - Optional<E> client = Optional.empty(); + public V borrowClient(K node) throws IOException { + V client = null; try { - client = Optional.of(pool.borrowObject(node)); + client = pool.borrowObject(node); } catch (TTransportException e) { // external needs to check transport related exception throw new IOException(e); @@ -58,7 +55,7 @@ public class ClientManager<E> implements IClientManager<E> { } @Override - public void returnClient(EndPoint node, E client) { + public void returnClient(K node, V client) { if (client != null && node != null) { try { pool.returnObject(node, client); @@ -70,7 +67,7 @@ public class ClientManager<E> implements IClientManager<E> { } @Override - public void clear(EndPoint node) { + public void clear(K node) { if (node != null) { try { pool.clear(node); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java index b35f163257..5fc95174db 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java @@ -26,9 +26,9 @@ import org.apache.thrift.protocol.TProtocolFactory; import java.time.Duration; -public class ClientManagerProperty<T> { +public class ClientManagerProperty<V> { - private final GenericKeyedObjectPoolConfig<T> config; + private final GenericKeyedObjectPoolConfig<V> config; // thrift client config private final TProtocolFactory protocolFactory; @@ -36,7 +36,7 @@ public class ClientManagerProperty<T> { private int selectorNumOfAsyncClientPool = 1; public ClientManagerProperty( - GenericKeyedObjectPoolConfig<T> config, + GenericKeyedObjectPoolConfig<V> config, TProtocolFactory protocolFactory, int connectionTimeoutMs, int selectorNumOfAsyncClientPool) { @@ -46,7 +46,7 @@ public class ClientManagerProperty<T> { this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool; } - public GenericKeyedObjectPoolConfig<T> getConfig() { + public GenericKeyedObjectPoolConfig<V> getConfig() { return config; } @@ -62,7 +62,7 @@ public class ClientManagerProperty<T> { return selectorNumOfAsyncClientPool; } - public static class Builder<T> { + public static class Builder<V> { // pool config private long waitClientTimeoutMS = 20_000; @@ -74,38 +74,38 @@ public class ClientManagerProperty<T> { private int connectionTimeoutMs = 20_000; private int selectorNumOfAsyncClientPool = 1; - public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) { + public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) { this.waitClientTimeoutMS = waitClientTimeoutMS; return this; } - public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) { + public Builder<V> setMaxConnectionForEachNode(int maxConnectionForEachNode) { this.maxConnectionForEachNode = maxConnectionForEachNode; return this; } - public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) { + public Builder<V> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) { this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode; return this; } - public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) { + public Builder<V> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) { this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled; return this; } - public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) { + public Builder<V> setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; return this; } - public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) { + public Builder<V> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) { this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool; return this; } - public ClientManagerProperty<T> build() { - GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>(); + public ClientManagerProperty<V> build() { + GenericKeyedObjectPoolConfig<V> poolConfig = new GenericKeyedObjectPoolConfig<>(); poolConfig.setMaxTotalPerKey(maxConnectionForEachNode); poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode); poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS)); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 00ee3d45a5..529c78e8b1 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -19,18 +19,15 @@ package org.apache.iotdb.commons.client; -import org.apache.iotdb.common.rpc.thrift.EndPoint; - import java.io.IOException; -import java.util.Optional; -public interface IClientManager<E> { +public interface IClientManager<K, V> { - Optional<E> borrowClient(EndPoint node) throws IOException; + V borrowClient(K node) throws IOException; - void returnClient(EndPoint node, E client); + void returnClient(K node, V client); - void clear(EndPoint node); + void clear(K node); void close(); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java index 80341bf5f5..aa7980decb 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java @@ -19,12 +19,9 @@ package org.apache.iotdb.commons.client; -import org.apache.iotdb.common.rpc.thrift.EndPoint; - import org.apache.commons.pool2.KeyedObjectPool; -public interface IClientPoolFactory<E> { +public interface IClientPoolFactory<K, V> { - KeyedObjectPool<EndPoint, E> createClientPool( - ClientManager<E> manager); + KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index 6119b799a8..8547d13890 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -37,14 +37,14 @@ import java.io.IOException; public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient { private final EndPoint endpoint; - private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager; + private final ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager; public AsyncDataNodeInternalServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, TAsyncClientManager tClientManager, - ClientManager<AsyncDataNodeInternalServiceClient> clientManager) + ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager) throws IOException { super( protocolFactory, @@ -92,7 +92,7 @@ public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncCli extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> { public Factory( - ClientManager<AsyncDataNodeInternalServiceClient> clientManager, + ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager, ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java index a40104bbbc..c273809554 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java @@ -40,13 +40,13 @@ import java.net.SocketException; public class SyncDataNodeInternalServiceClient extends InternalService.Client { private final EndPoint endpoint; - private final ClientManager<SyncDataNodeInternalServiceClient> clientManager; + private final ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager; public SyncDataNodeInternalServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, - ClientManager<SyncDataNodeInternalServiceClient> clientManager) + ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager) throws TTransportException { super( protocolFactory.getProtocol( @@ -84,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client { extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> { public Factory( - ClientManager<SyncDataNodeInternalServiceClient> clientManager, + ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager, ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); } diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java index 63f6c7a45c..e24e5d4e94 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java @@ -38,10 +38,10 @@ public class ClientPoolFactory { private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); public static class SyncConfigNodeIServiceClientPoolFactory - implements IClientPoolFactory<SyncConfigNodeIServiceClient> { + implements IClientPoolFactory<EndPoint, SyncConfigNodeIServiceClient> { @Override public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool( - ClientManager<SyncConfigNodeIServiceClient> manager) { + ClientManager<EndPoint, SyncConfigNodeIServiceClient> manager) { ClientManagerProperty<SyncConfigNodeIServiceClient> property = new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) @@ -53,10 +53,10 @@ public class ClientPoolFactory { } public static class AsyncConfigNodeIServiceClientPoolFactory - implements IClientPoolFactory<AsyncConfigNodeIServiceClient> { + implements IClientPoolFactory<EndPoint, AsyncConfigNodeIServiceClient> { @Override public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool( - ClientManager<AsyncConfigNodeIServiceClient> manager) { + ClientManager<EndPoint, AsyncConfigNodeIServiceClient> manager) { ClientManagerProperty<AsyncConfigNodeIServiceClient> property = new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) @@ -69,10 +69,10 @@ public class ClientPoolFactory { } public static class SyncDataNodeInternalServiceClientPoolFactory - implements IClientPoolFactory<SyncDataNodeInternalServiceClient> { + implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> { @Override public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool( - ClientManager<SyncDataNodeInternalServiceClient> manager) { + ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) { ClientManagerProperty<SyncDataNodeInternalServiceClient> property = new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) @@ -84,10 +84,10 @@ public class ClientPoolFactory { } public static class AsyncDataNodeInternalServiceClientPoolFactory - implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> { + implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> { @Override public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool( - ClientManager<AsyncDataNodeInternalServiceClient> manager) { + ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) { ClientManagerProperty<AsyncDataNodeInternalServiceClient> property = new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>() .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java index 00ac180429..48bb173b83 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java @@ -36,14 +36,14 @@ import java.io.IOException; public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient { private final EndPoint endpoint; - private final ClientManager<AsyncConfigNodeIServiceClient> clientManager; + private final ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager; public AsyncConfigNodeIServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, TAsyncClientManager tClientManager, - ClientManager<AsyncConfigNodeIServiceClient> clientManager) + ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager) throws IOException { super( protocolFactory, @@ -91,7 +91,7 @@ public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient { extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> { public Factory( - ClientManager<AsyncConfigNodeIServiceClient> clientManager, + ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager, ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); } diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java index 1701083464..5b4e81e775 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java @@ -36,14 +36,14 @@ import java.io.IOException; public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient { private final EndPoint endpoint; - private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager; + private final ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager; public AsyncDataNodeDataBlockServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, TAsyncClientManager tClientManager, - ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager) + ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager) throws IOException { super( protocolFactory, @@ -91,7 +91,7 @@ public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncC extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> { public Factory( - ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager, + ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager, ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); } diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java index 60e01bfc0c..ab9121f8de 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java @@ -39,13 +39,13 @@ import java.net.SocketException; public class SyncConfigNodeIServiceClient extends ConfigIService.Client { private final EndPoint endpoint; - private final ClientManager<SyncConfigNodeIServiceClient> clientManager; + private final ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager; public SyncConfigNodeIServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, - ClientManager<SyncConfigNodeIServiceClient> clientManager) + ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager) throws TTransportException { super( protocolFactory.getProtocol( @@ -82,7 +82,7 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client { public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> { public Factory( - ClientManager<SyncConfigNodeIServiceClient> clientManager, + ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager, ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); } diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java index f6188709a9..7f598e6159 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java @@ -39,13 +39,13 @@ import java.net.SocketException; public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client { private final EndPoint endpoint; - private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager; + private final ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager; public SyncDataNodeDataBlockServiceClient( TProtocolFactory protocolFactory, int connectionTimeout, EndPoint endpoint, - ClientManager<SyncDataNodeDataBlockServiceClient> clientManager) + ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager) throws TTransportException { super( protocolFactory.getProtocol( @@ -83,7 +83,7 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> { public Factory( - ClientManager<SyncDataNodeDataBlockServiceClient> clientManager, + ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager, ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) { super(clientManager, clientManagerProperty); }
