This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fca7fee95e1 [To dev/1.3] Pipe: Cleaned some questionable parameters &
Fixed unstable testPipeAfterDataRegionLeaderStop (#16555) (#16565)
fca7fee95e1 is described below
commit fca7fee95e1143ba45c2f23e3286bf2070792ed2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Oct 13 12:22:59 2025 +0800
[To dev/1.3] Pipe: Cleaned some questionable parameters & Fixed unstable
testPipeAfterDataRegionLeaderStop (#16555) (#16565)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../metadata/AlignedTimeseriesException.java | 33 --------------
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +-
.../scan/TsFileInsertionScanDataContainer.java | 3 +-
.../visitor/PipeStatementTSStatusVisitor.java | 5 ---
.../db/pipe/resource/memory/PipeMemoryManager.java | 6 ---
.../apache/iotdb/commons/conf/CommonConfig.java | 50 +++-------------------
.../iotdb/commons/pipe/config/PipeConfig.java | 49 ++++++++-------------
.../iotdb/commons/pipe/config/PipeDescriptor.java | 14 +-----
9 files changed, 32 insertions(+), 133 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index a4b7ad9bd3a..d28510c2b7f 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -93,6 +93,7 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(606),
OUT_OF_TTL(607),
COMPACTION_ERROR(608),
+ @Deprecated
ALIGNED_TIMESERIES_ERROR(609),
WAL_ERROR(610),
DISK_SPACE_INSUFFICIENT(611),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
deleted file mode 100644
index f11c986afdc..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.exception.metadata;
-
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-public class AlignedTimeseriesException extends MetadataException {
-
- public AlignedTimeseriesException(String message, String path) {
- super(
- String.format("%s (Path: %s)", message, path),
- TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
- true);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4c2a6ef1a6a..f916ac067c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -771,12 +771,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final long allocatedMemorySizeInBytes =
this.getAllFloatingMemoryUsageInByte();
final long remainingMemory =
PipeMemoryManager.getTotalFloatingMemorySizeInBytes() -
allocatedMemorySizeInBytes;
- if (remainingMemory <
PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
+ if (remainingMemory <
PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
final String message =
String.format(
"%s Need Floating memory: %d bytes, free Floating memory: %d
bytes",
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
- PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
+ PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
remainingMemory);
LOGGER.warn(message);
throw new PipeException(message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 4223a5155f2..c5ef8423297 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -113,8 +113,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
- .forceAllocateForTabletWithRetry(
-
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
try {
tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 0874857c3f7..6eae20e506f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -132,7 +132,6 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
- || context.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
|| context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
@@ -167,10 +166,6 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
&& status.getCode() !=
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
- if (status.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
return visitStatement(statement, context);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 2b53e85d049..87be4d5fb62 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -640,12 +640,6 @@ public class PipeMemoryManager {
return usedMemorySizeInBytesOfTsFiles;
}
- public long getAllocatedMemorySizeInBytesOfBatch() {
- return (long)
- (PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
- * getTotalNonFloatingMemorySizeInBytes());
- }
-
public long getFreeMemorySizeInBytes() {
return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 24d72a5e415..88e6985070f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -215,7 +215,6 @@ public class CommonConfig {
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.3;
- private double PipeDataStructureBatchMemoryProportion = 0.2;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
// Check if memory check is enabled for Pipe
@@ -251,7 +250,6 @@ public class CommonConfig {
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
- private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
private long pipeMaxWaitFinishTime = 10 * 1000;
@@ -316,7 +314,7 @@ public class CommonConfig {
private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; //
3Min
private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
- private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 *
1024 * 1024; // 16MB;
+ private volatile long pipeMaxReaderChunkSize = (long) 16 * 1024 * 1024; //
16MB;
private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -834,21 +832,6 @@ public class CommonConfig {
pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
}
- public double getPipeDataStructureBatchMemoryProportion() {
- return PipeDataStructureBatchMemoryProportion;
- }
-
- public void setPipeDataStructureBatchMemoryProportion(
- double PipeDataStructureBatchMemoryProportion) {
- if (this.PipeDataStructureBatchMemoryProportion ==
PipeDataStructureBatchMemoryProportion) {
- return;
- }
- this.PipeDataStructureBatchMemoryProportion =
PipeDataStructureBatchMemoryProportion;
- logger.info(
- "PipeDataStructureBatchMemoryProportion is set to {}.",
- PipeDataStructureBatchMemoryProportion);
- }
-
public boolean isPipeEnableMemoryChecked() {
return isPipeEnableMemoryCheck;
}
@@ -1396,22 +1379,6 @@ public class CommonConfig {
pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds);
}
- public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
- return pipeSubtaskExecutorForcedRestartIntervalMs;
- }
-
- public void setPipeSubtaskExecutorForcedRestartIntervalMs(
- long pipeSubtaskExecutorForcedRestartIntervalMs) {
- if (this.pipeSubtaskExecutorForcedRestartIntervalMs
- == pipeSubtaskExecutorForcedRestartIntervalMs) {
- return;
- }
- this.pipeSubtaskExecutorForcedRestartIntervalMs =
pipeSubtaskExecutorForcedRestartIntervalMs;
- logger.info(
- "pipeSubtaskExecutorForcedRestartIntervalMs is set to {}",
- pipeSubtaskExecutorForcedRestartIntervalMs);
- }
-
public long getPipeMaxWaitFinishTime() {
return pipeMaxWaitFinishTime;
}
@@ -1730,19 +1697,16 @@ public class CommonConfig {
"pipeLeaderCacheMemoryUsagePercentage is set to {}",
pipeLeaderCacheMemoryUsagePercentage);
}
- public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
- return pipeMaxAlignedSeriesChunkSizeInOneBatch;
+ public long getPipeMaxReaderChunkSize() {
+ return pipeMaxReaderChunkSize;
}
- public void setPipeMaxAlignedSeriesChunkSizeInOneBatch(
- long pipeMaxAlignedSeriesChunkSizeInOneBatch) {
- if (this.pipeMaxAlignedSeriesChunkSizeInOneBatch ==
pipeMaxAlignedSeriesChunkSizeInOneBatch) {
+ public void setPipeMaxReaderChunkSize(long pipeMaxReaderChunkSize) {
+ if (this.pipeMaxReaderChunkSize == pipeMaxReaderChunkSize) {
return;
}
- this.pipeMaxAlignedSeriesChunkSizeInOneBatch =
pipeMaxAlignedSeriesChunkSizeInOneBatch;
- logger.info(
- "pipeMaxAlignedSeriesChunkSizeInOneBatch is set to {}",
- pipeMaxAlignedSeriesChunkSizeInOneBatch);
+ this.pipeMaxReaderChunkSize = pipeMaxReaderChunkSize;
+ logger.info("pipeMaxAlignedSeriesChunkSizeInOneBatch is set to {}",
pipeMaxReaderChunkSize);
}
public long getPipeListeningQueueTransferSnapshotThreshold() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ec21b8fe72e..2578008f68e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -69,15 +69,13 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
}
- public double getPipeDataStructureBatchMemoryProportion() {
- return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
- }
+ /////////////////////////////// Estimation ///////////////////////////////
public boolean isPipeEnableMemoryCheck() {
return COMMON_CONFIG.isPipeEnableMemoryChecked();
}
- public long PipeInsertNodeQueueMemory() {
+ public long getPipeInsertNodeQueueMemory() {
return COMMON_CONFIG.getPipeInsertNodeQueueMemory();
}
@@ -141,10 +139,6 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
}
- public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
- return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
- }
-
public long getPipeMaxWaitFinishTime() {
return COMMON_CONFIG.getPipeMaxWaitFinishTime();
}
@@ -233,20 +227,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
}
- public float getPipeLeaderCacheMemoryUsagePercentage() {
- return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
- }
-
- public long getPipeMaxAlignedSeriesChunkSizeInOneBatch() {
- return COMMON_CONFIG.getPipeMaxAlignedSeriesChunkSizeInOneBatch();
- }
-
- public long getPipeListeningQueueTransferSnapshotThreshold() {
- return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
- }
-
- public int getPipeSnapshotExecutionMaxBatchSize() {
- return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+ public long getPipeMaxReaderChunkSize() {
+ return COMMON_CONFIG.getPipeMaxReaderChunkSize();
}
public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
@@ -407,6 +389,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeCheckMemoryEnoughIntervalMs();
}
+ public float getPipeLeaderCacheMemoryUsagePercentage() {
+ return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
+ }
+
/////////////////////////////// TwoStage ///////////////////////////////
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -421,6 +407,16 @@ public class PipeConfig {
return COMMON_CONFIG.getTwoStageAggregateSenderEndPointsCacheInMs();
}
+ /////////////////////////////// Meta ///////////////////////////////
+
+ public long getPipeListeningQueueTransferSnapshotThreshold() {
+ return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+ }
+
+ public int getPipeSnapshotExecutionMaxBatchSize() {
+ return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+ }
+
/////////////////////////////// Ref ///////////////////////////////
public boolean getPipeEventReferenceTrackingEnabled() {
@@ -456,8 +452,6 @@ public class PipeConfig {
getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
LOGGER.info("PipeTotalFloatingMemoryProportion: {}",
getPipeTotalFloatingMemoryProportion());
- LOGGER.info(
- "PipeDataStructureBatchMemoryProportion: {}",
getPipeDataStructureBatchMemoryProportion());
LOGGER.info("IsPipeEnableMemoryCheck: {}", isPipeEnableMemoryCheck());
LOGGER.info("PipeTsFileParserMemory: {}", getTsFileParserMemory());
LOGGER.info("SinkBatchMemoryInsertNode: {}",
getSinkBatchMemoryInsertNode());
@@ -485,9 +479,6 @@ public class PipeConfig {
LOGGER.info(
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
- LOGGER.info(
- "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
- getPipeSubtaskExecutorForcedRestartIntervalMs());
LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
LOGGER.info(
@@ -510,9 +501,7 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
- LOGGER.info(
- "PipeMaxAlignedSeriesChunkSizeInOneBatch: {}",
- getPipeMaxAlignedSeriesChunkSizeInOneBatch());
+ LOGGER.info("PipeReaderChunkSize: {}", getPipeMaxReaderChunkSize());
LOGGER.info(
"PipeListeningQueueTransferSnapshotThreshold: {}",
getPipeListeningQueueTransferSnapshotThreshold());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1beb4c67e25..27e5fddf14d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -210,11 +210,6 @@ public class PipeDescriptor {
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
- config.setPipeDataStructureBatchMemoryProportion(
- Double.parseDouble(
- properties.getProperty(
- "pipe_data_structure_batch_memory_proportion",
-
String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
config.setPipeTotalFloatingMemoryProportion(
Double.parseDouble(
properties.getProperty(
@@ -295,11 +290,6 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
- config.setPipeSubtaskExecutorForcedRestartIntervalMs(
- Long.parseLong(
- properties.getProperty(
- "pipe_subtask_executor_forced_restart_interval_ms",
-
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
@@ -483,11 +473,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
- config.setPipeMaxAlignedSeriesChunkSizeInOneBatch(
+ config.setPipeMaxReaderChunkSize(
Long.parseLong(
properties.getProperty(
"pipe_max_aligned_series_chunk_size_in_one_batch",
-
String.valueOf(config.getPipeMaxAlignedSeriesChunkSizeInOneBatch()))));
+ String.valueOf(config.getPipeMaxReaderChunkSize()))));
config.setPipeTransferTsFileSync(
Boolean.parseBoolean(