This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d7a30c2941a0ca031d5afbcbebfce62c678684a8 Author: Peng Junzhi <[email protected]> AuthorDate: Wed May 7 15:12:04 2025 +0800 Revert "ShutdownNow all pipe connector subTask works to interrupt all threads blocking in lock. (#15411)" (#15462) This reverts commit acdf8cc0f9a04f9c8601e835bccd46c3d2f09af4. (cherry picked from commit 750f1deb877a5a5241f66367157047b01a321679) --- .../task/execution/PipeSubtaskExecutorManager.java | 7 - .../pipeconsensus/PipeConsensusAsyncConnector.java | 157 ++++++--------------- .../iotdb/db/service/DataNodeShutdownHook.java | 4 - .../agent/task/execution/PipeSubtaskExecutor.java | 2 +- 4 files changed, 47 insertions(+), 123 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java index 312091c79c7..eadb75463bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java @@ -48,13 +48,6 @@ public class PipeSubtaskExecutorManager { return consensusExecutor; } - public void shutdownAll() { - processorExecutor.shutdown(); - connectorExecutor.shutdown(); - subscriptionExecutor.shutdown(); - consensusExecutor.shutdown(); - } - ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeSubtaskExecutorManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index 1f555e8f4a9..49a9241c1e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -74,8 +74,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME; @@ -110,7 +108,6 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> asyncTransferClientManager; private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder; private volatile long currentReplicateProgress = 0; - private final Lock lock = new ReentrantLock(); @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -208,44 +205,33 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse * if one event is successfully processed by receiver in PipeConsensus, we will remove this event * from transferBuffer in order to transfer other event. */ - public void removeEventFromBuffer(EnrichedEvent event) { - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will exit directly", e); - Thread.currentThread().interrupt(); + public synchronized void removeEventFromBuffer(EnrichedEvent event) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "PipeConsensus-ConsensusGroup-{}: one event-{} successfully received by the follower, will be removed from queue, queue size = {}, limit size = {}", + consensusGroupId, + event, + transferBuffer.size(), + IOTDB_CONFIG.getIotConsensusV2PipelineSize()); + } + if (transferBuffer.isEmpty()) { + LOGGER.info( + "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after pipeConsensusAsyncConnector being closed. Ignore it.", + consensusGroupId, + event); return; } - try { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "PipeConsensus-ConsensusGroup-{}: one event-{} successfully received by the follower, will be removed from queue, queue size = {}, limit size = {}", - consensusGroupId, - event, - transferBuffer.size(), - IOTDB_CONFIG.getIotConsensusV2PipelineSize()); - } - if (transferBuffer.isEmpty()) { - LOGGER.info( - "PipeConsensus-ConsensusGroup-{}: try to remove event-{} after pipeConsensusAsyncConnector being closed. Ignore it.", - consensusGroupId, - event); - return; - } - Iterator<EnrichedEvent> iterator = transferBuffer.iterator(); - EnrichedEvent current = iterator.next(); - while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) { - current = iterator.next(); - } - iterator.remove(); - // update replicate progress - currentReplicateProgress = - Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2()); - // decrease reference count - event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true); - } finally { - lock.unlock(); + Iterator<EnrichedEvent> iterator = transferBuffer.iterator(); + EnrichedEvent current = iterator.next(); + while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) { + current = iterator.next(); } + iterator.remove(); + // update replicate progress + currentReplicateProgress = + Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2()); + // decrease reference count + event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true); } @Override @@ -467,14 +453,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse */ private void syncTransferQueuedEventsIfNecessary() throws Exception { while (!retryEventQueue.isEmpty()) { - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted. Will exit directly", e); - Thread.currentThread().interrupt(); - return; - } - try { + synchronized (this) { if (isClosed.get() || retryEventQueue.isEmpty()) { return; } @@ -517,8 +496,6 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse // poll it from transferBuffer removeEventFromBuffer((EnrichedEvent) polledEvent); } - } finally { - lock.unlock(); } } } @@ -563,47 +540,19 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse } } - public void clearRetryEventsReferenceCount() { - boolean needUnLock = true; - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e); - Thread.currentThread().interrupt(); - needUnLock = false; - } - try { - while (!retryEventQueue.isEmpty()) { - final Event event = retryEventQueue.poll(); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName()); - } - } - } finally { - if (needUnLock) { - lock.unlock(); + public synchronized void clearRetryEventsReferenceCount() { + while (!retryEventQueue.isEmpty()) { + final Event event = retryEventQueue.poll(); + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName()); } } } - public void clearTransferBufferReferenceCount() { - boolean needUnLock = true; - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e); - Thread.currentThread().interrupt(); - needUnLock = false; - } - try { - while (!transferBuffer.isEmpty()) { - final EnrichedEvent event = transferBuffer.poll(); - event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName()); - } - } finally { - if (needUnLock) { - lock.unlock(); - } + public synchronized void clearTransferBufferReferenceCount() { + while (!transferBuffer.isEmpty()) { + final EnrichedEvent event = transferBuffer.poll(); + event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName()); } } @@ -629,35 +578,21 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse // synchronized to avoid close connector when transfer event @Override - public void close() { - boolean needUnLock = true; - try { - lock.lockInterruptibly(); - } catch (InterruptedException e) { - LOGGER.info("PipeConsensusAsyncConnector try to lock interrupted.", e); - Thread.currentThread().interrupt(); - needUnLock = false; - } - try { - super.close(); - isClosed.set(true); - - retryConnector.close(); - clearRetryEventsReferenceCount(); - clearTransferBufferReferenceCount(); + public synchronized void close() { + super.close(); + isClosed.set(true); - if (tabletBatchBuilder != null) { - tabletBatchBuilder.close(); - } + retryConnector.close(); + clearRetryEventsReferenceCount(); + clearTransferBufferReferenceCount(); - PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()) - .removeConsensusPipeConnector(new ConsensusPipeName(consensusPipeName)); - MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics); - } finally { - if (needUnLock) { - lock.unlock(); - } + if (tabletBatchBuilder != null) { + tabletBatchBuilder.close(); } + + PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr()) + .removeConsensusPipeConnector(new ConsensusPipeName(consensusPipeName)); + MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics); } //////////////////////////// APIs provided for metric framework //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index cb05d5d2153..34dfe8d35b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; -import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; @@ -90,9 +89,6 @@ public class DataNodeShutdownHook extends Thread { triggerSnapshotForAllDataRegion(); } - // Shutdown all pipe connector executors - PipeSubtaskExecutorManager.getInstance().shutdownAll(); - // Actually stop all services started by the DataNode. // If we don't call this, services like the RestService are not stopped and I can't re-start // it. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index 7b4498f4b44..4ea7714962b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -150,7 +150,7 @@ public abstract class PipeSubtaskExecutor { subtask.disallowSubmittingSelf(); } - subtaskWorkerThreadPoolExecutor.shutdownNow(); + subtaskWorkerThreadPoolExecutor.shutdown(); } public final boolean isShutdown() {
