This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 679fc98021f Pipe: fix infinite loop with lock when retrying syncing tsfiles in async connector (which may cause selector & connector worker deadlock) (#12501)
679fc98021f is described below

commit 679fc98021fa2d896055237cb8a8cfe2fd7c244a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat May 11 12:35:23 2024 +0800

    Pipe: fix infinite loop with lock when retrying syncing tsfiles in async connector (which may cause selector & connector worker deadlock) (#12501)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    |  4 +
 .../async/IoTDBDataRegionAsyncConnector.java       | 85 ++++++++++++----------
 .../PipeTransferTsFileInsertionEventHandler.java   | 12 +--
 .../async/AsyncPipeDataTransferServiceClient.java  |  3 +-
 4 files changed, 58 insertions(+), 46 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index b034a8182ed..729c3dbc8bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBClientManager;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -202,6 +203,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     params.put(
         PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
         CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+    client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
     client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
     waitHandshakeFinished(isHandshakeFinished);
 
@@ -219,6 +222,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       resp.set(null);
       exception.set(null);
 
+      client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
       client.pipeTransfer(
           PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
               CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1e01b824220..aef8f2f340a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -54,10 +54,10 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
@@ -83,14 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
   private IoTDBDataNodeAsyncClientManager clientManager;
 
   private final IoTDBDataRegionSyncConnector retryConnector = new IoTDBDataRegionSyncConnector();
-  private final PriorityBlockingQueue<Event> retryEventQueue =
-      new PriorityBlockingQueue<>(
-          11,
-          Comparator.comparing(
-              e ->
-                  // Non-enriched events will be put at the front of the queue,
-                  // because they are more likely to be lost and need to be retried first.
-                  e instanceof EnrichedEvent ? ((EnrichedEvent) e).getCommitId() : 0));
+  private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue<>();
 
   private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
 
@@ -359,37 +352,43 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
    * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
-  private synchronized void transferQueuedEventsIfNecessary() throws Exception {
+  private void transferQueuedEventsIfNecessary() throws Exception {
     while (!retryEventQueue.isEmpty()) {
-      final Event peekedEvent = retryEventQueue.peek();
-
-      if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) peekedEvent);
-      } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
-        retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
-      } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
-        // Using the async connector to transfer the event for performance.
-        transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
-      } else {
-        LOGGER.warn(
-            "IoTDBThriftAsyncConnector does not support transfer generic event: {}.", peekedEvent);
-      }
+      synchronized (this) {
+        if (isClosed.get() || retryEventQueue.isEmpty()) {
+          return;
+        }
 
-      if (peekedEvent instanceof EnrichedEvent) {
-        ((EnrichedEvent) peekedEvent)
-            .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
-      }
+        final Event peekedEvent = retryEventQueue.peek();
+
+        if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+          retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) peekedEvent);
+        } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
+          retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
+        } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
+          retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+        } else {
+          LOGGER.warn(
+              "IoTDBThriftAsyncConnector does not support transfer generic event: {}.",
+              peekedEvent);
+        }
 
-      final Event polledEvent = retryEventQueue.poll();
-      if (polledEvent != peekedEvent) {
-        LOGGER.error(
-            "The event polled from the queue is not the same as the event peeked from the queue. "
-                + "Peeked event: {}, polled event: {}.",
-            peekedEvent,
-            polledEvent);
-      }
-      if (polledEvent != null && LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+        if (peekedEvent instanceof EnrichedEvent) {
+          ((EnrichedEvent) peekedEvent)
+              .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
+        }
+
+        final Event polledEvent = retryEventQueue.poll();
+        if (polledEvent != peekedEvent) {
+          LOGGER.error(
+              "The event polled from the queue is not the same as the event peeked from the queue. "
+                  + "Peeked event: {}, polled event: {}.",
+              peekedEvent,
+              polledEvent);
+        }
+        if (polledEvent != null && LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+        }
       }
     }
   }
@@ -410,7 +409,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
    *
    * @param event event to retry
    */
-  public synchronized void addFailureEventToRetryQueue(final Event event) {
+  public void addFailureEventToRetryQueue(final Event event) {
     if (isClosed.get()) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
@@ -422,6 +421,12 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Added event {} to retry queue.", event);
     }
+
+    if (isClosed.get()) {
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+      }
+    }
   }
 
   /**
@@ -429,7 +434,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
    *
    * @param events events to retry
    */
-  public synchronized void addFailureEventsToRetryQueue(final Iterable<Event> events) {
+  public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
     for (final Event event : events) {
       addFailureEventToRetryQueue(event);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 213d980c186..aebff325920 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -242,11 +242,13 @@ public class PipeTransferTsFileInsertionEventHandler
     } catch (final IOException e) {
       LOGGER.warn("Failed to close file reader when failed to transfer file.", e);
     } finally {
-      connector.addFailureEventToRetryQueue(event);
-
-      if (client != null) {
-        client.setShouldReturnSelf(true);
-        client.returnSelf();
+      try {
+        if (client != null) {
+          client.setShouldReturnSelf(true);
+          client.returnSelf();
+        }
+      } finally {
+        connector.addFailureEventToRetryQueue(event);
       }
     }
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 186db8ede4c..4572516fbee 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -141,7 +141,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
         LOGGER.error(
             "Unexpected exception occurs in {}, error msg is {}",
             this,
-            ExceptionUtils.getRootCause(e).toString());
+            ExceptionUtils.getRootCause(e).toString(),
+            e);
       }
       return false;
     }

Reply via email to