This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9f277c0ce23 Pipe: iotdb-thrift-connector async retry mechanism (#14916) (#15201)
9f277c0ce23 is described below
commit 9f277c0ce235d419ae6950981f7a4f6343512075
Author: Itami Sho <[email protected]>
AuthorDate: Thu Mar 27 17:22:09 2025 +0800
Pipe: iotdb-thrift-connector async retry mechanism (#14916) (#15201)
---
.../evolvable/batch/PipeTabletEventBatch.java | 12 +-
.../batch/PipeTransferBatchReqBuilder.java | 4 +-
.../async/IoTDBDataRegionAsyncConnector.java | 175 +++++++++++++++------
.../apache/iotdb/commons/conf/CommonConfig.java | 35 ++++-
.../iotdb/commons/conf/CommonDescriptor.java | 30 ++++
.../iotdb/commons/pipe/config/PipeConfig.java | 22 +++
6 files changed, 220 insertions(+), 58 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 44374adb22a..bd07d0c34ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -69,8 +69,16 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
if (((EnrichedEvent) event)
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
- if (constructBatch(event)) {
- events.add((EnrichedEvent) event);
+ try {
+ if (constructBatch(event)) {
+ events.add((EnrichedEvent) event);
+ }
+ } catch (final Exception e) {
+ // If the event is not added to the batch, we need to decrease the reference count.
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+ // Will cause a retry
+ throw e;
}
if (firstEventProcessingTime == Long.MIN_VALUE) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9b787ee30f6..74ab3a1ebdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,8 +127,7 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
* endpoint to transfer to (might be null), the second element is the batch to be transferred.
*/
public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
- final TabletInsertionEvent event)
- throws IOException, WALPipeException, WriteProcessException {
+ final TabletInsertionEvent event) throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Unsupported event {} type {} when building transfer request",
event, event.getClass());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b92c066e8be..ca38765820c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
@@ -45,6 +44,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -53,6 +53,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import com.google.common.collect.ImmutableSet;
@@ -91,8 +92,16 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Exception occurred while sending to receiver %s:%s.";
- private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
+ private final IoTDBDataRegionSyncConnector syncConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
+ private final PipeDataRegionEventCounter retryEventQueueEventCounter =
+ new PipeDataRegionEventCounter();
+ private final int forcedRetryTsFileEventQueueSizeThreshold =
+
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+ private final int forcedRetryTabletEventQueueSizeThreshold =
+
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+ private final int forcedRetryTotalEventQueueSizeThreshold =
+
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
private final long maxRetryExecutionTimeMsPerCall =
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
@@ -108,7 +117,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
super.validate(validator);
- retryConnector.validate(validator);
+ syncConnector.validate(validator);
final PipeParameters parameters = validator.getParameters();
@@ -125,7 +134,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
super.customize(parameters, configuration);
- retryConnector.customize(parameters, configuration);
+ syncConnector.customize(parameters, configuration);
clientManager =
new IoTDBDataNodeAsyncClientManager(
@@ -149,17 +158,17 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
// Synchronized to avoid close connector when transfer event
public synchronized void handshake() throws Exception {
- retryConnector.handshake();
+ syncConnector.handshake();
}
@Override
- public void heartbeat() {
- retryConnector.heartbeat();
+ public void heartbeat() throws Exception {
+ syncConnector.heartbeat();
}
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
- transferQueuedEventsIfNecessary();
+ transferQueuedEventsIfNecessary(false);
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
@@ -173,9 +182,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (isTabletBatchModeEnabled) {
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
tabletBatchBuilder.onEvent(tabletInsertionEvent);
- if (Objects.isNull(endPointAndBatch)) {
- return;
- }
transferInBatchWithoutCheck(endPointAndBatch);
} else {
transferInEventWithoutCheck(tabletInsertionEvent);
@@ -185,6 +191,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private void transferInBatchWithoutCheck(
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
throws IOException, WriteProcessException, InterruptedException {
+ if (Objects.isNull(endPointAndBatch)) {
+ return;
+ }
final PipeTabletEventBatch batch = endPointAndBatch.getRight();
if (batch instanceof PipeTabletEventPlainBatch) {
@@ -226,7 +235,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
endPointAndBatch.getRight().onSuccess();
}
- private void transferInEventWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
+ private boolean transferInEventWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
throws Exception {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
@@ -234,7 +243,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- return;
+ return false;
}
final InsertNode insertNode =
@@ -258,7 +267,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- return;
+ return false;
}
final TPipeTransferReq pipeTransferTabletRawReq =
@@ -272,6 +281,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
transfer(pipeRawTabletInsertionEvent.getDeviceId(),
pipeTransferTabletReqHandler);
}
+
+ return true;
}
private void transfer(
@@ -314,7 +325,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- transferQueuedEventsIfNecessary();
+ transferQueuedEventsIfNecessary(false);
transferBatchedEventsIfNecessary();
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
@@ -327,14 +338,14 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
transferWithoutCheck(tsFileInsertionEvent);
}
- private void transferWithoutCheck(final TsFileInsertionEvent
tsFileInsertionEvent)
+ private boolean transferWithoutCheck(final TsFileInsertionEvent
tsFileInsertionEvent)
throws Exception {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
// We increase the reference count for this event to determine if the event may be released.
if (!pipeTsFileInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- return;
+ return false;
}
// We assume that no exceptions will be thrown after reference count is increased.
@@ -361,6 +372,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
&& clientManager.supportModsIfIsDataNodeReceiver());
transfer(pipeTransferTsFileHandler);
+ return true;
} catch (final Exception e) {
// Just in case. To avoid the case that exception occurred when constructing the handler.
pipeTsFileInsertionEvent.decreaseReferenceCount(
@@ -382,7 +394,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(final Event event) throws Exception {
- transferQueuedEventsIfNecessary();
+ transferQueuedEventsIfNecessary(true);
transferBatchedEventsIfNecessary();
if (!(event instanceof PipeHeartbeatEvent
@@ -393,7 +405,20 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
- retryConnector.transfer(event);
+ syncConnector.transfer(event);
+ }
+
+ /** Try its best to commit data in order. Flush can also be a trigger to transfer batched data. */
+ private void transferBatchedEventsIfNecessary()
+ throws IOException, WriteProcessException, InterruptedException {
+ if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
+ return;
+ }
+
+ for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
+ tabletBatchBuilder.getAllNonEmptyBatches()) {
+ transferInBatchWithoutCheck(endPointAndBatch);
+ }
}
//////////////////////////// Leader cache update ////////////////////////////
@@ -425,14 +450,19 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
* @see PipeConnector#transfer(TabletInsertionEvent) for more details.
* @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
*/
- private void transferQueuedEventsIfNecessary() throws Exception {
- if (retryEventQueue.isEmpty()) {
- // Trigger cron heartbeat event in retry connector to send batch in time
- retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
+ private void transferQueuedEventsIfNecessary(final boolean forced) throws
Exception {
+ if (retryEventQueue.isEmpty()
+ || (!forced
+ && retryEventQueueEventCounter.getTabletInsertionEventCount()
+ < forcedRetryTabletEventQueueSizeThreshold
+ && retryEventQueueEventCounter.getTsFileInsertionEventCount()
+ < forcedRetryTsFileEventQueueSizeThreshold
+ && retryEventQueue.size() <
forcedRetryTotalEventQueueSizeThreshold)) {
return;
}
final long retryStartTime = System.currentTimeMillis();
+ final int remainingEvents = retryEventQueue.size();
while (!retryEventQueue.isEmpty()) {
synchronized (this) {
if (isClosed.get()) {
@@ -445,23 +475,19 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final Event peekedEvent = retryEventQueue.peek();
if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- retryConnector.transfer((PipeInsertNodeTabletInsertionEvent)
peekedEvent);
+ retryTransfer((PipeInsertNodeTabletInsertionEvent) peekedEvent);
} else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
- retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
+ retryTransfer((PipeRawTabletInsertionEvent) peekedEvent);
} else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
- retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+ retryTransfer((PipeTsFileInsertionEvent) peekedEvent);
} else {
LOGGER.warn(
"IoTDBThriftAsyncConnector does not support transfer generic
event: {}.",
peekedEvent);
}
- if (peekedEvent instanceof EnrichedEvent) {
- ((EnrichedEvent) peekedEvent)
-
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
- }
-
final Event polledEvent = retryEventQueue.poll();
+ retryEventQueueEventCounter.decreaseEventCount(polledEvent);
if (polledEvent != peekedEvent) {
LOGGER.error(
"The event polled from the queue is not the same as the event
peeked from the queue. "
@@ -476,24 +502,66 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// Stop retrying if the execution time exceeds the threshold for better realtime performance
if (System.currentTimeMillis() - retryStartTime >
maxRetryExecutionTimeMsPerCall) {
- break;
+ if (retryEventQueueEventCounter.getTabletInsertionEventCount()
+ < forcedRetryTabletEventQueueSizeThreshold
+ && retryEventQueueEventCounter.getTsFileInsertionEventCount()
+ < forcedRetryTsFileEventQueueSizeThreshold
+ && retryEventQueue.size() <
forcedRetryTotalEventQueueSizeThreshold) {
+ return;
+ }
+
+ if (remainingEvents <= retryEventQueue.size()) {
+ throw new PipeException("Failed to transfer events in retry queue.");
+ }
}
}
-
- // Trigger cron heartbeat event in retry connector to send batch in time
- retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
}
- /** Try its best to commit data in order. Flush can also be a trigger to transfer batched data. */
- private void transferBatchedEventsIfNecessary()
- throws IOException, WriteProcessException, InterruptedException {
- if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
+ private void retryTransfer(final TabletInsertionEvent tabletInsertionEvent) {
+ if (isTabletBatchModeEnabled) {
+ try {
+
transferInBatchWithoutCheck(tabletBatchBuilder.onEvent(tabletInsertionEvent));
+ if (tabletInsertionEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) tabletInsertionEvent)
+
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+ }
+ } catch (final Exception e) {
+ addFailureEventToRetryQueue(tabletInsertionEvent);
+ }
return;
}
- for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
- tabletBatchBuilder.getAllNonEmptyBatches()) {
- transferInBatchWithoutCheck(endPointAndBatch);
+ // Tablet batch mode is not enabled, so we need to transfer the event directly.
+ try {
+ if (transferInEventWithoutCheck(tabletInsertionEvent)) {
+ if (tabletInsertionEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) tabletInsertionEvent)
+
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+ }
+ } else {
+ addFailureEventToRetryQueue(tabletInsertionEvent);
+ }
+ } catch (final Exception e) {
+ if (tabletInsertionEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) tabletInsertionEvent)
+
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+ }
+ addFailureEventToRetryQueue(tabletInsertionEvent);
+ }
+ }
+
+ private void retryTransfer(final PipeTsFileInsertionEvent
tsFileInsertionEvent) {
+ try {
+ if (transferWithoutCheck(tsFileInsertionEvent)) {
+ tsFileInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ } else {
+ addFailureEventToRetryQueue(tsFileInsertionEvent);
+ }
+ } catch (final Exception e) {
+ tsFileInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ addFailureEventToRetryQueue(tsFileInsertionEvent);
}
}
@@ -516,6 +584,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
retryEventQueue.offer(event);
+ retryEventQueueEventCounter.increaseEventCount(event);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Added event {} to retry queue.", event);
}
@@ -536,15 +605,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
events.forEach(this::addFailureEventToRetryQueue);
}
- public synchronized void clearRetryEventsReferenceCount() {
- while (!retryEventQueue.isEmpty()) {
- final Event event = retryEventQueue.poll();
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
- }
- }
- }
-
//////////////////////////// Operations for close
////////////////////////////
@Override
@@ -559,6 +619,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
&& regionId == ((EnrichedEvent) event).getRegionId()) {
((EnrichedEvent) event)
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+ retryEventQueueEventCounter.decreaseEventCount(event);
return true;
}
return false;
@@ -570,7 +631,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
public synchronized void close() {
isClosed.set(true);
- retryConnector.close();
+ syncConnector.close();
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
@@ -600,6 +661,16 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
super.close();
}
+ public synchronized void clearRetryEventsReferenceCount() {
+ while (!retryEventQueue.isEmpty()) {
+ final Event event = retryEventQueue.poll();
+ retryEventQueueEventCounter.decreaseEventCount(event);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+ }
+ }
+ }
+
//////////////////////// APIs provided for metric framework
////////////////////////
public int getRetryEventQueueSize() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 99e041cdfb4..dd15c21a48c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -235,6 +235,9 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
+ private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
+ private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -261,7 +264,7 @@ public class CommonConfig {
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
- private int pipeMaxAllowedPinnedMemTableCount = 5; // per data region
+ private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
private long pipeMaxAllowedLinkedTsFileCount = 300;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
@@ -862,6 +865,36 @@ public class CommonConfig {
return pipeConnectorRPCThriftCompressionEnabled;
}
+ public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+ int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+ this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
+ pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ }
+
+ public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
+ return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ }
+
+ public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+ int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+ this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
+ pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ }
+
+ public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
+ return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ }
+
+ public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+ int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+ this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
+ pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ }
+
+ public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
+ return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ }
+
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c6b7013b23e..d7d8ba068f4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -430,6 +430,36 @@ public class CommonDescriptor {
"pipe_async_connector_max_retry_execution_time_ms_per_call",
String.valueOf(
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+ config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+ Integer.parseInt(
+ Optional.ofNullable(
+
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
+ .orElse(
+ properties.getProperty(
+
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
+ String.valueOf(
+ config
+
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
+ config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+ Integer.parseInt(
+ Optional.ofNullable(
+
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
+ .orElse(
+ properties.getProperty(
+
"pipe_async_connector_forced_retry_tablet_event_queue_size",
+ String.valueOf(
+ config
+
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
+ config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+ Integer.parseInt(
+ Optional.ofNullable(
+
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
+ .orElse(
+ properties.getProperty(
+
"pipe_async_connector_forced_retry_total_event_queue_size",
+ String.valueOf(
+ config
+
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
int pipeAsyncConnectorSelectorNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 0f558782f00..24d49da94c1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,18 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
+ public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
+ return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+ }
+
+ public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
+ return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+ }
+
+ public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
+ return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+ }
+
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
}
@@ -438,6 +450,16 @@ public class PipeConfig {
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
+ LOGGER.info(
+ "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+ getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+ LOGGER.info(
+ "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+ getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+ LOGGER.info(
+ "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+ getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
+
LOGGER.info(
"PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());