This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 679fc98021f Pipe: fix infinite loop with lock when retrying syncing
tsfiles in async connector (which may cause selector & connector worker
deadlock) (#12501)
679fc98021f is described below
commit 679fc98021fa2d896055237cb8a8cfe2fd7c244a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat May 11 12:35:23 2024 +0800
Pipe: fix infinite loop with lock when retrying syncing tsfiles in async
connector (which may cause selector & connector worker deadlock) (#12501)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +
.../async/IoTDBDataRegionAsyncConnector.java | 85 ++++++++++++----------
.../PipeTransferTsFileInsertionEventHandler.java | 12 +--
.../async/AsyncPipeDataTransferServiceClient.java | 3 +-
4 files changed, 58 insertions(+), 46 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index b034a8182ed..729c3dbc8bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBClientManager;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -202,6 +203,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+ client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
waitHandshakeFinished(isHandshakeFinished);
@@ -219,6 +222,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
resp.set(null);
exception.set(null);
+ client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1e01b824220..aef8f2f340a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -54,10 +54,10 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Objects;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
@@ -83,14 +83,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private IoTDBDataNodeAsyncClientManager clientManager;
private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
- private final PriorityBlockingQueue<Event> retryEventQueue =
- new PriorityBlockingQueue<>(
- 11,
- Comparator.comparing(
- e ->
- // Non-enriched events will be put at the front of the queue,
- // because they are more likely to be lost and need to be
retried first.
- e instanceof EnrichedEvent ? ((EnrichedEvent)
e).getCommitId() : 0));
+ private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
@@ -359,37 +352,43 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
* @see PipeConnector#transfer(TabletInsertionEvent) for more details.
* @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
*/
- private synchronized void transferQueuedEventsIfNecessary() throws Exception {
+ private void transferQueuedEventsIfNecessary() throws Exception {
while (!retryEventQueue.isEmpty()) {
- final Event peekedEvent = retryEventQueue.peek();
-
- if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- retryConnector.transfer((PipeInsertNodeTabletInsertionEvent)
peekedEvent);
- } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
- retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
- } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
- // Using the async connector to transfer the event for performance.
- transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
- } else {
- LOGGER.warn(
- "IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", peekedEvent);
- }
+ synchronized (this) {
+ if (isClosed.get() || retryEventQueue.isEmpty()) {
+ return;
+ }
- if (peekedEvent instanceof EnrichedEvent) {
- ((EnrichedEvent) peekedEvent)
- .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
- }
+ final Event peekedEvent = retryEventQueue.peek();
+
+ if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ retryConnector.transfer((PipeInsertNodeTabletInsertionEvent)
peekedEvent);
+ } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
+ retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
+ } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
+ retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+ } else {
+ LOGGER.warn(
+ "IoTDBThriftAsyncConnector does not support transfer generic event: {}.",
+ peekedEvent);
+ }
- final Event polledEvent = retryEventQueue.poll();
- if (polledEvent != peekedEvent) {
- LOGGER.error(
- "The event polled from the queue is not the same as the event
peeked from the queue. "
- + "Peeked event: {}, polled event: {}.",
- peekedEvent,
- polledEvent);
- }
- if (polledEvent != null && LOGGER.isDebugEnabled()) {
- LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+ if (peekedEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) peekedEvent)
+ .decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
+ }
+
+ final Event polledEvent = retryEventQueue.poll();
+ if (polledEvent != peekedEvent) {
+ LOGGER.error(
+ "The event polled from the queue is not the same as the event peeked from the queue. "
+ + "Peeked event: {}, polled event: {}.",
+ peekedEvent,
+ polledEvent);
+ }
+ if (polledEvent != null && LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Polled event {} from retry queue.", polledEvent);
+ }
}
}
}
@@ -410,7 +409,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
*
* @param event event to retry
*/
- public synchronized void addFailureEventToRetryQueue(final Event event) {
+ public void addFailureEventToRetryQueue(final Event event) {
if (isClosed.get()) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
@@ -422,6 +421,12 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Added event {} to retry queue.", event);
}
+
+ if (isClosed.get()) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+ }
+ }
}
/**
@@ -429,7 +434,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
*
* @param events events to retry
*/
- public synchronized void addFailureEventsToRetryQueue(final Iterable<Event> events) {
+ public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
for (final Event event : events) {
addFailureEventToRetryQueue(event);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 213d980c186..aebff325920 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -242,11 +242,13 @@ public class PipeTransferTsFileInsertionEventHandler
} catch (final IOException e) {
LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
} finally {
- connector.addFailureEventToRetryQueue(event);
-
- if (client != null) {
- client.setShouldReturnSelf(true);
- client.returnSelf();
+ try {
+ if (client != null) {
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
+ }
+ } finally {
+ connector.addFailureEventToRetryQueue(event);
}
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 186db8ede4c..4572516fbee 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -141,7 +141,8 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
LOGGER.error(
"Unexpected exception occurs in {}, error msg is {}",
this,
- ExceptionUtils.getRootCause(e).toString());
+ ExceptionUtils.getRootCause(e).toString(),
+ e);
}
return false;
}