This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-sync-infinite-loop in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a4dd3e5803c1399068b12e49af75ed4eda156c5d Author: Steve Yurong Su <[email protected]> AuthorDate: Fri May 10 18:17:15 2024 +0800 Pipe: fix infinite loop with lock in async connector (which may cause selector & connector worker deadlock) --- .../async/IoTDBDataRegionAsyncConnector.java | 79 +++++++++++----------- .../PipeTransferTsFileInsertionEventHandler.java | 13 ++-- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 1e01b824220..a5b69cea473 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -54,10 +54,10 @@ import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.Objects; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY; @@ -83,14 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { private IoTDBDataNodeAsyncClientManager clientManager; private final IoTDBDataRegionSyncConnector retryConnector = new IoTDBDataRegionSyncConnector(); - private final PriorityBlockingQueue<Event> retryEventQueue = - new PriorityBlockingQueue<>( - 11, - Comparator.comparing( - e -> - // Non-enriched events will be put at the front of the queue, - // because they are 
more likely to be lost and need to be retried first. - e instanceof EnrichedEvent ? ((EnrichedEvent) e).getCommitId() : 0)); + private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue<>(); private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder; @@ -359,37 +352,47 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { * @see PipeConnector#transfer(TabletInsertionEvent) for more details. * @see PipeConnector#transfer(TsFileInsertionEvent) for more details. */ - private synchronized void transferQueuedEventsIfNecessary() throws Exception { + private void transferQueuedEventsIfNecessary() throws Exception { + final int maxRetryTimes = retryEventQueue.size(); + int alreadyRetriedTimes = 0; + while (!retryEventQueue.isEmpty()) { - final Event peekedEvent = retryEventQueue.peek(); - - if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) { - retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) peekedEvent); - } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) { - retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent); - } else if (peekedEvent instanceof PipeTsFileInsertionEvent) { - // Using the async connector to transfer the event for performance. 
- transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent); - } else { - LOGGER.warn( - "IoTDBThriftAsyncConnector does not support transfer generic event: {}.", peekedEvent); - } + synchronized (this) { + if (isClosed.get() || retryEventQueue.isEmpty() || alreadyRetriedTimes++ >= maxRetryTimes) { + return; + } - if (peekedEvent instanceof EnrichedEvent) { - ((EnrichedEvent) peekedEvent) - .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true); - } + final Event peekedEvent = retryEventQueue.peek(); + + if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) { + retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) peekedEvent); + } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) { + retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent); + } else if (peekedEvent instanceof PipeTsFileInsertionEvent) { + // Using the async connector to transfer the event for performance. + transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent); + } else { + LOGGER.warn( + "IoTDBThriftAsyncConnector does not support transfer generic event: {}.", + peekedEvent); + } - final Event polledEvent = retryEventQueue.poll(); - if (polledEvent != peekedEvent) { - LOGGER.error( - "The event polled from the queue is not the same as the event peeked from the queue. " - + "Peeked event: {}, polled event: {}.", - peekedEvent, - polledEvent); - } - if (polledEvent != null && LOGGER.isDebugEnabled()) { - LOGGER.debug("Polled event {} from retry queue.", polledEvent); + if (peekedEvent instanceof EnrichedEvent) { + ((EnrichedEvent) peekedEvent) + .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true); + } + + final Event polledEvent = retryEventQueue.poll(); + if (polledEvent != peekedEvent) { + LOGGER.error( + "The event polled from the queue is not the same as the event peeked from the queue. 
" + + "Peeked event: {}, polled event: {}.", + peekedEvent, + polledEvent); + } + if (polledEvent != null && LOGGER.isDebugEnabled()) { + LOGGER.debug("Polled event {} from retry queue.", polledEvent); + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java index 213d980c186..1c108a12691 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java @@ -242,11 +242,14 @@ public class PipeTransferTsFileInsertionEventHandler } catch (final IOException e) { LOGGER.warn("Failed to close file reader when failed to transfer file.", e); } finally { - connector.addFailureEventToRetryQueue(event); - - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); + try { + // addFailureEventToRetryQueue's execution may be blocked, so return the client first + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + } + } finally { + connector.addFailureEventToRetryQueue(event); } } }
