This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 39548ab550b Pipe: Fixed event forever pinned and retry never invoked 
when TException encountered in IoTDBThriftAsyncConnector (#11560)
39548ab550b is described below

commit 39548ab550b68e15166b4e0ed066b3fb6be42c65
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 17 19:13:07 2023 +0800

    Pipe: Fixed event forever pinned and retry never invoked when TException 
encountered in IoTDBThriftAsyncConnector (#11560)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 118 ++++++++-------------
 .../PipeTransferTabletBatchEventHandler.java       |   2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   2 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |   5 +-
 .../protocol/websocket/WebSocketConnector.java     |  11 +-
 5 files changed, 59 insertions(+), 79 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 9354e10aecd..1e34ef7c4be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -54,13 +54,13 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -79,8 +79,9 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector 
{
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
 
-  private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
-      "Failed to borrow client from client pool for receiver %s:%s.";
+  private static final String THRIFT_ERROR_FORMATTER =
+      "Failed to borrow client from client pool or exception occurred "
+          + "when sending to receiver %s:%s.";
 
   private static final AtomicReference<
           IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
@@ -175,15 +176,13 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    final long requestCommitId = commitIdGenerator.incrementAndGet();
-
     if (isTabletBatchModeEnabled) {
+      final long requestCommitId = commitIdGenerator.incrementAndGet();
       if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
         final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
             new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
         transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
-
         tabletBatchBuilder.onSuccess();
       }
     } else {
@@ -196,6 +195,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
                     pipeInsertNodeTabletInsertionEvent.getByteBuffer())
                 : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
                     pipeInsertNodeTabletInsertionEvent.getInsertNode());
+
+        final long requestCommitId = commitIdGenerator.incrementAndGet();
         final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
             new PipeTransferTabletInsertNodeEventHandler(
                 requestCommitId, pipeInsertNodeTabletInsertionEvent, 
pipeTransferReq, this);
@@ -208,6 +209,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             PipeTransferTabletRawReq.toTPipeTransferReq(
                 pipeRawTabletInsertionEvent.convertToTablet(),
                 pipeRawTabletInsertionEvent.isAligned());
+
+        final long requestCommitId = commitIdGenerator.incrementAndGet();
         final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
             new PipeTransferTabletRawEventHandler(
                 requestCommitId, pipeRawTabletInsertionEvent, 
pipeTransferTabletRawReq, this);
@@ -224,22 +227,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
-
-      try {
-        pipeTransferTabletBatchEventHandler.transfer(client);
-      } catch (TException e) {
-        LOGGER.warn(
-            String.format(
-                "Transfer batched insertion requests to receiver %s:%s error, 
retrying...",
-                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
-            e);
-      }
+      pipeTransferTabletBatchEventHandler.transfer(client);
     } catch (Exception ex) {
-      pipeTransferTabletBatchEventHandler.onError(ex);
       LOGGER.warn(
-          String.format(
-              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
+      pipeTransferTabletBatchEventHandler.onError(ex);
     }
   }
 
@@ -250,22 +243,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
-
-      try {
-        pipeTransferInsertNodeReqHandler.transfer(client);
-      } catch (TException e) {
-        LOGGER.warn(
-            String.format(
-                "Transfer insert node to receiver %s:%s error, retrying...",
-                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
-            e);
-      }
+      pipeTransferInsertNodeReqHandler.transfer(client);
     } catch (Exception ex) {
-      pipeTransferInsertNodeReqHandler.onError(ex);
       LOGGER.warn(
-          String.format(
-              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
+      pipeTransferInsertNodeReqHandler.onError(ex);
     }
   }
 
@@ -275,22 +258,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
-
-      try {
-        pipeTransferTabletReqHandler.transfer(client);
-      } catch (TException e) {
-        LOGGER.warn(
-            String.format(
-                "Transfer tablet to receiver %s:%s error, retrying...",
-                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
-            e);
-      }
+      pipeTransferTabletReqHandler.transfer(client);
     } catch (Exception ex) {
-      pipeTransferTabletReqHandler.onError(ex);
       LOGGER.warn(
-          String.format(
-              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
+      pipeTransferTabletReqHandler.onError(ex);
     }
   }
 
@@ -327,6 +300,11 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
+    // Just in case. To avoid the case that exception occurred when 
constructing the handler.
+    if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
+      throw new 
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+    }
+
     final long requestCommitId = commitIdGenerator.incrementAndGet();
     final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
         new PipeTransferTsFileInsertionEventHandler(
@@ -342,22 +320,12 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
-
-      try {
-        pipeTransferTsFileInsertionEventHandler.transfer(client);
-      } catch (TException e) {
-        LOGGER.warn(
-            String.format(
-                "Transfer tsfile to receiver %s:%s error, retrying...",
-                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
-            e);
-      }
+      pipeTransferTsFileInsertionEventHandler.transfer(client);
     } catch (Exception ex) {
-      pipeTransferTsFileInsertionEventHandler.onError(ex);
       LOGGER.warn(
-          String.format(
-              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
+      pipeTransferTsFileInsertionEventHandler.onError(ex);
     }
   }
 
@@ -373,20 +341,13 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   }
 
   private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint 
targetNodeUrl)
-      throws PipeConnectionException {
-    try {
-      while (true) {
-        final AsyncPipeDataTransferServiceClient client =
-            asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
-        if (handshakeIfNecessary(targetNodeUrl, client)) {
-          return client;
-        }
+      throws Exception {
+    while (true) {
+      final AsyncPipeDataTransferServiceClient client =
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+      if (handshakeIfNecessary(targetNodeUrl, client)) {
+        return client;
       }
-    } catch (Exception e) {
-      throw new PipeConnectionException(
-          String.format(
-              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
-          e);
     }
   }
 
@@ -495,9 +456,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             "IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.", event);
       }
 
-      if (event instanceof EnrichedEvent) {
-        commit(requestCommitId, (EnrichedEvent) event);
-      }
+      commit(requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent) 
event : null);
 
       retryEventQueue.poll();
     }
@@ -546,7 +505,16 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     while (!commitQueue.isEmpty()) {
       final Pair<Long, Runnable> committer = commitQueue.peek();
-      if (lastCommitId.get() + 1 != committer.left) {
+
+      // If the commit id is less than or equals to the last commit id, it 
means that
+      // the event has been committed before, and has been retried. So the 
event can
+      // be ignored.
+      if (committer.left <= lastCommitId.get()) {
+        commitQueue.poll();
+        continue;
+      }
+
+      if (committer.left != lastCommitId.get() + 1) {
         break;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 8929bed1916..a3a316983cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -86,7 +86,7 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+        "Failed to transfer TabletInsertionEvent batch {} (request commit 
ids={}).",
         events,
         requestCommitIds,
         exception);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index c76b81011e3..619a0f961b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -94,7 +94,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+        "Failed to transfer TabletInsertionEvent {} (request commit id={}).",
         event,
         requestCommitId,
         exception);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 47bc2eb1e1f..f3b130eae64 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -164,7 +164,10 @@ public class PipeTransferTsFileInsertionEventHandler
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
-        "Failed to transfer tsfile {} (request commit id {}).", tsFile, 
requestCommitId, exception);
+        "Failed to transfer TsFileInsertionEvent {} (request commit id {}).",
+        tsFile,
+        requestCommitId,
+        exception);
 
     try {
       if (reader != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 0bdc67498ba..4fc3c9a41e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -172,7 +172,16 @@ public class WebSocketConnector implements PipeConnector {
 
     while (!commitQueue.isEmpty()) {
       final Pair<Long, Runnable> committer = commitQueue.peek();
-      if (lastCommitId.get() + 1 != committer.left) {
+
+      // If the commit id is less than or equals to the last commit id, it 
means that
+      // the event has been committed before, and has been retried. So the 
event can
+      // be ignored.
+      if (committer.left <= lastCommitId.get()) {
+        commitQueue.poll();
+        continue;
+      }
+
+      if (committer.left != lastCommitId.get() + 1) {
         break;
       }
 

Reply via email to