This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 86f851c0e43 [To dev/1.3] Pipe: Changed the default value of the batch
delay configuration (#15657) (#15658)
86f851c0e43 is described below
commit 86f851c0e43faa13fd113c9960ceb2c3120b7105
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 6 17:01:47 2025 +0800
[To dev/1.3] Pipe: Changed the default value of the batch delay
configuration (#15657) (#15658)
* Fix
* Update PipeConnectorSubtask.java
* Fix
* Refactor batch
* Update CommonConfig.java
* Update PipeConnectorSubtask.java
* Update CommonConfig.java
* Update CommonConfig.java
---
.../subtask/connector/PipeConnectorSubtask.java | 12 +----
.../batch/PipeTransferBatchReqBuilder.java | 54 +++++++++++-----------
.../PipeConsensusTransferBatchReqBuilder.java | 4 +-
.../async/IoTDBDataRegionAsyncConnector.java | 15 +++---
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 13 ++----
.../apache/iotdb/commons/conf/CommonConfig.java | 7 +--
.../config/constant/PipeConnectorConstant.java | 4 +-
7 files changed, 49 insertions(+), 60 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 5e49b76b240..5d8d622a74a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.connector;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -64,9 +63,6 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
// when no event can be pulled.
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
- private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
-
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
* 1000;
- private long lastHeartbeatEventInjectTime = System.currentTimeMillis();
public PipeConnectorSubtask(
final String taskID,
@@ -105,12 +101,8 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
}
try {
- if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
- > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
- transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
- }
-
if (Objects.isNull(event)) {
+ transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
return false;
}
@@ -187,8 +179,6 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
e);
}
- lastHeartbeatEventInjectTime = System.currentTimeMillis();
-
event.onTransferred();
PipeDataRegionConnectorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index eed3a7e5bd9..7acc0aba6b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -45,9 +45,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE;
@@ -93,14 +93,13 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
final Integer requestMaxDelayInMillis =
parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY,
SINK_IOTDB_BATCH_DELAY_MS_KEY);
if (Objects.isNull(requestMaxDelayInMillis)) {
- final int requestMaxDelayInSeconds =
+ final int requestMaxDelayConfig =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
usingTsFileBatch
- ? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE
- : CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
- requestMaxDelayInMs =
- requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE :
requestMaxDelayInSeconds * 1000;
+ ? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE * 1000
+ : CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE);
+ requestMaxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE :
requestMaxDelayConfig;
} else {
requestMaxDelayInMs =
requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE :
requestMaxDelayInMillis;
@@ -122,20 +121,18 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
* duplicated.
*
* @param event the given {@link Event}
- * @return {@link Pair}<{@link TEndPoint}, {@link
PipeTabletEventPlainBatch}> not null means this
- * {@link PipeTabletEventPlainBatch} can be transferred. the first
element is the leader
- * endpoint to transfer to (might be null), the second element is the
batch to be transferred.
*/
- public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
- final TabletInsertionEvent event) throws IOException, WALPipeException {
+ public synchronized void onEvent(final TabletInsertionEvent event)
+ throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Unsupported event {} type {} when building transfer request",
event, event.getClass());
- return null;
+ return;
}
if (!useLeaderCache) {
- return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
+ defaultBatch.onEvent(event);
+ return;
}
String deviceId = null;
@@ -146,35 +143,38 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
}
if (Objects.isNull(deviceId)) {
- return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
+ defaultBatch.onEvent(event);
+ return;
}
final TEndPoint endPoint =
IoTDBDataNodeCacheLeaderClientManager.LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
if (Objects.isNull(endPoint)) {
- return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
+ defaultBatch.onEvent(event);
+ return;
}
-
- final PipeTabletEventPlainBatch batch =
- endPointToBatch.computeIfAbsent(
+ endPointToBatch
+ .computeIfAbsent(
endPoint,
- k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes));
- return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
+ k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes))
+ .onEvent(event);
}
/** Get all batches that have at least 1 event. */
- public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyBatches() {
- final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyBatches = new
ArrayList<>();
- if (!defaultBatch.isEmpty()) {
- nonEmptyBatches.add(new Pair<>(null, defaultBatch));
+ public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
+ getAllNonEmptyAndShouldEmitBatches() {
+ final List<Pair<TEndPoint, PipeTabletEventBatch>>
nonEmptyAndShouldEmitBatches =
+ new ArrayList<>();
+ if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
+ nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
}
endPointToBatch.forEach(
(endPoint, batch) -> {
- if (!batch.isEmpty()) {
- nonEmptyBatches.add(new Pair<>(endPoint, batch));
+ if (!batch.isEmpty() && batch.shouldEmit()) {
+ nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
}
});
- return nonEmptyBatches;
+ return nonEmptyAndShouldEmitBatches;
}
public boolean isEmpty() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index 870cea0ee87..1b022aaeb8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -49,8 +49,8 @@ import java.util.Objects;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
@@ -94,7 +94,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder
implements AutoClosea
final long requestMaxBatchSizeInBytes =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+ CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE);
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 28644a0532c..e4634dd4300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -173,9 +173,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
if (isTabletBatchModeEnabled) {
- final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
- tabletBatchBuilder.onEvent(tabletInsertionEvent);
- transferInBatchWithoutCheck(endPointAndBatch);
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ transferBatchedEventsIfNecessary();
} else {
transferInEventWithoutCheck(tabletInsertionEvent);
}
@@ -183,7 +182,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private void transferInBatchWithoutCheck(
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
- throws IOException, WriteProcessException, InterruptedException {
+ throws IOException, WriteProcessException {
if (Objects.isNull(endPointAndBatch)) {
return;
}
@@ -402,14 +401,13 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
/** Try its best to commit data in order. Flush can also be a trigger to
transfer batched data. */
- private void transferBatchedEventsIfNecessary()
- throws IOException, WriteProcessException, InterruptedException {
+ private void transferBatchedEventsIfNecessary() throws IOException,
WriteProcessException {
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
return;
}
for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
- tabletBatchBuilder.getAllNonEmptyBatches()) {
+ tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
transferInBatchWithoutCheck(endPointAndBatch);
}
}
@@ -535,7 +533,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private void retryTransfer(final TabletInsertionEvent tabletInsertionEvent) {
if (isTabletBatchModeEnabled) {
try {
-
transferInBatchWithoutCheck(tabletBatchBuilder.onEvent(tabletInsertionEvent));
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ transferBatchedEventsIfNecessary();
if (tabletInsertionEvent instanceof EnrichedEvent) {
((EnrichedEvent) tabletInsertionEvent)
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 806cf5a41f2..e5ae94f5b2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -115,11 +115,8 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
try {
if (isTabletBatchModeEnabled) {
- final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
- tabletBatchBuilder.onEvent(tabletInsertionEvent);
- if (Objects.nonNull(endPointAndBatch)) {
- doTransferWrapper(endPointAndBatch);
- }
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ doTransferWrapper();
} else {
if (tabletInsertionEvent instanceof
PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
@@ -182,9 +179,9 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
}
private void doTransferWrapper() throws IOException, WriteProcessException {
- for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyBatch :
- tabletBatchBuilder.getAllNonEmptyBatches()) {
- doTransferWrapper(nonEmptyBatch);
+ for (final Pair<TEndPoint, PipeTabletEventBatch>
nonEmptyAndShouldEmitBatch :
+ tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
+ doTransferWrapper(nonEmptyAndShouldEmitBatch);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3004ace2e30..94cd826b430 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -218,13 +218,14 @@ public class CommonConfig {
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.2;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.2;
- private double pipeDataStructureWalMemoryProportion = 0.2;
- private double PipeDataStructureBatchMemoryProportion = 0.2;
+ private double pipeDataStructureWalMemoryProportion = 0.3;
+ private double PipeDataStructureBatchMemoryProportion = 0.1;
private double pipeTotalFloatingMemoryProportion = 0.2;
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
- private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+ private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
+
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index e13a40a2c74..479b93c2585 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -70,10 +70,12 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY =
"connector.batch.max-delay-ms";
public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY =
"sink.batch.max-delay-ms";
+ public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 10;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";
- public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16
* MB;
+ public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = MB;
+ public static final long CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE
= 16 * MB;
public static final long CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE =
80 * MB;
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";