This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-sync-infinite-loop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a4dd3e5803c1399068b12e49af75ed4eda156c5d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri May 10 18:17:15 2024 +0800

    Pipe: fix infinite loop with lock in async connector (which may cause selector & connector worker deadlock)
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 79 +++++++++++-----------
 .../PipeTransferTsFileInsertionEventHandler.java   | 13 ++--
 2 files changed, 49 insertions(+), 43 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1e01b824220..a5b69cea473 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -54,10 +54,10 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
@@ -83,14 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private IoTDBDataNodeAsyncClientManager clientManager;
 
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
-  private final PriorityBlockingQueue<Event> retryEventQueue =
-      new PriorityBlockingQueue<>(
-          11,
-          Comparator.comparing(
-              e ->
-                  // Non-enriched events will be put at the front of the queue,
-                  // because they are more likely to be lost and need to be retried first.
-                  e instanceof EnrichedEvent ? ((EnrichedEvent) 
e).getCommitId() : 0));
+  private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
 
   private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
 
@@ -359,37 +352,47 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
    * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
-  private synchronized void transferQueuedEventsIfNecessary() throws Exception 
{
+  private void transferQueuedEventsIfNecessary() throws Exception {
+    final int maxRetryTimes = retryEventQueue.size();
+    int alreadyRetriedTimes = 0;
+
     while (!retryEventQueue.isEmpty()) {
-      final Event peekedEvent = retryEventQueue.peek();
-
-      if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) 
peekedEvent);
-      } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
-        retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
-      } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
-        // Using the async connector to transfer the event for performance.
-        transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
-      } else {
-        LOGGER.warn(
-            "IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.", peekedEvent);
-      }
+      synchronized (this) {
+        if (isClosed.get() || retryEventQueue.isEmpty() || 
alreadyRetriedTimes++ >= maxRetryTimes) {
+          return;
+        }
 
-      if (peekedEvent instanceof EnrichedEvent) {
-        ((EnrichedEvent) peekedEvent)
-            
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
-      }
+        final Event peekedEvent = retryEventQueue.peek();
+
+        if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+          retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) 
peekedEvent);
+        } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
+          retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
+        } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
+          // Using the async connector to transfer the event for performance.
+          transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
+        } else {
+          LOGGER.warn(
+              "IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.",
+              peekedEvent);
+        }
 
-      final Event polledEvent = retryEventQueue.poll();
-      if (polledEvent != peekedEvent) {
-        LOGGER.error(
-            "The event polled from the queue is not the same as the event 
peeked from the queue. "
-                + "Peeked event: {}, polled event: {}.",
-            peekedEvent,
-            polledEvent);
-      }
-      if (polledEvent != null && LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+        if (peekedEvent instanceof EnrichedEvent) {
+          ((EnrichedEvent) peekedEvent)
+              
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
+        }
+
+        final Event polledEvent = retryEventQueue.poll();
+        if (polledEvent != peekedEvent) {
+          LOGGER.error(
+              "The event polled from the queue is not the same as the event 
peeked from the queue. "
+                  + "Peeked event: {}, polled event: {}.",
+              peekedEvent,
+              polledEvent);
+        }
+        if (polledEvent != null && LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 213d980c186..1c108a12691 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -242,11 +242,14 @@ public class PipeTransferTsFileInsertionEventHandler
     } catch (final IOException e) {
       LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
     } finally {
-      connector.addFailureEventToRetryQueue(event);
-
-      if (client != null) {
-        client.setShouldReturnSelf(true);
-        client.returnSelf();
+      try {
+        // addFailureEventToRetryQueue's execution may be blocked, so return the client first
+        if (client != null) {
+          client.setShouldReturnSelf(true);
+          client.returnSelf();
+        }
+      } finally {
+        connector.addFailureEventToRetryQueue(event);
       }
     }
   }

Reply via email to