This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 474e74790da Pipe: Fix TSFile transfer blocking InsertNode sending
(#15666)
474e74790da is described below
commit 474e74790daf2b5776aeebad726b262b18181509
Author: Zhenyu Luo <[email protected]>
AuthorDate: Sat Jun 7 13:13:21 2025 +0800
Pipe: Fix TSFile transfer blocking InsertNode sending (#15666)
* Pipe: Fix TSFile transfer blocking InsertNode sending
* fix
* fix
* fix
* fix
* fix
* fix
* fix
---
.../connector/PipeConnectorSubtaskManager.java | 10 +++
.../PipeRealtimePriorityBlockingQueue.java | 22 +++++--
.../client/IoTDBDataNodeAsyncClientManager.java | 8 ++-
.../async/IoTDBDataRegionAsyncConnector.java | 76 +++++++++++++++++++---
.../apache/iotdb/commons/conf/CommonConfig.java | 30 ++++++++-
.../iotdb/commons/pipe/config/PipeConfig.java | 9 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++
7 files changed, 148 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index a79ff6e4b58..3a2cd2639f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -47,6 +48,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
public class PipeConnectorSubtaskManager {
@@ -112,12 +114,17 @@ public class PipeConnectorSubtaskManager {
final List<PipeConnectorSubtaskLifeCycle>
pipeConnectorSubtaskLifeCycleList =
new ArrayList<>(connectorNum);
+ AtomicInteger counter = new AtomicInteger(0);
// Shared pending queue for all subtasks
final UnboundedBlockingPendingQueue<Event> pendingQueue =
realTimeFirst
? new PipeRealtimePriorityBlockingQueue()
: new UnboundedBlockingPendingQueue<>(new
PipeDataRegionEventCounter());
+ if (realTimeFirst) {
+ ((PipeRealtimePriorityBlockingQueue)
pendingQueue).setOfferTsFileCounter(counter);
+ }
+
for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
final PipeConnector pipeConnector =
isDataRegionConnector
@@ -128,6 +135,9 @@ public class PipeConnectorSubtaskManager {
// 1. Construct, validate and customize PipeConnector, and then
handshake (create
// connection) with the target
try {
+ if (pipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+ ((IoTDBDataRegionAsyncConnector)
pipeConnector).setTransferTsFileCounter(counter);
+ }
pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
pipeConnector.customize(
pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index c8b050a5cfe..a4f05447eae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -47,6 +47,9 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0);
+ // Non-null default so polling cannot throw an NPE before setOfferTsFileCounter is called
+ private AtomicInteger offerTsFileCounter = new AtomicInteger(0);
+
public PipeRealtimePriorityBlockingQueue() {
super(new PipeDataRegionEventCounter());
}
@@ -85,18 +88,22 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
Event event = null;
final int pollHistoricalTsFileThreshold =
PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
+ final int realTimeQueueMaxWaitingTsFileSize =
+ PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
- if (pollTsFileCounter.get() >=
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
+ if (pollTsFileCounter.get() >=
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
+ && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
event =
pollHistoricalTsFileCounter.incrementAndGet() %
pollHistoricalTsFileThreshold == 0
? tsfileInsertEventDeque.pollFirst()
: tsfileInsertEventDeque.pollLast();
pollTsFileCounter.set(0);
}
+
if (Objects.isNull(event)) {
// Sequentially poll the first offered non-TsFileInsertionEvent
event = super.directPoll();
- if (Objects.isNull(event)) {
+ if (Objects.isNull(event) && offerTsFileCounter.get() <
realTimeQueueMaxWaitingTsFileSize) {
event =
pollHistoricalTsFileCounter.incrementAndGet() %
pollHistoricalTsFileThreshold == 0
? tsfileInsertEventDeque.pollFirst()
@@ -126,8 +133,11 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
Event event = null;
final int pollHistoricalTsFileThreshold =
PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
+ final int realTimeQueueMaxWaitingTsFileSize =
+ PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
- if (pollTsFileCounter.get() >=
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
+ if (pollTsFileCounter.get() >=
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
+ && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
event =
pollHistoricalTsFileCounter.incrementAndGet() %
pollHistoricalTsFileThreshold == 0
? tsfileInsertEventDeque.pollFirst()
@@ -149,7 +159,7 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
// If no event is available, block until an event is available
- if (Objects.isNull(event)) {
+ if (Objects.isNull(event) && offerTsFileCounter.get() <
realTimeQueueMaxWaitingTsFileSize) {
event = super.waitedPoll();
if (Objects.isNull(event)) {
event =
@@ -233,4 +243,8 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
public int getTsFileInsertionEventCount() {
return tsfileInsertEventDeque.size();
}
+
+ public void setOfferTsFileCounter(AtomicInteger offerTsFileCounter) {
+ this.offerTsFileCounter = offerTsFileCounter;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index aa5bdd5112f..a1531343b04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -87,7 +87,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy,
final boolean validateTsFile,
- final boolean shouldMarkAsPipeRequest) {
+ final boolean shouldMarkAsPipeRequest,
+ final boolean isTSFileUsed) {
super(
endPoints,
useLeaderCache,
@@ -102,12 +103,13 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
receiverAttributes =
String.format(
- "%s-%s-%s-%s-%s",
+ "%s-%s-%s-%s-%s-%s",
Base64.getEncoder().encodeToString((username + ":" +
password).getBytes()),
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
validateTsFile,
- shouldMarkAsPipeRequest);
+ shouldMarkAsPipeRequest,
+ isTSFileUsed);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index dd7da2cfc5b..7ea783f5317 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -74,7 +74,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -97,6 +100,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Exception occurred while sending to receiver %s:%s.";
+ private static final boolean isSplitTSFileBatchModeEnabled = true;
+ private static final ExecutorService executor =
+
Executors.newFixedThreadPool(PipeConfig.getInstance().getPipeAsyncConnectorMaxClientNumber());
+
private final IoTDBDataRegionSyncConnector syncConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
@@ -104,6 +111,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
new PipeDataRegionEventCounter();
private IoTDBDataNodeAsyncClientManager clientManager;
+ private IoTDBDataNodeAsyncClientManager transferTsFileClientManager;
+
+ // Initialized to a non-null default so that classes inheriting from this async
connector never hit an NPE
+ public AtomicInteger transferTsFileCounter = new AtomicInteger(0);
private PipeTransferBatchReqBuilder tabletBatchBuilder;
@@ -146,7 +157,23 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
loadTsFileValidation,
- shouldMarkAsPipeRequest);
+ shouldMarkAsPipeRequest,
+ false);
+
+ transferTsFileClientManager =
+ new IoTDBDataNodeAsyncClientManager(
+ nodeUrls,
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+ CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
+ loadBalanceStrategy,
+ username,
+ password,
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy,
+ loadTsFileValidation,
+ shouldMarkAsPipeRequest,
+ isSplitTSFileBatchModeEnabled);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
@@ -390,14 +417,37 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
- private void transfer(final PipeTransferTsFileHandler
pipeTransferTsFileHandler) {
- AsyncPipeDataTransferServiceClient client = null;
- try {
- client = clientManager.borrowClient();
- pipeTransferTsFileHandler.transfer(clientManager, client);
- } catch (final Exception ex) {
- logOnClientException(client, ex);
- pipeTransferTsFileHandler.onError(ex);
+ private void transfer(final PipeTransferTsFileHandler
pipeTransferTsFileHandler)
+ throws Exception {
+ transferTsFileCounter.incrementAndGet();
+ CompletableFuture<Void> completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ AsyncPipeDataTransferServiceClient client = null;
+ try {
+ client = transferTsFileClientManager.borrowClient();
+ pipeTransferTsFileHandler.transfer(clientManager, client);
+ } catch (final Exception ex) {
+ logOnClientException(client, ex);
+ pipeTransferTsFileHandler.onError(ex);
+ } finally {
+ transferTsFileCounter.decrementAndGet();
+ }
+ return null;
+ },
+ executor);
+
+ if (PipeConfig.getInstance().isTransferTsFileSync()) {
+ try {
+ completableFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Transfer tsfile event asynchronously was interrupted.",
e);
+ throw new PipeException("Transfer tsfile event asynchronously was
interrupted.", e);
+ } catch (Exception e) {
+ LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
+ throw e;
+ }
}
}
@@ -682,6 +732,10 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (clientManager != null) {
clientManager.close();
}
+
+ if (transferTsFileClientManager != null) {
+ transferTsFileClientManager.close();
+ }
} catch (final Exception e) {
LOGGER.warn("Failed to close client manager.", e);
}
@@ -734,4 +788,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
public boolean hasPendingHandlers() {
return !pendingHandlers.isEmpty();
}
+
+ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
+ this.transferTsFileCounter = transferTsFileCounter;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 94cd826b430..6f6d8723eb3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,6 +207,7 @@ public class CommonConfig {
private int pipeRealTimeQueuePollTsFileThreshold = 10;
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
+ private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
@@ -235,7 +236,7 @@ public class CommonConfig {
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
- private int pipeConnectorReadFileBufferSize = 8388608;
+ private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
@@ -310,6 +311,7 @@ public class CommonConfig {
private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
0.1d;
private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
= 0.8d;
+ private boolean pipeTransferTsFileSync = false;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1335,6 +1337,20 @@ public class CommonConfig {
pipeRealTimeQueuePollHistoricalTsFileThreshold);
}
+ public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
+ return pipeRealTimeQueueMaxWaitingTsFileSize;
+ }
+
+ public void setPipeRealTimeQueueMaxWaitingTsFileSize(int
pipeRealTimeQueueMaxWaitingTsFileSize) {
+ if (this.pipeRealTimeQueueMaxWaitingTsFileSize ==
pipeRealTimeQueueMaxWaitingTsFileSize) {
+ return;
+ }
+ this.pipeRealTimeQueueMaxWaitingTsFileSize =
pipeRealTimeQueueMaxWaitingTsFileSize;
+ logger.info(
+ "pipeRealTimeQueueMaxWaitingTsFileSize is set to {}.",
+ pipeRealTimeQueueMaxWaitingTsFileSize);
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
return;
@@ -1951,6 +1967,18 @@ public class CommonConfig {
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
}
+ public boolean getPipeTransferTsFileSync() {
+ return pipeTransferTsFileSync;
+ }
+
+ public void setPipeTransferTsFileSync(boolean pipeTransferTsFileSync) {
+ if (this.pipeTransferTsFileSync == pipeTransferTsFileSync) {
+ return;
+ }
+ this.pipeTransferTsFileSync = pipeTransferTsFileSync;
+ logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
+ }
+
public double getPipeAllSinksRateLimitBytesPerSecond() {
return pipeAllSinksRateLimitBytesPerSecond;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index b099e713dcc..89042d4609e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -101,6 +101,10 @@ public class PipeConfig {
return
Math.max(COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1);
}
+ public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
+ return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
+ }
+
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -255,6 +259,10 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
}
+ public boolean isTransferTsFileSync() {
+ return COMMON_CONFIG.getPipeTransferTsFileSync();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -520,6 +528,7 @@ public class PipeConfig {
LOGGER.info(
"PipePipeRemainingInsertEventCountAverage: {}",
getPipeRemainingInsertNodeCountAverage());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}",
getPipeTsFileScanParsingThreshold());
+ LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}",
getPipeDynamicMemoryHistoryWeight());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index cc9850e90d3..6303c8ad571 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -264,6 +264,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_realtime_queue_poll_historical_tsfile_threshold",
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
+ config.setPipeRealTimeQueueMaxWaitingTsFileSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_realTime_queue_max_waiting_tsFile_size",
+
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(
@@ -526,6 +531,11 @@ public class PipeDescriptor {
"pipe_max_aligned_series_num_in_one_batch",
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
+ config.setPipeTransferTsFileSync(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_transfer_tsfile_sync",
String.valueOf(config.getPipeTransferTsFileSync()))));
+
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
Long.parseLong(
properties.getProperty(