This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d81881fd8b [IOTDB-6261] Pipe: support leader cache in 
iotdb-thrift-async-connector (#11807)
3d81881fd8b is described below

commit 3d81881fd8b57a76f32ad8954873b9f657193ff9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Dec 29 15:25:44 2023 +0800

    [IOTDB-6261] Pipe: support leader cache in iotdb-thrift-async-connector 
(#11807)
---
 .../protocol/thrift/IoTDBThriftClientManager.java  |  39 ++++
 .../async/IoTDBThriftAsyncClientManager.java       | 200 ++++++++++++++++++++
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 210 +++++----------------
 .../PipeTransferTabletInsertNodeEventHandler.java  |  12 ++
 .../PipeTransferTabletInsertionEventHandler.java   |  17 +-
 .../handler/PipeTransferTabletRawEventHandler.java |   7 +
 .../thrift/sync/IoTDBThriftSyncClientManager.java  |  30 ++-
 .../thrift/sync/IoTDBThriftSyncConnector.java      |   2 +-
 .../async/AsyncPipeDataTransferServiceClient.java  |   8 +
 9 files changed, 341 insertions(+), 184 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
new file mode 100644
index 00000000000..9a631020a44
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import java.util.List;
+
+public abstract class IoTDBThriftClientManager {
+
+  protected final List<TEndPoint> endPointList;
+
+  protected long currentClientIndex = 0;
+
+  protected final boolean useLeaderCache;
+  protected static final LeaderCacheManager leaderCacheManager = new 
LeaderCacheManager();
+
+  protected IoTDBThriftClientManager(List<TEndPoint> endPointList, boolean 
useLeaderCache) {
+    this.endPointList = endPointList;
+    this.useLeaderCache = useLeaderCache;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
new file mode 100644
index 00000000000..477165d2bd8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class IoTDBThriftAsyncClientManager extends IoTDBThriftClientManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftAsyncClientManager.class);
+
+  private final Set<TEndPoint> endPointSet;
+
+  private static final AtomicReference<
+          IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
+      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
+  private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
endPoint2Client;
+
+  public IoTDBThriftAsyncClientManager(List<TEndPoint> endPoints, boolean 
useLeaderCache) {
+    super(endPoints, useLeaderCache);
+
+    endPointSet = new HashSet<>(endPoints);
+
+    if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+      synchronized (IoTDBThriftAsyncConnector.class) {
+        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
+              new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+                  .createClientManager(
+                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+        }
+      }
+    }
+    endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+  }
+
+  public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+    final int clientSize = endPointList.size();
+    while (true) {
+      final TEndPoint targetNodeUrl = endPointList.get((int) 
(currentClientIndex++ % clientSize));
+      final AsyncPipeDataTransferServiceClient client = 
endPoint2Client.borrowClient(targetNodeUrl);
+      if (handshakeIfNecessary(targetNodeUrl, client)) {
+        return client;
+      }
+    }
+  }
+
+  public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) 
throws Exception {
+    if (!useLeaderCache) {
+      return borrowClient();
+    }
+
+    final TEndPoint endPoint = leaderCacheManager.getLeaderEndPoint(deviceId);
+    if (endPoint == null) {
+      return borrowClient();
+    }
+
+    try {
+      final AsyncPipeDataTransferServiceClient client = 
endPoint2Client.borrowClient(endPoint);
+      if (handshakeIfNecessary(endPoint, client)) {
+        return client;
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+          "failed to borrow client {}:{} for cached leader.",
+          endPoint.getIp(),
+          endPoint.getPort(),
+          e);
+    }
+
+    return borrowClient();
+  }
+
+  /**
+   * Handshake with the target if necessary.
+   *
+   * @param client client to handshake
+   * @return true if the handshake is already finished, false if the handshake 
is not finished yet
+   *     and finished in this method
+   * @throws Exception if an error occurs.
+   */
+  private boolean handshakeIfNecessary(
+      TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client) 
throws Exception {
+    if (client.isHandshakeFinished()) {
+      return true;
+    }
+
+    final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    client.pipeTransfer(
+        PipeTransferHandshakeReq.toTPipeTransferReq(
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+        new AsyncMethodCallback<TPipeTransferResp>() {
+          @Override
+          public void onComplete(TPipeTransferResp response) {
+            if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              LOGGER.warn(
+                  "Handshake error with receiver {}:{}, code: {}, message: 
{}.",
+                  targetNodeUrl.getIp(),
+                  targetNodeUrl.getPort(),
+                  response.getStatus().getCode(),
+                  response.getStatus().getMessage());
+              exception.set(
+                  new PipeConnectionException(
+                      String.format(
+                          "Handshake error with receiver %s:%s, code: %d, 
message: %s.",
+                          targetNodeUrl.getIp(),
+                          targetNodeUrl.getPort(),
+                          response.getStatus().getCode(),
+                          response.getStatus().getMessage())));
+            } else {
+              LOGGER.info(
+                  "Handshake successfully with receiver {}:{}.",
+                  targetNodeUrl.getIp(),
+                  targetNodeUrl.getPort());
+              client.markHandshakeFinished();
+            }
+
+            isHandshakeFinished.set(true);
+          }
+
+          @Override
+          public void onError(Exception e) {
+            LOGGER.warn(
+                "Handshake error with receiver {}:{}.",
+                targetNodeUrl.getIp(),
+                targetNodeUrl.getPort(),
+                e);
+            exception.set(e);
+
+            isHandshakeFinished.set(true);
+          }
+        });
+
+    try {
+      while (!isHandshakeFinished.get()) {
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new PipeException("Interrupted while waiting for handshake 
response.", e);
+    }
+
+    if (exception.get() != null) {
+      throw new PipeConnectionException("Failed to handshake.", 
exception.get());
+    }
+
+    return false;
+  }
+
+  public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+    if (!useLeaderCache) {
+      return;
+    }
+
+    if (!endPointSet.contains(endPoint)) {
+      endPointList.add(endPoint);
+      endPointSet.add(endPoint);
+    }
+
+    leaderCacheManager.updateLeaderEndPoint(deviceId, endPoint);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index e608b39609c..e8460af85bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -20,13 +20,9 @@
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnector;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -47,41 +43,37 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
 
 public class IoTDBThriftAsyncConnector extends IoTDBConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
 
-  private static final String THRIFT_ERROR_FORMATTER =
+  private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
+      "Failed to borrow client from client pool or exception occurred "
+          + "when sending to receiver.";
+  private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver %s:%s.";
 
-  private static final AtomicReference<
-          IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
-      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
-  private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
-      asyncPipeDataTransferClientManager;
+  private IoTDBThriftAsyncClientManager clientManager;
 
   private final IoTDBThriftSyncConnector retryConnector = new 
IoTDBThriftSyncConnector();
   private final PriorityBlockingQueue<Event> retryEventQueue =
@@ -95,20 +87,6 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
   private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
 
-  public IoTDBThriftAsyncConnector() {
-    if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
-      synchronized (IoTDBThriftAsyncConnector.class) {
-        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
-              new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
-                  .createClientManager(
-                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
-        }
-      }
-    }
-    asyncPipeDataTransferClientManager = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
-  }
-
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     super.validate(validator);
@@ -132,6 +110,13 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     retryParameters.getAttribute().put(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
"false");
     retryConnector.customize(retryParameters, configuration);
 
+    clientManager =
+        new IoTDBThriftAsyncClientManager(
+            nodeUrls,
+            parameters.getBooleanOrDefault(
+                Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE));
+
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new 
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
     }
@@ -173,14 +158,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    final long commitId = ((EnrichedEvent) tabletInsertionEvent).getCommitId();
-
     if (isTabletBatchModeEnabled) {
       if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
         final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
             new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
-        transfer(commitId, pipeTransferTabletBatchEventHandler);
+        transfer(pipeTransferTabletBatchEventHandler);
 
         tabletBatchBuilder.onSuccess();
       }
@@ -198,7 +181,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             new PipeTransferTabletInsertNodeEventHandler(
                 pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
-        transfer(commitId, pipeTransferInsertNodeReqHandler);
+        transfer(pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -210,54 +193,40 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             new PipeTransferTabletRawEventHandler(
                 pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
 
-        transfer(commitId, pipeTransferTabletReqHandler);
+        transfer(pipeTransferTabletReqHandler);
       }
     }
   }
 
-  private void transfer(
-      long requestCommitId,
-      PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) 
{
-    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
-
+  private void transfer(PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler) {
+    AsyncPipeDataTransferServiceClient client = null;
     try {
-      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
+      client = clientManager.borrowClient();
       pipeTransferTabletBatchEventHandler.transfer(client);
     } catch (Exception ex) {
-      LOGGER.warn(
-          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
-          ex);
+      logOnClientException(client, ex);
       pipeTransferTabletBatchEventHandler.onError(ex);
     }
   }
 
-  private void transfer(
-      long requestCommitId,
-      PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler) {
-    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
-
+  private void transfer(PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler) {
+    AsyncPipeDataTransferServiceClient client = null;
     try {
-      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
+      client = clientManager.borrowClient();
       pipeTransferInsertNodeReqHandler.transfer(client);
     } catch (Exception ex) {
-      LOGGER.warn(
-          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
-          ex);
+      logOnClientException(client, ex);
       pipeTransferInsertNodeReqHandler.onError(ex);
     }
   }
 
-  private void transfer(
-      long requestCommitId, PipeTransferTabletRawEventHandler 
pipeTransferTabletReqHandler) {
-    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
-
+  private void transfer(PipeTransferTabletRawEventHandler 
pipeTransferTabletReqHandler) {
+    AsyncPipeDataTransferServiceClient client = null;
     try {
-      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
+      client = clientManager.borrowClient();
       pipeTransferTabletReqHandler.transfer(client);
     } catch (Exception ex) {
-      LOGGER.warn(
-          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
-          ex);
+      logOnClientException(client, ex);
       pipeTransferTabletReqHandler.onError(ex);
     }
   }
@@ -303,21 +272,17 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
         new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, 
this);
 
-    transfer(pipeTsFileInsertionEvent.getCommitId(), 
pipeTransferTsFileInsertionEventHandler);
+    transfer(pipeTransferTsFileInsertionEventHandler);
   }
 
   private void transfer(
-      long requestCommitId,
       PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler) {
-    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
-
+    AsyncPipeDataTransferServiceClient client = null;
     try {
-      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
+      client = clientManager.borrowClient();
       pipeTransferTsFileInsertionEventHandler.transfer(client);
     } catch (Exception ex) {
-      LOGGER.warn(
-          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
-          ex);
+      logOnClientException(client, ex);
       pipeTransferTsFileInsertionEventHandler.onError(ex);
     }
   }
@@ -333,93 +298,21 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint 
targetNodeUrl)
-      throws Exception {
-    while (true) {
-      final AsyncPipeDataTransferServiceClient client =
-          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
-      if (handshakeIfNecessary(targetNodeUrl, client)) {
-        return client;
-      }
-    }
-  }
-
-  /**
-   * Handshake with the target if necessary.
-   *
-   * @param client client to handshake
-   * @return true if the handshake is already finished, false if the handshake 
is not finished yet
-   *     and finished in this method
-   * @throws Exception if an error occurs.
-   */
-  private boolean handshakeIfNecessary(
-      TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client) 
throws Exception {
-    if (client.isHandshakeFinished()) {
-      return true;
-    }
+  //////////////////////////// Leader cache update ////////////////////////////
 
-    final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
-    final AtomicReference<Exception> exception = new AtomicReference<>();
-
-    client.pipeTransfer(
-        PipeTransferHandshakeReq.toTPipeTransferReq(
-            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
-        new AsyncMethodCallback<TPipeTransferResp>() {
-          @Override
-          public void onComplete(TPipeTransferResp response) {
-            if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              LOGGER.warn(
-                  "Handshake error with receiver {}:{}, code: {}, message: 
{}.",
-                  targetNodeUrl.getIp(),
-                  targetNodeUrl.getPort(),
-                  response.getStatus().getCode(),
-                  response.getStatus().getMessage());
-              exception.set(
-                  new PipeConnectionException(
-                      String.format(
-                          "Handshake error with receiver %s:%s, code: %d, 
message: %s.",
-                          targetNodeUrl.getIp(),
-                          targetNodeUrl.getPort(),
-                          response.getStatus().getCode(),
-                          response.getStatus().getMessage())));
-            } else {
-              LOGGER.info(
-                  "Handshake successfully with receiver {}:{}.",
-                  targetNodeUrl.getIp(),
-                  targetNodeUrl.getPort());
-              client.markHandshakeFinished();
-            }
-
-            isHandshakeFinished.set(true);
-          }
-
-          @Override
-          public void onError(Exception e) {
-            LOGGER.warn(
-                "Handshake error with receiver {}:{}.",
-                targetNodeUrl.getIp(),
-                targetNodeUrl.getPort(),
-                e);
-            exception.set(e);
-
-            isHandshakeFinished.set(true);
-          }
-        });
+  public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+    clientManager.updateLeaderCache(deviceId, endPoint);
+  }
 
-    try {
-      while (!isHandshakeFinished.get()) {
-        Thread.sleep(10);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new PipeException("Interrupted while waiting for handshake 
response.", e);
-    }
+  //////////////////////////// Exception handlers ////////////////////////////
 
-    if (exception.get() != null) {
-      throw new PipeConnectionException("Failed to handshake.", 
exception.get());
+  private void logOnClientException(AsyncPipeDataTransferServiceClient client, 
Exception e) {
+    if (client == null) {
+      LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+    } else {
+      LOGGER.warn(
+          String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(), 
client.getPort()), e);
     }
-
-    return false;
   }
 
   /**
@@ -462,14 +355,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    // requestCommitId can not be generated by commitIdGenerator because the 
commit id must
-    // be bind to a specific InsertTabletEvent or TsFileInsertionEvent, 
otherwise the commit
-    // process will stuck.
-    final long requestCommitId = tabletBatchBuilder.getLastCommitId();
-    final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
-        new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
-
-    transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
+    transfer(new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, 
this));
 
     tabletBatchBuilder.onSuccess();
   }
@@ -483,6 +369,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     retryEventQueue.offer(event);
   }
 
+  //////////////////////////// Operations for close 
////////////////////////////
+
   /**
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index c0dcf8670f4..d30b4115218 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -42,4 +44,14 @@ public class PipeTransferTabletInsertNodeEventHandler
       throws TException {
     client.pipeTransfer(req, this);
   }
+
+  @Override
+  protected void updateLeaderCache(TSStatus status) {
+    final InsertNode insertNode =
+        ((PipeInsertNodeTabletInsertionEvent) 
event).getInsertNodeViaCacheIfPossible();
+    if (insertNode != null) {
+      connector.updateLeaderCache(
+          insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index d1b0ae18c50..6a7f79a3162 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -39,10 +40,10 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
 
-  private final TabletInsertionEvent event;
-  private final TPipeTransferReq req;
+  protected final TabletInsertionEvent event;
+  protected final TPipeTransferReq req;
 
-  private final IoTDBThriftAsyncConnector connector;
+  protected final IoTDBThriftAsyncConnector connector;
 
   protected PipeTransferTabletInsertionEventHandler(
       TabletInsertionEvent event, TPipeTransferReq req, 
IoTDBThriftAsyncConnector connector) {
@@ -71,16 +72,22 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
       return;
     }
 
-    if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    final TSStatus status = response.getStatus();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) event)
             
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
 true);
       }
+      if (status.isSetRedirectNode()) {
+        updateLeaderCache(status);
+      }
     } else {
-      onError(new PipeException(response.getStatus().getMessage()));
+      onError(new PipeException(status.getMessage()));
     }
   }
 
+  protected abstract void updateLeaderCache(TSStatus status);
+
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 4cf8a768048..78d5983e768 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -42,4 +43,10 @@ public class PipeTransferTabletRawEventHandler
       throws TException {
     client.pipeTransfer(req, this);
   }
+
+  @Override
+  protected void updateLeaderCache(TSStatus status) {
+    connector.updateLeaderCache(
+        ((PipeRawTabletInsertionEvent) event).getDeviceId(), 
status.getRedirectNode());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 090045624e7..4425439c07f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.LeaderCacheManager;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -42,7 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class IoTDBThriftSyncClientManager implements Closeable {
+public class IoTDBThriftSyncClientManager extends IoTDBThriftClientManager 
implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftSyncClientManager.class);
 
@@ -52,30 +52,22 @@ public class IoTDBThriftSyncClientManager implements 
Closeable {
   private final String trustStorePath;
   private final String trustStorePwd;
 
-  private final boolean useLeaderCache;
-
-  private final List<TEndPoint> endPoints;
   private final Map<TEndPoint, Pair<IoTDBThriftSyncConnectorClient, Boolean>>
       endPoint2ClientAndStatus = new ConcurrentHashMap<>();
 
-  private final LeaderCacheManager leaderCacheManager = new 
LeaderCacheManager();
-
-  private long currentClientIndex = 0;
-
   public IoTDBThriftSyncClientManager(
       List<TEndPoint> endPoints,
       boolean useSSL,
       String trustStorePath,
       String trustStorePwd,
       boolean useLeaderCache) {
+    super(endPoints, useLeaderCache);
+
     this.useSSL = useSSL;
     this.trustStorePath = trustStorePath;
     this.trustStorePwd = trustStorePwd;
 
-    this.useLeaderCache = useLeaderCache;
-
-    this.endPoints = endPoints;
-    for (TEndPoint endPoint : endPoints) {
+    for (final TEndPoint endPoint : endPoints) {
       endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
     }
   }
@@ -167,12 +159,12 @@ public class IoTDBThriftSyncClientManager implements 
Closeable {
   }
 
   public Pair<IoTDBThriftSyncConnectorClient, Boolean> getClient() {
-    final int clientSize = endPoints.size();
+    final int clientSize = endPointList.size();
     // Round-robin, find the next alive client
     for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
       final int clientIndex = (int) (currentClientIndex++ % clientSize);
       final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
-          endPoint2ClientAndStatus.get(endPoints.get(clientIndex));
+          endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
       if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
         return clientAndStatus;
       }
@@ -198,7 +190,7 @@ public class IoTDBThriftSyncClientManager implements 
Closeable {
 
     try {
       if (!endPoint2ClientAndStatus.containsKey(endPoint)) {
-        endPoints.add(endPoint);
+        endPointList.add(endPoint);
         endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
         reconstructClient(endPoint);
       }
@@ -233,7 +225,11 @@ public class IoTDBThriftSyncClientManager implements 
Closeable {
         LOGGER.info("Client {}:{} closed.", endPoint.getIp(), 
endPoint.getPort());
       } catch (Exception e) {
         LOGGER.warn(
-            "Failed to close client {}:{}, because: {}.", endPoint.getIp(), 
endPoint.getPort(), e);
+            "Failed to close client {}:{}, because: {}.",
+            endPoint.getIp(),
+            endPoint.getPort(),
+            e.getMessage(),
+            e);
       } finally {
         clientAndStatus.setRight(false);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index c87d00ce7f0..0d4ef07d539 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -283,7 +283,7 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void doTransfer() throws IOException {
+  private void doTransfer() {
     Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus = 
clientManager.getClient();
     final TPipeTransferResp resp;
     try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 83f47956812..2b8110bc7bc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -146,6 +146,14 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     LOGGER.info("Handshake finished for client {}", this);
   }
 
+  public String getIp() {
+    return endpoint.getIp();
+  }
+
+  public int getPort() {
+    return endpoint.getPort();
+  }
+
   @Override
   public String toString() {
     return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}", 
endpoint, id);


Reply via email to