This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d7a30c2941a0ca031d5afbcbebfce62c678684a8
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed May 7 15:12:04 2025 +0800

    Revert "ShutdownNow all pipe connector subTask works to interrupt all threads blocking in lock. (#15411)" (#15462)
    
    This reverts commit acdf8cc0f9a04f9c8601e835bccd46c3d2f09af4.
    
    (cherry picked from commit 750f1deb877a5a5241f66367157047b01a321679)
---
 .../task/execution/PipeSubtaskExecutorManager.java |   7 -
 .../pipeconsensus/PipeConsensusAsyncConnector.java | 157 ++++++---------------
 .../iotdb/db/service/DataNodeShutdownHook.java     |   4 -
 .../agent/task/execution/PipeSubtaskExecutor.java  |   2 +-
 4 files changed, 47 insertions(+), 123 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
index 312091c79c7..eadb75463bb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
@@ -48,13 +48,6 @@ public class PipeSubtaskExecutorManager {
     return consensusExecutor;
   }
 
-  public void shutdownAll() {
-    processorExecutor.shutdown();
-    connectorExecutor.shutdown();
-    subscriptionExecutor.shutdown();
-    consensusExecutor.shutdown();
-  }
-
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeSubtaskExecutorManager() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 1f555e8f4a9..49a9241c1e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -74,8 +74,6 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
@@ -110,7 +108,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
   private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncTransferClientManager;
   private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
   private volatile long currentReplicateProgress = 0;
-  private final Lock lock = new ReentrantLock();
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -208,44 +205,33 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
    * if one event is successfully processed by receiver in PipeConsensus, we 
will remove this event
    * from transferBuffer in order to transfer other event.
    */
-  public void removeEventFromBuffer(EnrichedEvent event) {
-    try {
-      lock.lockInterruptibly();
-    } catch (InterruptedException e) {
-      LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will 
exit directly", e);
-      Thread.currentThread().interrupt();
+  public synchronized void removeEventFromBuffer(EnrichedEvent event) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "PipeConsensus-ConsensusGroup-{}: one event-{} successfully received 
by the follower, will be removed from queue, queue size = {}, limit size = {}",
+          consensusGroupId,
+          event,
+          transferBuffer.size(),
+          IOTDB_CONFIG.getIotConsensusV2PipelineSize());
+    }
+    if (transferBuffer.isEmpty()) {
+      LOGGER.info(
+          "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after 
pipeConsensusAsyncConnector being closed. Ignore it.",
+          consensusGroupId,
+          event);
       return;
     }
-    try {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug(
-            "PipeConsensus-ConsensusGroup-{}: one event-{} successfully 
received by the follower, will be removed from queue, queue size = {}, limit 
size = {}",
-            consensusGroupId,
-            event,
-            transferBuffer.size(),
-            IOTDB_CONFIG.getIotConsensusV2PipelineSize());
-      }
-      if (transferBuffer.isEmpty()) {
-        LOGGER.info(
-            "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after 
pipeConsensusAsyncConnector being closed. Ignore it.",
-            consensusGroupId,
-            event);
-        return;
-      }
-      Iterator<EnrichedEvent> iterator = transferBuffer.iterator();
-      EnrichedEvent current = iterator.next();
-      while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
-        current = iterator.next();
-      }
-      iterator.remove();
-      // update replicate progress
-      currentReplicateProgress =
-          Math.max(currentReplicateProgress, 
event.getReplicateIndexForIoTV2());
-      // decrease reference count
-      
event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true);
-    } finally {
-      lock.unlock();
+    Iterator<EnrichedEvent> iterator = transferBuffer.iterator();
+    EnrichedEvent current = iterator.next();
+    while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) {
+      current = iterator.next();
     }
+    iterator.remove();
+    // update replicate progress
+    currentReplicateProgress =
+        Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2());
+    // decrease reference count
+    event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), 
true);
   }
 
   @Override
@@ -467,14 +453,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
    */
   private void syncTransferQueuedEventsIfNecessary() throws Exception {
     while (!retryEventQueue.isEmpty()) {
-      try {
-        lock.lockInterruptibly();
-      } catch (InterruptedException e) {
-        LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will 
exit directly", e);
-        Thread.currentThread().interrupt();
-        return;
-      }
-      try {
+      synchronized (this) {
         if (isClosed.get() || retryEventQueue.isEmpty()) {
           return;
         }
@@ -517,8 +496,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
           // poll it from transferBuffer
           removeEventFromBuffer((EnrichedEvent) polledEvent);
         }
-      } finally {
-        lock.unlock();
       }
     }
   }
@@ -563,47 +540,19 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     }
   }
 
-  public void clearRetryEventsReferenceCount() {
-    boolean needUnLock = true;
-    try {
-      lock.lockInterruptibly();
-    } catch (InterruptedException e) {
-      LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
-      Thread.currentThread().interrupt();
-      needUnLock = false;
-    }
-    try {
-      while (!retryEventQueue.isEmpty()) {
-        final Event event = retryEventQueue.poll();
-        if (event instanceof EnrichedEvent) {
-          ((EnrichedEvent) 
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
-        }
-      }
-    } finally {
-      if (needUnLock) {
-        lock.unlock();
+  public synchronized void clearRetryEventsReferenceCount() {
+    while (!retryEventQueue.isEmpty()) {
+      final Event event = retryEventQueue.poll();
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
       }
     }
   }
 
-  public void clearTransferBufferReferenceCount() {
-    boolean needUnLock = true;
-    try {
-      lock.lockInterruptibly();
-    } catch (InterruptedException e) {
-      LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
-      Thread.currentThread().interrupt();
-      needUnLock = false;
-    }
-    try {
-      while (!transferBuffer.isEmpty()) {
-        final EnrichedEvent event = transferBuffer.poll();
-        event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
-      }
-    } finally {
-      if (needUnLock) {
-        lock.unlock();
-      }
+  public synchronized void clearTransferBufferReferenceCount() {
+    while (!transferBuffer.isEmpty()) {
+      final EnrichedEvent event = transferBuffer.poll();
+      event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
     }
   }
 
@@ -629,35 +578,21 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
   // synchronized to avoid close connector when transfer event
   @Override
-  public void close() {
-    boolean needUnLock = true;
-    try {
-      lock.lockInterruptibly();
-    } catch (InterruptedException e) {
-      LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e);
-      Thread.currentThread().interrupt();
-      needUnLock = false;
-    }
-    try {
-      super.close();
-      isClosed.set(true);
-
-      retryConnector.close();
-      clearRetryEventsReferenceCount();
-      clearTransferBufferReferenceCount();
+  public synchronized void close() {
+    super.close();
+    isClosed.set(true);
 
-      if (tabletBatchBuilder != null) {
-        tabletBatchBuilder.close();
-      }
+    retryConnector.close();
+    clearRetryEventsReferenceCount();
+    clearTransferBufferReferenceCount();
 
-      PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
-          .removeConsensusPipeConnector(new 
ConsensusPipeName(consensusPipeName));
-      
MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics);
-    } finally {
-      if (needUnLock) {
-        lock.unlock();
-      }
+    if (tabletBatchBuilder != null) {
+      tabletBatchBuilder.close();
     }
+
+    PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+        .removeConsensusPipeConnector(new 
ConsensusPipeName(consensusPipeName));
+    
MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics);
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index cb05d5d2153..34dfe8d35b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -90,9 +89,6 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
-    // Shutdown all pipe connector executors
-    PipeSubtaskExecutorManager.getInstance().shutdownAll();
-
     // Actually stop all services started by the DataNode.
     // If we don't call this, services like the RestService are not stopped 
and I can't re-start
     // it.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index 7b4498f4b44..4ea7714962b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -150,7 +150,7 @@ public abstract class PipeSubtaskExecutor {
       subtask.disallowSubmittingSelf();
     }
 
-    subtaskWorkerThreadPoolExecutor.shutdownNow();
+    subtaskWorkerThreadPoolExecutor.shutdown();
   }
 
   public final boolean isShutdown() {

Reply via email to