This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f280381be91 [IOTDB-6067] Pipe: Improve the stability of
iotdb-thrift-connector-v2 during fault tolerance (avoid OOM) (#10550)
f280381be91 is described below
commit f280381be91083bc96ed054a61dd866dba9aaccd
Author: 马子坤 <[email protected]>
AuthorDate: Fri Jul 28 12:30:51 2023 +0800
[IOTDB-6067] Pipe: Improve the stability of iotdb-thrift-connector-v2
during fault tolerance (avoid OOM) (#10550)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 8 +
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 52 ++-
.../pipe/connector/v1/IoTDBThriftConnectorV1.java | 22 +-
.../pipe/connector/v2/IoTDBThriftConnectorV2.java | 404 ++++++++++++++++-----
...nsferInsertNodeTabletInsertionEventHandler.java | 12 +-
...PipeTransferRawTabletInsertionEventHandler.java | 13 +-
.../PipeTransferTabletInsertionEventHandler.java | 69 +---
.../PipeTransferTsFileInsertionEventHandler.java | 64 +---
.../execution/load/LoadTsFileManager.java | 7 +
.../resources/conf/iotdb-common.properties | 12 +
.../iotdb/commons/client/ClientPoolFactory.java | 16 +-
.../async/AsyncPipeDataTransferServiceClient.java | 19 +-
.../iotdb/commons/concurrent/ThreadName.java | 5 +
.../apache/iotdb/commons/conf/CommonConfig.java | 39 ++
.../iotdb/commons/conf/CommonDescriptor.java | 21 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 16 +
16 files changed, 557 insertions(+), 222 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 391bdc97ba2..5112d1e730d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -107,6 +108,13 @@ public class PipeRuntimeAgent implements IService {
pipeTaskMeta,
pipeRuntimeException.getMessage(),
pipeRuntimeException);
+
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
+
+ // Quickly stop, locally, every running pipe that has tracked a critical exception,
+ // without waiting for the next heartbeat cycle.
+ if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
+ PipeAgent.task().stopAllPipesWithCriticalException();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 05ccda0ed1b..2ebe4156b34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.agent.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -109,13 +111,13 @@ public class PipeTaskAgent {
List<PipeMeta> pipeMetaListFromConfigNode) {
acquireWriteLock();
try {
- return handlePipeMetaChangesWithoutLock(pipeMetaListFromConfigNode);
+ return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
} finally {
releaseWriteLock();
}
}
- private List<TPushPipeMetaRespExceptionMessage>
handlePipeMetaChangesWithoutLock(
+ private List<TPushPipeMetaRespExceptionMessage>
handlePipeMetaChangesInternal(
List<PipeMeta> pipeMetaListFromConfigNode) {
// Do nothing if data node is removing or removed
if (PipeAgent.runtime().isShutdown()) {
@@ -294,13 +296,13 @@ public class PipeTaskAgent {
public synchronized void dropAllPipeTasks() {
acquireWriteLock();
try {
- dropAllPipeTasksWithoutLock();
+ dropAllPipeTasksInternal();
} finally {
releaseWriteLock();
}
}
- private void dropAllPipeTasksWithoutLock() {
+ private void dropAllPipeTasksInternal() {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
try {
dropPipe(
@@ -315,6 +317,40 @@ public class PipeTaskAgent {
}
}
+ public synchronized void stopAllPipesWithCriticalException() {
+ acquireWriteLock();
+ try {
+ stopAllPipesWithCriticalExceptionInternal();
+ } finally {
+ releaseWriteLock();
+ }
+ }
+
+ private void stopAllPipesWithCriticalExceptionInternal() {
+ pipeMetaKeeper
+ .getPipeMetaList()
+ .forEach(
+ pipeMeta -> {
+ final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+ final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+
+ if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
+ runtimeMeta
+ .getConsensusGroupId2TaskMetaMap()
+ .values()
+ .forEach(
+ pipeTaskMeta -> {
+ for (final PipeRuntimeException e :
pipeTaskMeta.getExceptionMessages()) {
+ if (e instanceof PipeRuntimeCriticalException) {
+ stopPipe(staticMeta.getPipeName(),
staticMeta.getCreationTime());
+ return;
+ }
+ }
+ });
+ }
+ });
+ }
+
////////////////////////// Manage by Pipe Name //////////////////////////
/**
@@ -671,13 +707,13 @@ public class PipeTaskAgent {
throws TException {
acquireReadLock();
try {
- collectPipeMetaListWithoutLock(req, resp);
+ collectPipeMetaListInternal(req, resp);
} finally {
releaseReadLock();
}
}
- private void collectPipeMetaListWithoutLock(THeartbeatReq req,
THeartbeatResp resp)
+ private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp
resp)
throws TException {
// Do nothing if data node is removing or removed, or request does not
need pipe meta list
if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
@@ -700,13 +736,13 @@ public class PipeTaskAgent {
throws TException {
acquireReadLock();
try {
- collectPipeMetaListWithoutLock(req, resp);
+ collectPipeMetaListInternal(req, resp);
} finally {
releaseReadLock();
}
}
- private void collectPipeMetaListWithoutLock(TPipeHeartbeatReq req,
TPipeHeartbeatResp resp)
+ private void collectPipeMetaListInternal(TPipeHeartbeatReq req,
TPipeHeartbeatResp resp)
throws TException {
// Do nothing if data node is removing or removed, or request does not
need pipe meta list
if (PipeAgent.runtime().isShutdown()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index b12646bdf02..d7ca781e95a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +68,15 @@ public class IoTDBThriftConnectorV1 implements PipeConnector
{
private IoTDBThriftConnectorClient client;
+ public IoTDBThriftConnectorV1() {
+ // Do nothing
+ }
+
+ public IoTDBThriftConnectorV1(String ipAddress, int port) {
+ this.ipAddress = ipAddress;
+ this.port = port;
+ }
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
validator
@@ -134,9 +142,11 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
} else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
} else {
- throw new NotImplementedException(
+ LOGGER.warn(
"IoTDBThriftConnectorV1 only support "
- + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent.");
+ + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent. "
+ + "Ignore {}.",
+ tabletInsertionEvent);
}
} catch (TException e) {
throw new PipeConnectionException(
@@ -151,8 +161,10 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
- throw new NotImplementedException(
- "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.");
+ LOGGER.warn(
+ "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.
Ignore {}.",
+ tsFileInsertionEvent);
+ return;
}
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index 6d9372f7922..ffffd075051 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -23,10 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
@@ -51,8 +52,8 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,25 +64,35 @@ import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
public class IoTDBThriftConnectorV2 implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+
private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
"Failed to borrow client from client pool for receiver %s:%s.";
- private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
-
- private static volatile IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
- asyncPipeDataTransferClientManagerHolder;
+ private static final AtomicReference<
+ IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
asyncPipeDataTransferClientManager;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private static final AtomicReference<ScheduledExecutorService> RETRY_TRIGGER
=
+ new AtomicReference<>();
+ private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
+ private final AtomicReference<Future<?>> retryTriggerFuture = new
AtomicReference<>();
+ private final AtomicReference<IoTDBThriftConnectorV1> retryConnector = new
AtomicReference<>();
+ private final PriorityQueue<Pair<Long, Event>> retryEventQueue =
+ new PriorityQueue<>(Comparator.comparing(o -> o.left));
private final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -91,41 +102,17 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
private List<TEndPoint> nodeUrls;
public IoTDBThriftConnectorV2() {
- if (asyncPipeDataTransferClientManagerHolder == null) {
+ if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
synchronized (IoTDBThriftConnectorV2.class) {
- if (asyncPipeDataTransferClientManagerHolder == null) {
- asyncPipeDataTransferClientManagerHolder =
+ if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
}
}
-
- asyncPipeDataTransferClientManager =
asyncPipeDataTransferClientManagerHolder;
- }
-
- public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
- commitQueue.offer(
- new Pair<>(
- requestCommitId,
- () ->
- Optional.ofNullable(enrichedEvent)
- .ifPresent(
- event ->
-
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
-
- while (!commitQueue.isEmpty()) {
- final Pair<Long, Runnable> committer = commitQueue.peek();
- if (lastCommitId.get() + 1 != committer.left) {
- break;
- }
-
- committer.right.run();
- lastCommitId.incrementAndGet();
-
- commitQueue.poll();
- }
+ asyncPipeDataTransferClientManager =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
}
@Override
@@ -142,33 +129,58 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")));
if (nodeUrls.isEmpty()) {
throw new PipeException("Node urls is empty.");
+ } else {
+ LOGGER.info("Node urls: {}.", nodeUrls);
}
}
@Override
- public void handshake() throws Exception {
- final TEndPoint firstNodeUrl = nodeUrls.get(0);
- try (IoTDBThriftConnectorClient client =
- new IoTDBThriftConnectorClient(
- new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
- .build(),
- firstNodeUrl.getIp(),
- firstNodeUrl.getPort())) {
- final TPipeTransferResp resp =
- client.pipeTransfer(
- PipeTransferHandshakeReq.toTPipeTransferReq(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
- if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(String.format("Handshake error, result status
%s.", resp.status));
+ // synchronized to avoid closing the connector while an event is being transferred
+ public synchronized void handshake() throws Exception {
+ if (retryConnector.get() != null) {
+ try {
+ retryConnector.get().close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close connector to receiver when try to
handshake.", e);
+ }
+ retryConnector.set(null);
+ }
+
+ for (final TEndPoint endPoint : nodeUrls) {
+ final IoTDBThriftConnectorV1 connector =
+ new IoTDBThriftConnectorV1(endPoint.getIp(), endPoint.getPort());
+ try {
+ connector.handshake();
+ retryConnector.set(connector);
+ LOGGER.info(
+ "Handshake successfully with receiver {}:{}.", endPoint.getIp(),
endPoint.getPort());
+ break;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Handshake error with receiver %s:%s, retrying...",
+ endPoint.getIp(), endPoint.getPort()),
+ e);
+ try {
+ connector.close();
+ } catch (Exception ex) {
+ LOGGER.warn(
+ String.format(
+ "Failed to close connector to receiver %s:%s when handshake
error.",
+ endPoint.getIp(), endPoint.getPort()),
+ ex);
+ }
}
- } catch (TException e) {
+ }
+
+ if (retryConnector.get() == null) {
throw new PipeConnectionException(
String.format(
- "Connect to receiver %s:%s error: %s",
- e.getMessage(), firstNodeUrl.getIp(), firstNodeUrl.getPort()),
- e);
+ "Failed to connect to all receivers %s.",
+ nodeUrls.stream()
+ .map(endPoint -> endPoint.getIp() + ":" + endPoint.getPort())
+ .reduce((s1, s2) -> s1 + "," + s2)
+ .orElse("")));
}
}
@@ -179,8 +191,20 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ transferQueuedEventsIfNecessary();
+
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "IoTDBThriftConnectorV2 only support
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+ + "Current event: {}.",
+ tabletInsertionEvent);
+ return;
+ }
+
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
+
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- final long requestCommitId = commitIdGenerator.incrementAndGet();
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
@@ -191,8 +215,7 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
requestCommitId, pipeInsertNodeTabletInsertionEvent,
pipeTransferInsertNodeReq, this);
transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- final long requestCommitId = commitIdGenerator.incrementAndGet();
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
final PipeTransferTabletReq pipeTransferTabletReq =
@@ -201,24 +224,19 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
pipeRawTabletInsertionEvent.isAligned());
final PipeTransferRawTabletInsertionEventHandler
pipeTransferTabletReqHandler =
new PipeTransferRawTabletInsertionEventHandler(
- requestCommitId, pipeTransferTabletReq, this);
+ requestCommitId, pipeRawTabletInsertionEvent,
pipeTransferTabletReq, this);
transfer(requestCommitId, pipeTransferTabletReqHandler);
- } else {
- throw new NotImplementedException(
- "IoTDBThriftConnectorV2 only support "
- + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent.");
}
}
- public void transfer(
+ private void transfer(
long requestCommitId,
PipeTransferInsertNodeTabletInsertionEventHandler
pipeTransferInsertNodeReqHandler) {
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
try {
- final AsyncPipeDataTransferServiceClient client =
- asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+ final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
try {
pipeTransferInsertNodeReqHandler.transfer(client);
@@ -238,14 +256,13 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
}
}
- public void transfer(
+ private void transfer(
long requestCommitId,
PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler)
{
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
try {
- final AsyncPipeDataTransferServiceClient client =
- asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+ final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
try {
pipeTransferTabletReqHandler.transfer(client);
@@ -267,12 +284,17 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
+ transferQueuedEventsIfNecessary();
+
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
- throw new NotImplementedException(
- "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.");
+ LOGGER.warn(
+ "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.
Current event: {}.",
+ tsFileInsertionEvent);
+ return;
}
final long requestCommitId = commitIdGenerator.incrementAndGet();
+
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
@@ -283,14 +305,13 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
}
- public void transfer(
+ private void transfer(
long requestCommitId,
PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler) {
final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
try {
- final AsyncPipeDataTransferServiceClient client =
- asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+ final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
try {
pipeTransferTsFileInsertionEventHandler.transfer(client);
@@ -311,16 +332,233 @@ public class IoTDBThriftConnectorV2 implements
PipeConnector {
}
@Override
- public void transfer(Event event) {
+ public void transfer(Event event) throws Exception {
+ transferQueuedEventsIfNecessary();
+
LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic
event: {}.", event);
}
- @Override
- public void close() {
- isClosed.set(true);
+ private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint
targetNodeUrl)
+ throws PipeConnectionException {
+ try {
+ while (true) {
+ final AsyncPipeDataTransferServiceClient client =
+ asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
+ }
+ }
+ } catch (Exception e) {
+ throw new PipeConnectionException(
+ String.format(
+ FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ e);
+ }
+ }
+
+ /**
+ * Handshake with the target if necessary.
+ *
+ * @param client client to handshake
+ * @return true if the handshake had already been finished before this call, false
+ * if it had not been and was completed by this method
+ * @throws Exception if an error occurs.
+ */
+ private boolean handshakeIfNecessary(
+ TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client)
throws Exception {
+ if (client.isHandshakeFinished()) {
+ return true;
+ }
+
+ final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ client.pipeTransfer(
+ PipeTransferHandshakeReq.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ new AsyncMethodCallback<TPipeTransferResp>() {
+ @Override
+ public void onComplete(TPipeTransferResp response) {
+ if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Handshake error with receiver {}:{}, code: {}, message:
{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ response.getStatus().getCode(),
+ response.getStatus().getMessage());
+ exception.set(
+ new PipeConnectionException(
+ String.format(
+ "Handshake error with receiver %s:%s, code: %d,
message: %s.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ response.getStatus().getCode(),
+ response.getStatus().getMessage())));
+ } else {
+ LOGGER.info(
+ "Handshake successfully with receiver {}:{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort());
+ client.markHandshakeFinished();
+ }
+
+ isHandshakeFinished.set(true);
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.warn(
+ "Handshake error with receiver {}:{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ e);
+ exception.set(e);
+
+ isHandshakeFinished.set(true);
+ }
+ });
+
+ try {
+ while (!isHandshakeFinished.get()) {
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PipeException("Interrupted while waiting for handshake
response.", e);
+ }
+
+ if (exception.get() != null) {
+ throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ }
+
+ return false;
+ }
+
+ /**
+ * Transfer queued events which are waiting for retry.
+ *
+ * @throws Exception if an error occurs. The error will be handled by the pipe framework,
+ * which will retry the event and, if the number of retries exceeds the threshold, mark
+ * the event as failed and stop the pipe.
+ * @see PipeConnector#transfer(Event) for more details.
+ * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
+ * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
+ */
+ private synchronized void transferQueuedEventsIfNecessary() throws Exception
{
+ while (!retryEventQueue.isEmpty()) {
+ final Pair<Long, Event> queuedEventPair = retryEventQueue.peek();
+ final long requestCommitId = queuedEventPair.getLeft();
+ final Event event = queuedEventPair.getRight();
+
+ IoTDBThriftConnectorV1 connector = retryConnector.get();
+ if (connector == null) {
+ LOGGER.warn("Retry connector is broken. Will try to reconnect it by
handshake.");
+ handshake();
+ }
+ connector = retryConnector.get();
+
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ connector.transfer((PipeInsertNodeTabletInsertionEvent) event);
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ connector.transfer((PipeRawTabletInsertionEvent) event);
+ } else if (event instanceof PipeTsFileInsertionEvent) {
+ connector.transfer((PipeTsFileInsertionEvent) event);
+ } else {
+ LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic
event: {}.", event);
+ }
+
+ if (event instanceof EnrichedEvent) {
+ commit(requestCommitId, (EnrichedEvent) event);
+ }
+
+ retryEventQueue.poll();
+ }
}
- public boolean isClosed() {
- return isClosed.get();
+ /**
+ * Commit the event. Decrease the reference count of the event. If the
reference count is 0, the
+ * progress index of the event will be recalculated and the resources of the
event will be
+ * released.
+ *
+ * <p>The synchronization is necessary because the commit order must be the
same as the order of
+ * the events. Concurrent commit may cause the commit order to be
inconsistent with the order of
+ * the events.
+ *
+ * @param requestCommitId commit id of the request
+ * @param enrichedEvent event to commit
+ */
+ public synchronized void commit(long requestCommitId, @Nullable
EnrichedEvent enrichedEvent) {
+ commitQueue.offer(
+ new Pair<>(
+ requestCommitId,
+ () ->
+ Optional.ofNullable(enrichedEvent)
+ .ifPresent(
+ event ->
+
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
+
+ while (!commitQueue.isEmpty()) {
+ final Pair<Long, Runnable> committer = commitQueue.peek();
+ if (lastCommitId.get() + 1 != committer.left) {
+ break;
+ }
+
+ committer.right.run();
+ lastCommitId.incrementAndGet();
+
+ commitQueue.poll();
+ }
+ }
+
+ /**
+ * Add failure event to retry queue.
+ *
+ * @param requestCommitId commit id of the request
+ * @param event event to retry
+ */
+ public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
+ if (RETRY_TRIGGER.get() == null) {
+ synchronized (IoTDBThriftConnectorV2.class) {
+ if (RETRY_TRIGGER.get() == null) {
+ RETRY_TRIGGER.set(
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER.getName()));
+ }
+ }
+ }
+
+ if (retryTriggerFuture.get() == null) {
+ synchronized (IoTDBThriftConnectorV2.class) {
+ if (retryTriggerFuture.get() == null) {
+ retryTriggerFuture.set(
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ RETRY_TRIGGER.get(),
+ () -> {
+ try {
+ transferQueuedEventsIfNecessary();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to trigger retry.", e);
+ }
+ },
+ RETRY_TRIGGER_INTERVAL_MINUTES,
+ RETRY_TRIGGER_INTERVAL_MINUTES,
+ TimeUnit.MINUTES));
+ }
+ }
+ }
+
+ retryEventQueue.offer(new Pair<>(requestCommitId, event));
+ }
+
+ @Override
+ // synchronized to avoid closing the connector while an event is being transferred
+ public synchronized void close() throws Exception {
+ if (retryTriggerFuture.get() != null) {
+ retryTriggerFuture.get().cancel(false);
+ }
+
+ if (retryConnector.get() != null) {
+ retryConnector.get().close();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
index f8694e71aba..4cb3643ba03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
@@ -21,19 +21,18 @@ package org.apache.iotdb.db.pipe.connector.v2.handler;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
-import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
-import javax.annotation.Nullable;
-
public class PipeTransferInsertNodeTabletInsertionEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+
public PipeTransferInsertNodeTabletInsertionEventHandler(
long requestCommitId,
- @Nullable EnrichedEvent event,
+ PipeInsertNodeTabletInsertionEvent event,
TPipeTransferReq req,
IoTDBThriftConnectorV2 connector) {
super(requestCommitId, event, req, connector);
@@ -44,9 +43,4 @@ public class PipeTransferInsertNodeTabletInsertionEventHandler
throws TException {
client.pipeTransfer(req, this);
}
-
- @Override
- protected void retryTransfer(IoTDBThriftConnectorV2 connector, long
requestCommitId) {
- connector.transfer(requestCommitId, this);
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
index d0096eaafc2..c35c0103ce2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.connector.v2.handler;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -30,8 +31,11 @@ public class PipeTransferRawTabletInsertionEventHandler
extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
public PipeTransferRawTabletInsertionEventHandler(
- long requestCommitId, TPipeTransferReq req, IoTDBThriftConnectorV2
connector) {
- super(requestCommitId, null, req, connector);
+ long requestCommitId,
+ PipeRawTabletInsertionEvent event,
+ TPipeTransferReq req,
+ IoTDBThriftConnectorV2 connector) {
+ super(requestCommitId, event, req, connector);
}
@Override
@@ -39,9 +43,4 @@ public class PipeTransferRawTabletInsertionEventHandler
throws TException {
client.pipeTransfer(req, this);
}
-
- @Override
- protected void retryTransfer(IoTDBThriftConnectorV2 connector, long
requestCommitId) {
- connector.transfer(requestCommitId, this);
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
index 0f686bad747..3ab2220a477 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.pipe.connector.v2.handler;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -33,10 +33,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
public abstract class PipeTransferTabletInsertionEventHandler<E extends
TPipeTransferResp>
implements AsyncMethodCallback<E> {
@@ -45,20 +42,13 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
private final long requestCommitId;
- private final EnrichedEvent event;
+ private final Event event;
private final TPipeTransferReq req;
private final IoTDBThriftConnectorV2 connector;
- private static final long MAX_RETRY_WAIT_TIME_MS =
- (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() *
Math.pow(2, 5));
- private int retryCount = 0;
-
protected PipeTransferTabletInsertionEventHandler(
- long requestCommitId,
- @Nullable EnrichedEvent event,
- TPipeTransferReq req,
- IoTDBThriftConnectorV2 connector) {
+ long requestCommitId, Event event, TPipeTransferReq req,
IoTDBThriftConnectorV2 connector) {
this.requestCommitId = requestCommitId;
this.event = event;
this.req = req;
@@ -66,7 +56,13 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
Optional.ofNullable(event)
.ifPresent(
- e ->
e.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName()));
+ e -> {
+ if (e instanceof EnrichedEvent) {
+ ((EnrichedEvent) e)
+ .increaseReferenceCount(
+
PipeTransferTabletInsertionEventHandler.class.getName());
+ }
+ });
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
@@ -85,7 +81,8 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
}
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- connector.commit(requestCommitId, event);
+ connector.commit(
+ requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent)
event : null);
} else {
onError(new PipeException(response.getStatus().getMessage()));
}
@@ -93,40 +90,12 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
@Override
public void onError(Exception exception) {
- ++retryCount;
-
- CompletableFuture.runAsync(
- () -> {
- try {
- Thread.sleep(
- Math.min(
- (long)
-
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
- * Math.pow(2d, retryCount - 1d)),
- MAX_RETRY_WAIT_TIME_MS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during retrying", e);
- }
-
- if (connector.isClosed()) {
- LOGGER.info(
- "IoTDBThriftConnectorV2 has been stopped, "
- + "we will not retry this request {} after {} times",
- req,
- retryCount,
- exception);
- } else {
- LOGGER.warn(
- "IoTDBThriftConnectorV2 failed to transfer request {} after {}
times, retrying...",
- req,
- retryCount,
- exception);
-
- retryTransfer(connector, requestCommitId);
- }
- });
- }
+ LOGGER.warn(
+ "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+ event,
+ requestCommitId,
+ exception);
- protected abstract void retryTransfer(IoTDBThriftConnectorV2 connector, long
requestCommitId);
+ connector.addFailureEventToRetryQueue(requestCommitId, event);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
index 692fa0f7ada..2172d551e9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -40,7 +40,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
public class PipeTransferTsFileInsertionEventHandler
@@ -58,16 +57,12 @@ public class PipeTransferTsFileInsertionEventHandler
private final byte[] readBuffer;
private long position;
- private RandomAccessFile reader;
+ private final RandomAccessFile reader;
private final AtomicBoolean isSealSignalSent;
private AsyncPipeDataTransferServiceClient client;
- private static final long MAX_RETRY_WAIT_TIME_MS =
- (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() *
Math.pow(2, 5));
- private int retryCount = 0;
-
public PipeTransferTsFileInsertionEventHandler(
long requestCommitId, PipeTsFileInsertionEvent event,
IoTDBThriftConnectorV2 connector)
throws FileNotFoundException {
@@ -127,14 +122,17 @@ public class PipeTransferTsFileInsertionEventHandler
reader.close();
}
} catch (IOException e) {
- LOGGER.warn("Failed to close file reader.", e);
+ LOGGER.warn("Failed to close file reader when successfully transferred
file.", e);
} finally {
+ connector.commit(requestCommitId, event);
+
+ LOGGER.info(
+ "Successfully transferred file {}. Request commit id is {}.",
tsFile, requestCommitId);
+
if (client != null) {
client.setShouldReturnSelf(true);
client.returnSelf();
}
-
- connector.commit(requestCommitId, event);
}
return;
}
@@ -165,58 +163,22 @@ public class PipeTransferTsFileInsertionEventHandler
@Override
public void onError(Exception exception) {
+ LOGGER.warn(
+ "Failed to transfer tsfile {} (request commit id {}).", tsFile,
requestCommitId, exception);
+
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
- LOGGER.warn("Failed to close file reader.", e);
+ LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
} finally {
+ connector.addFailureEventToRetryQueue(requestCommitId, event);
+
if (client != null) {
client.setShouldReturnSelf(true);
client.returnSelf();
}
}
-
- ++retryCount;
-
- CompletableFuture.runAsync(
- () -> {
- try {
- Thread.sleep(
- Math.min(
- (long)
-
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
- * Math.pow(2d, retryCount - 1d)),
- MAX_RETRY_WAIT_TIME_MS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during retrying", e);
- }
-
- if (connector.isClosed()) {
- LOGGER.info(
- "IoTDBThriftConnectorV2 has been stopped, we will not retry to
transfer tsfile {}.",
- tsFile,
- exception);
- } else {
- LOGGER.warn(
- "IoTDBThriftConnectorV2 failed to transfer tsfile {} after {}
times, retrying...",
- tsFile,
- retryCount,
- exception);
-
- try {
- position = 0;
- reader = new RandomAccessFile(tsFile, "r");
- isSealSignalSent.set(false);
-
- connector.transfer(requestCommitId, this);
- } catch (FileNotFoundException e) {
- LOGGER.error("Exception occurred when retrying...", e);
- onError(e);
- }
- }
- });
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index d3967213e06..b1ab2b77e6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
@@ -162,6 +163,8 @@ public class LoadTsFileManager {
try {
Files.delete(loadDirPath);
LOGGER.info("Load dir {} was deleted.", loadDirPath);
+ } catch (DirectoryNotEmptyException e) {
+ LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
} catch (IOException e) {
LOGGER.warn(MESSAGE_DELETE_FAIL, loadDirPath, e);
}
@@ -179,6 +182,8 @@ public class LoadTsFileManager {
try {
Files.delete(loadDirPath);
LOGGER.info("Load dir {} was deleted.", loadDirPath);
+ } catch (DirectoryNotEmptyException e) {
+ LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
} catch (IOException e) {
LOGGER.warn(MESSAGE_DELETE_FAIL, loadDirPath, e);
}
@@ -286,6 +291,8 @@ public class LoadTsFileManager {
}
try {
Files.delete(taskDir.toPath());
+ } catch (DirectoryNotEmptyException e) {
+ LOGGER.info("Task dir {} is not empty, skip deleting.",
taskDir.getPath());
} catch (IOException e) {
LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e);
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 98104de3b5b..2a6ead060ec 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1008,6 +1008,18 @@ cluster_name=defaultCluster
# The size of the pending queue for the PipeConnector to store the events.
# pipe_connector_pending_queue_size=1024
+# Whether to enable thrift RPC compression in the async connector.
+# pipe_async_connector_rpc_thrift_compression_enabled=false
+
+# The maximum number of selectors that can be used in the async connector.
+# pipe_async_connector_selector_number=1
+
+# The core number of clients that can be used in the async connector.
+# pipe_async_connector_core_client_number=8
+
+# The maximum number of clients that can be used in the async connector.
+# pipe_async_connector_max_client_number=16
+
# True if the pipe heartbeat is seperated from the cluster's heartbeat, false
if the pipe heartbeat is
# merged with the cluster's heartbeat.
# pipe_heartbeat_seperated_mode_enabled=true
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b0dc9ad2e91..72683e3a29d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -260,19 +260,21 @@ public class ClientPoolFactory {
@Override
public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
- GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient>
clientPool =
+ final GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncPipeDataTransferServiceClient.Factory(
manager,
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .setConnectionTimeoutMs((int)
conf.getPipeConnectorTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+
conf.isPipeAsyncConnectorRPCThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(
+ conf.getPipeAsyncConnectorSelectorNumber())
.build(),
- ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
+ ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+
.setCoreClientNumForEachNode(conf.getPipeAsyncConnectorCoreClientNumber())
+
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 534349460f1..ad0f006b5e0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncClient
implements ThriftClient {
@@ -42,6 +43,9 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class);
+ private static final AtomicInteger idGenerator = new AtomicInteger(0);
+ private final int id = idGenerator.incrementAndGet();
+
private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
@@ -49,6 +53,8 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true);
+ private final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+
public AsyncPipeDataTransferServiceClient(
ThriftClientProperty property,
TEndPoint endpoint,
@@ -81,7 +87,7 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
@Override
public void invalidate() {
if (!hasError()) {
- super.onError(new Exception("This client has been invalidated"));
+ super.onError(new Exception(String.format("This client %d has been
invalidated", id)));
}
}
@@ -126,9 +132,18 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
}
}
+ public boolean isHandshakeFinished() {
+ return isHandshakeFinished.get();
+ }
+
+ public void markHandshakeFinished() {
+ isHandshakeFinished.set(true);
+ LOGGER.info("Handshake finished for client {}", this);
+ }
+
@Override
public String toString() {
- return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint);
+ return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}",
endpoint, id);
}
public static class Factory
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 6464d6129a3..a4ee9044800 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -129,6 +129,8 @@ public enum ThreadName {
PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
+ PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+ PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
@@ -259,7 +261,10 @@ public enum ThreadName {
PIPE_CONNECTOR_EXECUTOR_POOL,
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
PIPE_RUNTIME_META_SYNCER,
+ PIPE_RUNTIME_HEARTBEAT,
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
+ PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+ PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER,
PIPE_WAL_RESOURCE_TTL_CHECKER,
WINDOW_EVALUATION_SERVICE,
STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index eb8d5f7b2f7..6405201d26d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -167,6 +167,11 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private int pipeConnectorPendingQueueSize = 1024;
+ private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false;
+ private int pipeAsyncConnectorSelectorNumber = 1;
+ private int pipeAsyncConnectorCoreClientNumber = 8;
+ private int pipeAsyncConnectorMaxClientNumber = 16;
+
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -554,6 +559,40 @@ public class CommonConfig {
this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
}
+ public void setPipeAsyncConnectorRPCThriftCompressionEnabled(
+ boolean pipeAsyncConnectorRPCThriftCompressionEnabled) {
+ this.pipeAsyncConnectorRPCThriftCompressionEnabled =
+ pipeAsyncConnectorRPCThriftCompressionEnabled;
+ }
+
+ public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
+ return pipeAsyncConnectorRPCThriftCompressionEnabled;
+ }
+
+ public int getPipeAsyncConnectorSelectorNumber() {
+ return pipeAsyncConnectorSelectorNumber;
+ }
+
+ public void setPipeAsyncConnectorSelectorNumber(int
pipeAsyncConnectorSelectorNumber) {
+ this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
+ }
+
+ public int getPipeAsyncConnectorCoreClientNumber() {
+ return pipeAsyncConnectorCoreClientNumber;
+ }
+
+ public void setPipeAsyncConnectorCoreClientNumber(int
pipeAsyncConnectorCoreClientNumber) {
+ this.pipeAsyncConnectorCoreClientNumber =
pipeAsyncConnectorCoreClientNumber;
+ }
+
+ public int getPipeAsyncConnectorMaxClientNumber() {
+ return pipeAsyncConnectorMaxClientNumber;
+ }
+
+ public void setPipeAsyncConnectorMaxClientNumber(int
pipeAsyncConnectorMaxClientNumber) {
+ this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
+ }
+
public boolean isSeperatedPipeHeartbeatEnabled() {
return isSeperatedPipeHeartbeatEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f45c92d479c..d29c0333a59 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -327,6 +327,27 @@ public class CommonDescriptor {
"pipe_connector_pending_queue_size",
String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+ config.setPipeAsyncConnectorRPCThriftCompressionEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty( // key must match the one documented in iotdb-common.properties
+ "pipe_async_connector_rpc_thrift_compression_enabled",
+
String.valueOf(config.isPipeAsyncConnectorRPCThriftCompressionEnabled()))));
+ config.setPipeAsyncConnectorSelectorNumber(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_async_connector_selector_number",
+
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+ config.setPipeAsyncConnectorCoreClientNumber(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_async_connector_core_client_number",
+
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber()))));
+ config.setPipeAsyncConnectorMaxClientNumber(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_async_connector_max_client_number",
+
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+
config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9cb9f4d70c0..db33599278b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -107,6 +107,22 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
}
+ public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
+ return COMMON_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled();
+ }
+
+ public int getPipeAsyncConnectorSelectorNumber() {
+ return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+ }
+
+ public int getPipeAsyncConnectorCoreClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncConnectorCoreClientNumber();
+ }
+
+ public int getPipeAsyncConnectorMaxClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {