This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-parallel-connector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bf1b56b3f34f27fd1c8d2bf4c18538c44c640e68
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 8 02:44:45 2023 +0800

    prepare for async client mode dev iteration
---
 .../iotdb/commons/client/ClientPoolFactory.java    |  24 ++++
 .../async/AsyncPipeDataTransferServiceClient.java  | 158 +++++++++++++++++++++
 .../{ => v1}/IoTDBThriftConnectorClient.java       |   2 +-
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |   1 -
 4 files changed, 183 insertions(+), 2 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b156ec47851..972ffeea774 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -220,4 +221,27 @@ public class ClientPoolFactory {
               .getConfig());
     }
   }
+
+  public static class AsyncPipeDataTransferServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncPipeDataTransferServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+              .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
+              .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+              .build()
+              .getConfig());
+    }
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
new file mode 100644
index 00000000000..5703bf29224
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient
+    implements ThriftClient {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class);
+
+  private final boolean printLogWhenEncounterException;
+
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager;
+
+  public AsyncPipeDataTransferServiceClient(
+      ThriftClientProperty property,
+      TEndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager)
+      throws IOException {
+    super(
+        property.getProtocolFactory(),
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(
+            endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs()));
+    this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    super.onError(e);
+    ThriftClient.resolveException(e, this);
+    returnSelf();
+  }
+
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return printLogWhenEncounterException;
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  private boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      if (printLogWhenEncounterException) {
+        LOGGER.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint);
+  }
+
+  public static class Factory
+      extends AsyncThriftClientFactory<TEndPoint, AsyncPipeDataTransferServiceClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager,
+        ThriftClientProperty thriftClientProperty,
+        String threadName) {
+      super(clientManager, thriftClientProperty, threadName);
+    }
+
+    @Override
+    public void destroyObject(
+        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(TEndPoint endPoint)
+        throws Exception {
+      return new DefaultPooledObject<>(
+          new AsyncPipeDataTransferServiceClient(
+              thriftClientProperty,
+              endPoint,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
+      return pooledObject.getObject().isReady();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
index db0047372db..c8f506fdded 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/IoTDBThriftConnectorClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorClient.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector;
+package org.apache.iotdb.db.pipe.connector.v1;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index 04719ad2f44..919d345acde 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
-import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorClient;
 import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;

Reply via email to