This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e95510d4aeb Pipe: Optimize realtime performance when pipe starts after
long time stop with heavy data backlog (#15048)
e95510d4aeb is described below
commit e95510d4aeb94c12e5646e76fa33c4584615b072
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 19:34:36 2025 +0800
Pipe: Optimize realtime performance when pipe starts after long time stop
with heavy data backlog (#15048)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 54 +++++++++++++++++++++-
.../PipeRealtimePriorityBlockingQueue.java | 49 ++++++++++++++------
.../async/IoTDBDataRegionAsyncConnector.java | 20 +++++++-
.../PipeRealtimeDataRegionHybridExtractor.java | 7 +++
.../dataregion/wal/utils/WALInsertNodeCache.java | 7 ++-
.../apache/iotdb/commons/conf/CommonConfig.java | 44 +++++++++++++++---
.../iotdb/commons/conf/CommonDescriptor.java | 28 +++++++++--
.../commons/pipe/agent/task/PipeTaskAgent.java | 2 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 27 +++++++++--
9 files changed, 205 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 34d11913dff..f521698b87c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -111,6 +111,54 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
}
+ ////////////////////////// Manage by Pipe Name //////////////////////////
+
+ @Override
+ protected void startPipe(final String pipeName, final long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ final PipeStatus status =
existedPipeMeta.getRuntimeMeta().getStatus().get();
+ if (PipeStatus.STOPPED.equals(status) || status == null) {
+ restartPipeToReloadResourceIfNeeded(existedPipeMeta);
+ }
+
+ super.startPipe(pipeName, creationTime);
+ }
+
+ private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
+ if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime()
+ < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+ return;
+ }
+
+ final AtomicLong lastRestartTime =
+
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+ if (lastRestartTime != null
+ && System.currentTimeMillis() - lastRestartTime.get()
+ < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+ LOGGER.info(
+ "Skipping reload resource for stopped pipe {} before starting it
because reloading resource is too frequent.",
+ pipeMeta.getStaticMeta().getPipeName());
+ return;
+ }
+
+ if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
+ LOGGER.info(
+ "Flushing storage engine before restarting pipe {}.",
+ pipeMeta.getStaticMeta().getPipeName());
+ final long currentTime = System.currentTimeMillis();
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
+ LOGGER.info(
+ "Finished flushing storage engine, time cost: {} ms.",
+ System.currentTimeMillis() - currentTime);
+ }
+
+ restartStuckPipe(pipeMeta);
+ LOGGER.info(
+ "Reloaded resource for stopped pipe {} before starting it.",
+ pipeMeta.getStaticMeta().getPipeName());
+ }
+
///////////////////////// Manage by regionGroupId /////////////////////////
@Override
@@ -674,7 +722,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private void restartStuckPipe(final PipeMeta pipeMeta) {
- LOGGER.warn("Pipe {} will be restarted because of stuck.",
pipeMeta.getStaticMeta());
+ LOGGER.warn(
+ "Pipe {} will be restarted because it is stuck or has encountered
issues such as data backlog or being stopped for too long.",
+ pipeMeta.getStaticMeta());
acquireWriteLock();
try {
final long startTime = System.currentTimeMillis();
@@ -688,7 +738,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
handleSinglePipeMetaChanges(originalPipeMeta);
LOGGER.warn(
- "Pipe {} was restarted because of stuck, time cost: {} ms.",
+ "Pipe {} was restarted because of stuck or data backlog, time cost:
{} ms.",
originalPipeMeta.getStaticMeta(),
System.currentTimeMillis() - startTime);
} catch (final Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a170b19bd42..d9eac5f5625 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -33,6 +33,7 @@ import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQueue<Event> {
@@ -40,10 +41,13 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
new LinkedBlockingDeque<>();
- private final AtomicInteger eventCount = new AtomicInteger(0);
+ private static final int POLL_TSFILE_THRESHOLD =
+ PipeConfig.getInstance().getPipeRealTimeQueuePollTsFileThreshold();
+ private final AtomicInteger pollTsFileCounter = new AtomicInteger(0);
- private static final int pollHistoryThreshold =
- PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold();
+ private static final int POLL_HISTORICAL_TSFILE_THRESHOLD =
+
Math.max(PipeConfig.getInstance().getPipeRealTimeQueuePollHistoricalTsFileThreshold(),
1);
+ private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0);
public PipeRealtimePriorityBlockingQueue() {
super(new PipeDataRegionEventCounter());
@@ -81,18 +85,24 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public Event directPoll() {
Event event = null;
- if (eventCount.get() >= pollHistoryThreshold) {
- event = tsfileInsertEventDeque.pollFirst();
- eventCount.set(0);
+ if (pollTsFileCounter.get() >= POLL_TSFILE_THRESHOLD) {
+ event =
+ pollHistoricalTsFileCounter.incrementAndGet() %
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+ ? tsfileInsertEventDeque.pollFirst()
+ : tsfileInsertEventDeque.pollLast();
+ pollTsFileCounter.set(0);
}
if (Objects.isNull(event)) {
// Sequentially poll the first offered non-TsFileInsertionEvent
event = super.directPoll();
if (Objects.isNull(event)) {
- event = tsfileInsertEventDeque.pollFirst();
+ event =
+ pollHistoricalTsFileCounter.incrementAndGet() %
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+ ? tsfileInsertEventDeque.pollFirst()
+ : tsfileInsertEventDeque.pollLast();
}
if (event != null) {
- eventCount.incrementAndGet();
+ pollTsFileCounter.incrementAndGet();
}
}
@@ -113,18 +123,24 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public Event waitedPoll() {
Event event = null;
- if (eventCount.get() >= pollHistoryThreshold) {
- event = tsfileInsertEventDeque.pollFirst();
- eventCount.set(0);
+ if (pollTsFileCounter.get() >= POLL_TSFILE_THRESHOLD) {
+ event =
+ pollHistoricalTsFileCounter.incrementAndGet() %
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+ ? tsfileInsertEventDeque.pollFirst()
+ : tsfileInsertEventDeque.pollLast();
+ pollTsFileCounter.set(0);
}
if (event == null) {
// Sequentially poll the first offered non-TsFileInsertionEvent
event = super.directPoll();
if (event == null && !tsfileInsertEventDeque.isEmpty()) {
- event = tsfileInsertEventDeque.pollFirst();
+ event =
+ pollHistoricalTsFileCounter.incrementAndGet() %
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+ ? tsfileInsertEventDeque.pollFirst()
+ : tsfileInsertEventDeque.pollLast();
}
if (event != null) {
- eventCount.incrementAndGet();
+ pollTsFileCounter.incrementAndGet();
}
}
@@ -132,10 +148,13 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
if (Objects.isNull(event)) {
event = super.waitedPoll();
if (Objects.isNull(event)) {
- event = tsfileInsertEventDeque.pollFirst();
+ event =
+ pollHistoricalTsFileCounter.incrementAndGet() %
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+ ? tsfileInsertEventDeque.pollFirst()
+ : tsfileInsertEventDeque.pollLast();
}
if (event != null) {
- eventCount.incrementAndGet();
+ pollTsFileCounter.incrementAndGet();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d98b93d9526..bf0b8df2d6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
@@ -92,6 +93,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
+ private final long maxRetryExecutionTimeMsPerCall =
+
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
private IoTDBDataNodeAsyncClientManager clientManager;
@@ -416,11 +419,21 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
* @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
*/
private void transferQueuedEventsIfNecessary() throws Exception {
+ if (retryEventQueue.isEmpty()) {
+ // Trigger cron heartbeat event in retry connector to send batch in time
+ retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
+ return;
+ }
+
+ final long retryStartTime = System.currentTimeMillis();
while (!retryEventQueue.isEmpty()) {
synchronized (this) {
- if (isClosed.get() || retryEventQueue.isEmpty()) {
+ if (isClosed.get()) {
return;
}
+ if (retryEventQueue.isEmpty()) {
+ break;
+ }
final Event peekedEvent = retryEventQueue.peek();
@@ -453,6 +466,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}
}
+
+ // Stop retrying if the execution time exceeds the threshold for better
realtime performance
+ if (System.currentTimeMillis() - retryStartTime >
maxRetryExecutionTimeMsPerCall) {
+ break;
+ }
}
// Trigger cron heartbeat event in retry connector to send batch in time
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 26cc157e7ee..b1baca7c42a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -48,6 +48,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
+ private final boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled =
+ PipeConfig.getInstance().isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+
@Override
protected void doExtract(final PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
@@ -223,6 +226,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
+ if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) {
+ return false;
+ }
+
final boolean isPipeTaskCurrentlyRestarted =
PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 76857c04983..0541ff96188 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -58,6 +59,7 @@ public class WALInsertNodeCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(WALInsertNodeCache.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
private final PipeMemoryBlock allocatedMemoryBlock;
// Used to adjust the memory usage of the cache
@@ -75,8 +77,9 @@ public class WALInsertNodeCache {
final long requestedAllocateSize =
(long)
Math.min(
- (double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
- CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
+ (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+ * CONFIG.getWalFileSizeThresholdInByte(),
+ CONFIG.getAllocateMemoryForPipe() * 0.45);
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cb69d5db391..d37e1c1eda9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -204,7 +204,8 @@ public class CommonConfig {
private boolean pipeFileReceiverFsyncEnabled = true;
- private int pipeRealTimeQueuePollHistoryThreshold = 1;
+ private int pipeRealTimeQueuePollTsFileThreshold = 10;
+ private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -234,6 +235,7 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+ private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
@@ -258,12 +260,13 @@ public class CommonConfig {
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
- private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10;
+ private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
private long pipeMaxAllowedLinkedTsFileCount = 300;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
+ private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -857,6 +860,16 @@ public class CommonConfig {
return pipeConnectorRPCThriftCompressionEnabled;
}
+ public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+ long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+ this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
+ pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ }
+
+ public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+ }
+
public int getPipeAsyncConnectorSelectorNumber() {
return pipeAsyncConnectorSelectorNumber;
}
@@ -990,12 +1003,22 @@ public class CommonConfig {
this.pipeSubtaskExecutorForcedRestartIntervalMs =
pipeSubtaskExecutorForcedRestartIntervalMs;
}
- public int getPipeRealTimeQueuePollHistoryThreshold() {
- return pipeRealTimeQueuePollHistoryThreshold;
+ public int getPipeRealTimeQueuePollTsFileThreshold() {
+ return pipeRealTimeQueuePollTsFileThreshold;
+ }
+
+ public void setPipeRealTimeQueuePollTsFileThreshold(int
pipeRealTimeQueuePollTsFileThreshold) {
+ this.pipeRealTimeQueuePollTsFileThreshold =
pipeRealTimeQueuePollTsFileThreshold;
}
- public void setPipeRealTimeQueuePollHistoryThreshold(int
pipeRealTimeQueuePollHistoryThreshold) {
- this.pipeRealTimeQueuePollHistoryThreshold =
pipeRealTimeQueuePollHistoryThreshold;
+ public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+ return pipeRealTimeQueuePollHistoricalTsFileThreshold;
+ }
+
+ public void setPipeRealTimeQueuePollHistoricalTsFileThreshold(
+ int pipeRealTimeQueuePollHistoricalTsFileThreshold) {
+ this.pipeRealTimeQueuePollHistoricalTsFileThreshold =
+ pipeRealTimeQueuePollHistoricalTsFileThreshold;
}
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
@@ -1077,6 +1100,10 @@ public class CommonConfig {
return pipeStuckRestartMinIntervalMs;
}
+ public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+ return pipeEpochKeepTsFileAfterStuckRestartEnabled;
+ }
+
public long getPipeStorageEngineFlushTimeIntervalMs() {
return pipeStorageEngineFlushTimeIntervalMs;
}
@@ -1089,6 +1116,11 @@ public class CommonConfig {
this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
}
+ public void setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+ boolean pipeEpochKeepTsFileAfterStuckRestartEnabled) {
+ this.pipeEpochKeepTsFileAfterStuckRestartEnabled =
pipeEpochKeepTsFileAfterStuckRestartEnabled;
+ }
+
public void setPipeStorageEngineFlushTimeIntervalMs(long
pipeStorageEngineFlushTimeIntervalMs) {
this.pipeStorageEngineFlushTimeIntervalMs =
pipeStorageEngineFlushTimeIntervalMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 2afe9bbdeb4..b3613bb6677 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,11 +300,19 @@ public class CommonDescriptor {
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
- config.setPipeRealTimeQueuePollHistoryThreshold(
+ config.setPipeRealTimeQueuePollTsFileThreshold(
+ Integer.parseInt(
+ Optional.ofNullable(
+
properties.getProperty("pipe_realtime_queue_poll_history_threshold"))
+ .orElse(
+ properties.getProperty(
+ "pipe_realtime_queue_poll_tsfile_threshold",
+
String.valueOf(config.getPipeRealTimeQueuePollTsFileThreshold())))));
+ config.setPipeRealTimeQueuePollHistoricalTsFileThreshold(
Integer.parseInt(
properties.getProperty(
- "pipe_realtime_queue_poll_history_threshold",
-
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
+ "pipe_realtime_queue_poll_historical_tsfile_threshold",
+
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
int pipeSubtaskExecutorMaxThreadNum =
Integer.parseInt(
@@ -413,6 +421,15 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+ config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+ Long.parseLong(
+ Optional.ofNullable(
+
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
+ .orElse(
+ properties.getProperty(
+
"pipe_async_connector_max_retry_execution_time_ms_per_call",
+ String.valueOf(
+
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
int pipeAsyncConnectorSelectorNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
@@ -532,6 +549,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+ config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_epoch_keep_tsfile_after_stuck_restart_enabled",
+
String.valueOf(config.isPipeEpochKeepTsFileAfterStuckRestartEnabled()))));
config.setPipeStorageEngineFlushTimeIntervalMs(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index addd9d1f0bf..41cf6df907d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -576,7 +576,7 @@ public abstract class PipeTaskAgent {
return true;
}
- private void startPipe(final String pipeName, final long creationTime) {
+ protected void startPipe(final String pipeName, final long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (!checkBeforeStartPipe(existedPipeMeta, pipeName, creationTime)) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4492df7a5db..0f558782f00 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -78,8 +78,12 @@ public class PipeConfig {
/////////////////////////////// Subtask Connector
///////////////////////////////
- public int getPipeRealTimeQueuePollHistoryThreshold() {
- return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold();
+ public int getPipeRealTimeQueuePollTsFileThreshold() {
+ return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
+ }
+
+ public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+ return COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
}
/////////////////////////////// Subtask Executor
///////////////////////////////
@@ -148,6 +152,10 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
+ public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+ return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+ }
+
public int getPipeAsyncConnectorSelectorNumber() {
return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
}
@@ -264,6 +272,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
}
+ public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+ return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+ }
+
public long getPipeStorageEngineFlushTimeIntervalMs() {
return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
}
@@ -373,7 +385,10 @@ public class PipeConfig {
getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
LOGGER.info(
- "PipeRealTimeQueuePollHistoryThreshold: {}",
getPipeRealTimeQueuePollHistoryThreshold());
+ "PipeRealTimeQueuePollTsFileThreshold: {}",
getPipeRealTimeQueuePollTsFileThreshold());
+ LOGGER.info(
+ "PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
+ getPipeRealTimeQueuePollHistoricalTsFileThreshold());
LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
LOGGER.info(
@@ -423,6 +438,9 @@ public class PipeConfig {
"PipeRemainingTimeCommitRateAverageTime: {}",
getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
+ LOGGER.info(
+ "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
+ getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
@@ -468,6 +486,9 @@ public class PipeConfig {
getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
LOGGER.info("PipeStuckRestartMinIntervalMs: {}",
getPipeStuckRestartMinIntervalMs());
+ LOGGER.info(
+ "PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
+ isPipeEpochKeepTsFileAfterStuckRestartEnabled());
LOGGER.info(
"PipeStorageEngineFlushTimeIntervalMs: {}",
getPipeStorageEngineFlushTimeIntervalMs());