This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 39548ab550b Pipe: Fixed event forever pinned and retry never invoked
when TException encountered in IoTDBThriftAsyncConnector (#11560)
39548ab550b is described below
commit 39548ab550b68e15166b4e0ed066b3fb6be42c65
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 17 19:13:07 2023 +0800
Pipe: Fixed event forever pinned and retry never invoked when TException
encountered in IoTDBThriftAsyncConnector (#11560)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../thrift/async/IoTDBThriftAsyncConnector.java | 118 ++++++++-------------
.../PipeTransferTabletBatchEventHandler.java | 2 +-
.../PipeTransferTabletInsertionEventHandler.java | 2 +-
.../PipeTransferTsFileInsertionEventHandler.java | 5 +-
.../protocol/websocket/WebSocketConnector.java | 11 +-
5 files changed, 59 insertions(+), 79 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 9354e10aecd..1e34ef7c4be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -54,13 +54,13 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
@@ -79,8 +79,9 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector
{
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
- private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
- "Failed to borrow client from client pool for receiver %s:%s.";
+ private static final String THRIFT_ERROR_FORMATTER =
+ "Failed to borrow client from client pool or exception occurred "
+ + "when sending to receiver %s:%s.";
private static final AtomicReference<
IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
@@ -175,15 +176,13 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
return;
}
- final long requestCommitId = commitIdGenerator.incrementAndGet();
-
if (isTabletBatchModeEnabled) {
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
-
tabletBatchBuilder.onSuccess();
}
} else {
@@ -196,6 +195,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
pipeInsertNodeTabletInsertionEvent.getByteBuffer())
: PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getInsertNode());
+
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler =
new PipeTransferTabletInsertNodeEventHandler(
requestCommitId, pipeInsertNodeTabletInsertionEvent,
pipeTransferReq, this);
@@ -208,6 +209,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned());
+
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
new PipeTransferTabletRawEventHandler(
requestCommitId, pipeRawTabletInsertionEvent,
pipeTransferTabletRawReq, this);
@@ -224,22 +227,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
try {
final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
-
- try {
- pipeTransferTabletBatchEventHandler.transfer(client);
- } catch (TException e) {
- LOGGER.warn(
- String.format(
- "Transfer batched insertion requests to receiver %s:%s error,
retrying...",
- targetNodeUrl.getIp(), targetNodeUrl.getPort()),
- e);
- }
+ pipeTransferTabletBatchEventHandler.transfer(client);
} catch (Exception ex) {
- pipeTransferTabletBatchEventHandler.onError(ex);
LOGGER.warn(
- String.format(
- FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
ex);
+ pipeTransferTabletBatchEventHandler.onError(ex);
}
}
@@ -250,22 +243,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
try {
final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
-
- try {
- pipeTransferInsertNodeReqHandler.transfer(client);
- } catch (TException e) {
- LOGGER.warn(
- String.format(
- "Transfer insert node to receiver %s:%s error, retrying...",
- targetNodeUrl.getIp(), targetNodeUrl.getPort()),
- e);
- }
+ pipeTransferInsertNodeReqHandler.transfer(client);
} catch (Exception ex) {
- pipeTransferInsertNodeReqHandler.onError(ex);
LOGGER.warn(
- String.format(
- FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
ex);
+ pipeTransferInsertNodeReqHandler.onError(ex);
}
}
@@ -275,22 +258,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
try {
final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
-
- try {
- pipeTransferTabletReqHandler.transfer(client);
- } catch (TException e) {
- LOGGER.warn(
- String.format(
- "Transfer tablet to receiver %s:%s error, retrying...",
- targetNodeUrl.getIp(), targetNodeUrl.getPort()),
- e);
- }
+ pipeTransferTabletReqHandler.transfer(client);
} catch (Exception ex) {
- pipeTransferTabletReqHandler.onError(ex);
LOGGER.warn(
- String.format(
- FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
ex);
+ pipeTransferTabletReqHandler.onError(ex);
}
}
@@ -327,6 +300,11 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
return;
}
+ // Just in case. To avoid the case that exception occurred when
constructing the handler.
+ if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
+ throw new
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+ }
+
final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
new PipeTransferTsFileInsertionEventHandler(
@@ -342,22 +320,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
try {
final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
-
- try {
- pipeTransferTsFileInsertionEventHandler.transfer(client);
- } catch (TException e) {
- LOGGER.warn(
- String.format(
- "Transfer tsfile to receiver %s:%s error, retrying...",
- targetNodeUrl.getIp(), targetNodeUrl.getPort()),
- e);
- }
+ pipeTransferTsFileInsertionEventHandler.transfer(client);
} catch (Exception ex) {
- pipeTransferTsFileInsertionEventHandler.onError(ex);
LOGGER.warn(
- String.format(
- FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
ex);
+ pipeTransferTsFileInsertionEventHandler.onError(ex);
}
}
@@ -373,20 +341,13 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
}
private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint
targetNodeUrl)
- throws PipeConnectionException {
- try {
- while (true) {
- final AsyncPipeDataTransferServiceClient client =
- asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
- if (handshakeIfNecessary(targetNodeUrl, client)) {
- return client;
- }
+ throws Exception {
+ while (true) {
+ final AsyncPipeDataTransferServiceClient client =
+ asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
}
- } catch (Exception e) {
- throw new PipeConnectionException(
- String.format(
- FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
- e);
}
}
@@ -495,9 +456,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
"IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", event);
}
- if (event instanceof EnrichedEvent) {
- commit(requestCommitId, (EnrichedEvent) event);
- }
+ commit(requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent)
event : null);
retryEventQueue.poll();
}
@@ -546,7 +505,16 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
while (!commitQueue.isEmpty()) {
final Pair<Long, Runnable> committer = commitQueue.peek();
- if (lastCommitId.get() + 1 != committer.left) {
+
+ // If the commit id is less than or equals to the last commit id, it
means that
+ // the event has been committed before, and has been retried. So the
event can
+ // be ignored.
+ if (committer.left <= lastCommitId.get()) {
+ commitQueue.poll();
+ continue;
+ }
+
+ if (committer.left != lastCommitId.get() + 1) {
break;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 8929bed1916..a3a316983cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -86,7 +86,7 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
@Override
public void onError(Exception exception) {
LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+ "Failed to transfer TabletInsertionEvent batch {} (request commit
ids={}).",
events,
requestCommitIds,
exception);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index c76b81011e3..619a0f961b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -94,7 +94,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
@Override
public void onError(Exception exception) {
LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+ "Failed to transfer TabletInsertionEvent {} (request commit id={}).",
event,
requestCommitId,
exception);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 47bc2eb1e1f..f3b130eae64 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -164,7 +164,10 @@ public class PipeTransferTsFileInsertionEventHandler
@Override
public void onError(Exception exception) {
LOGGER.warn(
- "Failed to transfer tsfile {} (request commit id {}).", tsFile,
requestCommitId, exception);
+ "Failed to transfer TsFileInsertionEvent {} (request commit id {}).",
+ tsFile,
+ requestCommitId,
+ exception);
try {
if (reader != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 0bdc67498ba..4fc3c9a41e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -172,7 +172,16 @@ public class WebSocketConnector implements PipeConnector {
while (!commitQueue.isEmpty()) {
final Pair<Long, Runnable> committer = commitQueue.peek();
- if (lastCommitId.get() + 1 != committer.left) {
+
+ // If the commit id is less than or equals to the last commit id, it
means that
+ // the event has been committed before, and has been retried. So the
event can
+ // be ignored.
+ if (committer.left <= lastCommitId.get()) {
+ commitQueue.poll();
+ continue;
+ }
+
+ if (committer.left != lastCommitId.get() + 1) {
break;
}