This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-parallel-connector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf1b56b3f34f27fd1c8d2bf4c18538c44c640e68 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jun 8 02:44:45 2023 +0800 prepare for async client mode dev iteration --- .../iotdb/commons/client/ClientPoolFactory.java | 24 ++++ .../async/AsyncPipeDataTransferServiceClient.java | 158 +++++++++++++++++++++ .../{ => v1}/IoTDBThriftConnectorClient.java | 2 +- .../pipe/connector/v1/IoTDBThriftConnectorV1.java | 1 - 4 files changed, 183 insertions(+), 2 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index b156ec47851..972ffeea774 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.client.property.ClientPoolProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; @@ -220,4 +221,27 @@ public class ClientPoolFactory { .getConfig()); } } + + public static class AsyncPipeDataTransferServiceClientPoolFactory + implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> { + + @Override + public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool( + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) { + return new GenericKeyedObjectPool<>( + new AsyncPipeDataTransferServiceClient.Factory( + manager, + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .build(), + ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), + new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>() + .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode()) + .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode()) + .build() + .getConfig()); + } + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java new file mode 100644 index 00000000000..5703bf29224 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.async; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ThriftClient; +import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.rpc.TNonblockingSocketWrapper; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.async.TAsyncClientManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient + implements ThriftClient { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class); + + private final boolean printLogWhenEncounterException; + + private final TEndPoint endpoint; + private final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager; + + public AsyncPipeDataTransferServiceClient( + ThriftClientProperty property, + TEndPoint endpoint, + TAsyncClientManager tClientManager, + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager) + throws IOException { + super( + property.getProtocolFactory(), + tClientManager, + TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); + this.endpoint = endpoint; + this.clientManager = clientManager; + } + + @Override + public void onComplete() { + super.onComplete(); + returnSelf(); + } + + @Override + public void onError(Exception e) { + super.onError(e); + ThriftClient.resolveException(e, this); + returnSelf(); + } + + @Override + public void invalidate() { + if (!hasError()) { + super.onError(new Exception("This client has been invalidated")); + } + } + + @Override + public void invalidateAll() { + clientManager.clear(endpoint); + } + + @Override + public boolean printLogWhenEncounterException() { + return printLogWhenEncounterException; + } + + /** + * return self, the method doesn't need to be called by the user and will be triggered after the + * RPC is finished. + */ + private void returnSelf() { + clientManager.returnClient(endpoint, this); + } + + private void close() { + ___transport.close(); + ___currentMethod = null; + } + + private boolean isReady() { + try { + checkReady(); + return true; + } catch (Exception e) { + if (printLogWhenEncounterException) { + LOGGER.error("Unexpected exception occurs in {} : {}", this, e.getMessage()); + } + return false; + } + } + + @Override + public String toString() { + return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint); + } + + public static class Factory + extends AsyncThriftClientFactory<TEndPoint, AsyncPipeDataTransferServiceClient> { + + public Factory( + ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager, + ThriftClientProperty thriftClientProperty, + String threadName) { + super(clientManager, thriftClientProperty, threadName); + } + + @Override + public void destroyObject( + TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { + pooledObject.getObject().close(); + } + + @Override + public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(TEndPoint endPoint) + throws Exception { + return new DefaultPooledObject<>( + new AsyncPipeDataTransferServiceClient( + thriftClientProperty, + endPoint, + tManagers[clientCnt.incrementAndGet() % tManagers.length], + clientManager)); + } + + @Override + public boolean validateObject( + TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { + return pooledObject.getObject().isReady(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java index db0047372db..c8f506fdded 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.connector; +package org.apache.iotdb.db.pipe.connector.v1; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java index 04719ad2f44..919d345acde 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java @@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.config.PipeConnectorConstant; -import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient; import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
