This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch client_manager in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 85f4bba214677dfa61bdec81a1e666a0531d7c1e Author: LebronAl <[email protected]> AuthorDate: Tue Apr 19 14:49:45 2022 +0800 init --- .../iotdb/confignode/client/ClientPoolFactory.java | 68 +++++++++++ .../iotdb/confignode/conf/ConfigNodeConf.java | 27 ++++- node-commons/pom.xml | 10 +- .../commons/client/AsyncBaseClientFactory.java | 47 ++++++++ .../iotdb/commons/client/BaseClientFactory.java | 41 +++++++ .../apache/iotdb/commons/client/ClientManager.java | 71 ++++++++++++ .../commons/client/ClientManagerProperty.java | 123 ++++++++++++++++++++ .../iotdb/commons/client/IClientManager.java | 32 ++++++ .../iotdb/commons/client/IClientPoolFactory.java | 30 +++++ .../async/AsyncDataNodeInternalServiceClient.java | 126 +++++++++++++++++++++ .../sync/SyncDataNodeInternalServiceClient.java | 116 +++++++++++++++++++ .../apache/iotdb/db/client/ClientPoolFactory.java | 101 +++++++++++++++++ .../async/AsyncConfigNodeIServiceClient.java | 125 ++++++++++++++++++++ .../async/AsyncDataNodeDataBlockServiceClient.java | 125 ++++++++++++++++++++ .../client/sync/SyncConfigNodeIServiceClient.java | 114 +++++++++++++++++++ .../sync/SyncDataNodeDataBlockServiceClient.java | 115 +++++++++++++++++++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++++ 17 files changed, 1290 insertions(+), 5 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java new file mode 100644 index 0000000000..73bb856065 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.client; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.commons.client.IClientPoolFactory; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.confignode.conf.ConfigNodeConf; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; + +public class ClientPoolFactory { + + private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf(); + + public static class SyncDataNodeInternalServiceClientPoolFactory + implements IClientPoolFactory<SyncDataNodeInternalServiceClient> { + @Override + public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool( + ClientManager<SyncDataNodeInternalServiceClient> manager) { + ClientManagerProperty<SyncDataNodeInternalServiceClient> property = + new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .build(); + return new GenericKeyedObjectPool<>( + new SyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig()); + } + } + + public static class AsyncDataNodeInternalServiceClientPoolFactory + implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> { + @Override + public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool( + ClientManager<AsyncDataNodeInternalServiceClient> manager) { + ClientManagerProperty<AsyncDataNodeInternalServiceClient> property = + new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool()) + .build(); + return new GenericKeyedObjectPool<>( + new AsyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig()); + } + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java index 27a60ea35b..9a9b6aa336 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java @@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RpcUtils; import java.io.File; import java.util.Collections; +import java.util.concurrent.TimeUnit; public class ConfigNodeConf { @@ -32,10 +33,12 @@ public class ConfigNodeConf { /** used for communication between data node and config node */ private int rpcPort = 22277; - /** used for communication between config node and config node */ private int internalPort = 22278; + /** Thrift socket and connection timeout between data node and config node */ + private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20); + /** ConfigNode consensus protocol */ private String configNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus"; @@ -46,6 +49,15 @@ public class ConfigNodeConf { private Endpoint[] configNodeGroupAddressList = Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new Endpoint[0]); + /** + * ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its + * clients. + */ + private final int selectorNumOfClientPool = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; + /** Number of SeriesPartitionSlots per StorageGroup */ private int seriesPartitionSlotNum = 10000; @@ -141,6 +153,10 @@ public class ConfigNodeConf { this.seriesPartitionExecutorClass = seriesPartitionExecutorClass; } + public int getSelectorNumOfClientPool() { + return selectorNumOfClientPool; + } + public long getTimePartitionInterval() { return timePartitionInterval; } @@ -213,6 +229,15 @@ public class ConfigNodeConf { this.internalPort = internalPort; } + public int getConnectionTimeoutInMS() { + return connectionTimeoutInMS; + } + + public ConfigNodeConf setConnectionTimeoutInMS(int connectionTimeoutInMS) { + this.connectionTimeoutInMS = connectionTimeoutInMS; + return this; + } + public String getConsensusDir() { return consensusDir; } diff --git a/node-commons/pom.xml b/node-commons/pom.xml index 3997cc6c00..f4e3587e38 100644 --- a/node-commons/pom.xml +++ b/node-commons/pom.xml @@ -39,7 +39,6 @@ <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> - <version>${thrift.version}</version> </dependency> <dependency> <groupId>org.apache.iotdb</groupId> @@ -47,9 +46,12 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>[${guava.version},)</version> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> </dependency> </dependencies> <build> diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java new file mode 100644 index 0000000000..b710f5482c --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.thrift.async.TAsyncClientManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> { + + private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class); + protected TAsyncClientManager[] tManagers; + protected AtomicInteger clientCnt = new AtomicInteger(); + + protected AsyncBaseClientFactory( + ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) { + super(clientManager, clientManagerProperty); + tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()]; + for (int i = 0; i < tManagers.length; i++) { + try { + tManagers[i] = new TAsyncClientManager(); + } catch (IOException e) { + logger.error("Cannot create Async client factory", e); + } + } + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java new file mode 100644 index 0000000000..f628692152 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.commons.pool2.KeyedPooledObjectFactory; +import org.apache.commons.pool2.PooledObject; + +public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> { + + protected ClientManager<T> clientManager; + protected ClientManagerProperty<T> clientManagerProperty; + + public BaseClientFactory( + ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) { + this.clientManager = clientManager; + this.clientManagerProperty = clientManagerProperty; + } + + @Override + public void activateObject(K node, PooledObject<T> pooledObject) {} + + @Override + public void passivateObject(K node, PooledObject<T> pooledObject) {} +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java new file mode 100644 index 0000000000..10ad2090c0 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; + +import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Optional; + +public class ClientManager<E> implements IClientManager<E> { + + private static final Logger logger = LoggerFactory.getLogger(ClientManager.class); + + private final KeyedObjectPool<EndPoint, E> pool; + + public ClientManager(IClientPoolFactory<E> factory) { + pool = factory.createClientPool(this); + } + + @Override + public Optional<E> borrowClient(EndPoint node) throws IOException { + Optional<E> client = Optional.empty(); + try { + client = Optional.of(pool.borrowObject(node)); + } catch (TTransportException e) { + // external needs to check transport related exception + throw new IOException(e); + } catch (IOException e) { + // external needs the IOException to check connection + throw e; + } catch (Exception e) { + // external doesn't care of other exceptions + logger.error(String.format("Borrow client from pool for node %s failed.", node), e); + } + return client; + } + + @Override + public void returnClient(EndPoint node, E client) { + if (client != null && node != null) { + try { + pool.returnObject(node, client); + } catch (Exception e) { + logger.error( + String.format("Return client %s for node %s to pool failed.", client, node), e); + } + } + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java new file mode 100644 index 0000000000..b35f163257 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.time.Duration; + +public class ClientManagerProperty<T> { + + private final GenericKeyedObjectPoolConfig<T> config; + + // thrift client config + private final TProtocolFactory protocolFactory; + private int connectionTimeoutMs = 20_000; + private int selectorNumOfAsyncClientPool = 1; + + public ClientManagerProperty( + GenericKeyedObjectPoolConfig<T> config, + TProtocolFactory protocolFactory, + int connectionTimeoutMs, + int selectorNumOfAsyncClientPool) { + this.config = config; + this.protocolFactory = protocolFactory; + this.connectionTimeoutMs = connectionTimeoutMs; + this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool; + } + + public GenericKeyedObjectPoolConfig<T> getConfig() { + return config; + } + + public TProtocolFactory getProtocolFactory() { + return protocolFactory; + } + + public int getConnectionTimeoutMs() { + return connectionTimeoutMs; + } + + public int getSelectorNumOfAsyncClientPool() { + return selectorNumOfAsyncClientPool; + } + + public static class Builder<T> { + + // pool config + private long waitClientTimeoutMS = 20_000; + private int maxConnectionForEachNode = 1_000; + private int maxIdleConnectionForEachNode = 1_000; + + // thrift client config + private boolean rpcThriftCompressionEnabled = false; + private int connectionTimeoutMs = 20_000; + private int selectorNumOfAsyncClientPool = 1; + + public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) { + this.waitClientTimeoutMS = waitClientTimeoutMS; + return this; + } + + public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) { + this.maxConnectionForEachNode = maxConnectionForEachNode; + return this; + } + + public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) { + this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode; + return this; + } + + public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) { + this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled; + return this; + } + + public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) { + this.connectionTimeoutMs = connectionTimeoutMs; + return this; + } + + public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) { + this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool; + return this; + } + + public ClientManagerProperty<T> build() { + GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>(); + poolConfig.setMaxTotalPerKey(maxConnectionForEachNode); + poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode); + poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS)); + poolConfig.setTestOnReturn(true); + poolConfig.setTestOnBorrow(true); + return new ClientManagerProperty<>( + poolConfig, + rpcThriftCompressionEnabled + ? new TCompactProtocol.Factory() + : new TBinaryProtocol.Factory(), + connectionTimeoutMs, + selectorNumOfAsyncClientPool); + } + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java new file mode 100644 index 0000000000..11883aefda --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; + +import java.io.IOException; +import java.util.Optional; + +public interface IClientManager<E> { + + Optional<E> borrowClient(EndPoint node) throws IOException; + + void returnClient(EndPoint node, E client); +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java new file mode 100644 index 0000000000..80341bf5f5 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; + +import org.apache.commons.pool2.KeyedObjectPool; + +public interface IClientPoolFactory<E> { + + KeyedObjectPool<EndPoint, E> createClientPool( + ClientManager<E> manager); +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java new file mode 100644 index 0000000000..6119b799a8 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.async; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.AsyncBaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.mpp.rpc.thrift.InternalService; +import org.apache.iotdb.rpc.TNonblockingSocketWrapper; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.async.TAsyncClientManager; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.IOException; + +/** We put the client in this module because it is used by both ConfigNode and Datanode */ +public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient { + + private final EndPoint endpoint; + private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager; + + public AsyncDataNodeInternalServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + TAsyncClientManager tClientManager, + ClientManager<AsyncDataNodeInternalServiceClient> clientManager) + throws IOException { + super( + protocolFactory, + tClientManager, + TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); + this.endpoint = endpoint; + this.clientManager = clientManager; + } + + public void close() { + ___transport.close(); + ___currentMethod = null; + } + + public boolean isValid() { + return ___transport != null; + } + + /** + * return self if clientPool is not null, the method doesn't need to call by user, it will trigger + * once client transport complete. + */ + private void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + @Override + public void onComplete() { + super.onComplete(); + returnSelf(); + } + + public boolean isReady() { + try { + checkReady(); + return true; + } catch (Exception e) { + return false; + } + } + + public static class Factory + extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> { + + public Factory( + ClientManager<AsyncDataNodeInternalServiceClient> clientManager, + ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(EndPoint endPoint) + throws Exception { + TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length]; + tManager = tManager == null ? new TAsyncClientManager() : tManager; + return new DefaultPooledObject<>( + new AsyncDataNodeInternalServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endPoint, + tManager, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) { + return pooledObject.getObject() != null && pooledObject.getObject().isValid(); + } + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java new file mode 100644 index 0000000000..a40104bbbc --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.sync; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.BaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.mpp.rpc.thrift.InternalService; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.TConfigurationConst; +import org.apache.iotdb.rpc.TimeoutChangeableTransport; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.SocketException; + +/** We put the client in this module because it is used by both ConfigNode and Datanode */ +public class SyncDataNodeInternalServiceClient extends InternalService.Client { + + private final EndPoint endpoint; + private final ClientManager<SyncDataNodeInternalServiceClient> clientManager; + + public SyncDataNodeInternalServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + ClientManager<SyncDataNodeInternalServiceClient> clientManager) + throws TTransportException { + super( + protocolFactory.getProtocol( + RpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + connectionTimeout)))); + this.endpoint = endpoint; + this.clientManager = clientManager; + getInputProtocol().getTransport().open(); + } + + public void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + public void setTimeout(int timeout) { + // the same transport is used in both input and output + ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); + } + + public void close() { + getInputProtocol().getTransport().close(); + } + + public int getTimeout() throws SocketException { + return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut(); + } + + public static class Factory + extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> { + + public Factory( + ClientManager<SyncDataNodeInternalServiceClient> clientManager, + ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<SyncDataNodeInternalServiceClient> makeObject(EndPoint endpoint) + throws Exception { + return new DefaultPooledObject<>( + new SyncDataNodeInternalServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endpoint, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) { + return pooledObject.getObject() != null + && pooledObject.getObject().getInputProtocol().getTransport().isOpen(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java new file mode 100644 index 0000000000..63f6c7a45c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.commons.client.IClientPoolFactory; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.db.client.async.AsyncConfigNodeIServiceClient; +import org.apache.iotdb.db.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; + +public class ClientPoolFactory { + + private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + + public static class SyncConfigNodeIServiceClientPoolFactory + implements IClientPoolFactory<SyncConfigNodeIServiceClient> { + @Override + public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool( + ClientManager<SyncConfigNodeIServiceClient> manager) { + ClientManagerProperty<SyncConfigNodeIServiceClient> property = + new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable()) + .build(); + return new GenericKeyedObjectPool<>( + new SyncConfigNodeIServiceClient.Factory(manager, property), property.getConfig()); + } + } + + public static class AsyncConfigNodeIServiceClientPoolFactory + implements IClientPoolFactory<AsyncConfigNodeIServiceClient> { + @Override + public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool( + ClientManager<AsyncConfigNodeIServiceClient> manager) { + ClientManagerProperty<AsyncConfigNodeIServiceClient> property = + new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable()) + .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool()) + .build(); + return new GenericKeyedObjectPool<>( + new AsyncConfigNodeIServiceClient.Factory(manager, property), property.getConfig()); + } + } + + public static class SyncDataNodeInternalServiceClientPoolFactory + implements IClientPoolFactory<SyncDataNodeInternalServiceClient> { + @Override + public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool( + ClientManager<SyncDataNodeInternalServiceClient> manager) { + ClientManagerProperty<SyncDataNodeInternalServiceClient> property = + new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable()) + .build(); + return new GenericKeyedObjectPool<>( + new SyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig()); + } + } + + public static class AsyncDataNodeInternalServiceClientPoolFactory + implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> { + @Override + public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool( + ClientManager<AsyncDataNodeInternalServiceClient> manager) { + ClientManagerProperty<AsyncDataNodeInternalServiceClient> property = + new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable()) + .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool()) + .build(); + return new GenericKeyedObjectPool<>( + new AsyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig()); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java new file mode 100644 index 0000000000..00ac180429 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client.async; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.AsyncBaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.confignode.rpc.thrift.ConfigIService; +import org.apache.iotdb.rpc.TNonblockingSocketWrapper; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.async.TAsyncClientManager; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.IOException; + +public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient { + + private final EndPoint endpoint; + private final ClientManager<AsyncConfigNodeIServiceClient> clientManager; + + public AsyncConfigNodeIServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + TAsyncClientManager tClientManager, + ClientManager<AsyncConfigNodeIServiceClient> clientManager) + throws IOException { + super( + protocolFactory, + tClientManager, + TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); + this.endpoint = endpoint; + this.clientManager = clientManager; + } + + public void close() { + ___transport.close(); + ___currentMethod = null; + } + + public boolean isValid() { + return ___transport != null; + } + + /** + * return self if clientPool is not null, the method doesn't need to call by user, it will trigger + * once client transport complete. + */ + private void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + @Override + public void onComplete() { + super.onComplete(); + returnSelf(); + } + + public boolean isReady() { + try { + checkReady(); + return true; + } catch (Exception e) { + return false; + } + } + + public static class Factory + extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> { + + public Factory( + ClientManager<AsyncConfigNodeIServiceClient> clientManager, + ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<AsyncConfigNodeIServiceClient> makeObject(EndPoint endPoint) + throws Exception { + TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length]; + tManager = tManager == null ? new TAsyncClientManager() : tManager; + return new DefaultPooledObject<>( + new AsyncConfigNodeIServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endPoint, + tManager, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) { + return pooledObject.getObject() != null && pooledObject.getObject().isValid(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java new file mode 100644 index 0000000000..1701083464 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client.async; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.AsyncBaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.mpp.rpc.thrift.DataBlockService; +import org.apache.iotdb.rpc.TNonblockingSocketWrapper; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.async.TAsyncClientManager; +import org.apache.thrift.protocol.TProtocolFactory; + +import java.io.IOException; + +public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient { + + private final EndPoint endpoint; + private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager; + + public AsyncDataNodeDataBlockServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + TAsyncClientManager tClientManager, + ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager) + throws IOException { + super( + protocolFactory, + tClientManager, + TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout)); + this.endpoint = endpoint; + this.clientManager = clientManager; + } + + public void close() { + ___transport.close(); + ___currentMethod = null; + } + + public boolean isValid() { + return ___transport != null; + } + + /** + * return self if clientPool is not null, the method doesn't need to call by user, it will trigger + * once client transport complete. + */ + private void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + @Override + public void onComplete() { + super.onComplete(); + returnSelf(); + } + + public boolean isReady() { + try { + checkReady(); + return true; + } catch (Exception e) { + return false; + } + } + + public static class Factory + extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> { + + public Factory( + ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager, + ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<AsyncDataNodeDataBlockServiceClient> makeObject(EndPoint endPoint) + throws Exception { + TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length]; + tManager = tManager == null ? new TAsyncClientManager() : tManager; + return new DefaultPooledObject<>( + new AsyncDataNodeDataBlockServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endPoint, + tManager, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) { + return pooledObject.getObject() != null && pooledObject.getObject().isValid(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java new file mode 100644 index 0000000000..60e01bfc0c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client.sync; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.BaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.confignode.rpc.thrift.ConfigIService; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.TConfigurationConst; +import org.apache.iotdb.rpc.TimeoutChangeableTransport; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.SocketException; + +public class SyncConfigNodeIServiceClient extends ConfigIService.Client { + + private final EndPoint endpoint; + private final ClientManager<SyncConfigNodeIServiceClient> clientManager; + + public SyncConfigNodeIServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + ClientManager<SyncConfigNodeIServiceClient> clientManager) + throws TTransportException { + super( + protocolFactory.getProtocol( + RpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + connectionTimeout)))); + this.endpoint = endpoint; + this.clientManager = clientManager; + getInputProtocol().getTransport().open(); + } + + public void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + public void setTimeout(int timeout) { + // the same transport is used in both input and output + ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); + } + + public void close() { + getInputProtocol().getTransport().close(); + } + + public int getTimeout() throws SocketException { + return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut(); + } + + public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> { + + public Factory( + ClientManager<SyncConfigNodeIServiceClient> clientManager, + ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<SyncConfigNodeIServiceClient> makeObject(EndPoint endpoint) + throws Exception { + return new DefaultPooledObject<>( + new SyncConfigNodeIServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endpoint, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) { + return pooledObject.getObject() != null + && pooledObject.getObject().getInputProtocol().getTransport().isOpen(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java new file mode 100644 index 0000000000..f6188709a9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client.sync; + +import org.apache.iotdb.common.rpc.thrift.EndPoint; +import org.apache.iotdb.commons.client.BaseClientFactory; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ClientManagerProperty; +import org.apache.iotdb.mpp.rpc.thrift.DataBlockService; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.TConfigurationConst; +import org.apache.iotdb.rpc.TimeoutChangeableTransport; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.SocketException; + +public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client { + + private final EndPoint endpoint; + private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager; + + public SyncDataNodeDataBlockServiceClient( + TProtocolFactory protocolFactory, + int connectionTimeout, + EndPoint endpoint, + ClientManager<SyncDataNodeDataBlockServiceClient> clientManager) + throws TTransportException { + super( + protocolFactory.getProtocol( + RpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + connectionTimeout)))); + this.endpoint = endpoint; + this.clientManager = clientManager; + getInputProtocol().getTransport().open(); + } + + public void returnSelf() { + if (clientManager != null) { + clientManager.returnClient(endpoint, this); + } + } + + public void setTimeout(int timeout) { + // the same transport is used in both input and output + ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); + } + + public void close() { + getInputProtocol().getTransport().close(); + } + + public int getTimeout() throws SocketException { + return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut(); + } + + public static class Factory + extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> { + + public Factory( + ClientManager<SyncDataNodeDataBlockServiceClient> clientManager, + ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) { + super(clientManager, clientManagerProperty); + } + + @Override + public void destroyObject( + EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<SyncDataNodeDataBlockServiceClient> makeObject(EndPoint endpoint) + throws Exception { + return new DefaultPooledObject<>( + new SyncDataNodeDataBlockServiceClient( + clientManagerProperty.getProtocolFactory(), + clientManagerProperty.getConnectionTimeoutMs(), + endpoint, + clientManager)); + } + + @Override + public boolean validateObject( + EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) { + return pooledObject.getObject() != null + && pooledObject.getObject().getInputProtocol().getTransport().isOpen(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c702a9dba6..e9ee063e4f 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -875,6 +875,18 @@ public class IoTDBConfig { /** Thread keep alive time in ms of data block manager. */ private int dataBlockManagerKeepAliveTimeInMs = 1000; + /** Thrift socket and connection timeout between data node and config node */ + private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20); + + /** + * ClientPool will have so many selector threads (TAsyncClientManager) to distribute to its + * clients. + */ + private final int selectorNumOfClientPool = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; + public float getUdfMemoryBudgetInMB() { return udfMemoryBudgetInMB; } @@ -2769,6 +2781,18 @@ public class IoTDBConfig { this.dataBlockManagerKeepAliveTimeInMs = dataBlockManagerKeepAliveTimeInMs; } + public int getConnectionTimeoutInMS() { + return connectionTimeoutInMS; + } + + public void setConnectionTimeoutInMS(int connectionTimeoutInMS) { + this.connectionTimeoutInMS = connectionTimeoutInMS; + } + + public int getSelectorNumOfClientPool() { + return selectorNumOfClientPool; + } + public boolean isMppMode() { return mppMode; }
