This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2a8192a428d Pipe: Simplified the hybrid down-grading logic (#16033)
2a8192a428d is described below
commit 2a8192a428da3516e44078566f750cdf52f88ff5
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jul 25 17:03:03 2025 +0800
Pipe: Simplified the hybrid down-grading logic (#16033)
* simplify
* comp
---
.../PipeRealtimeDataRegionHybridExtractor.java | 231 ++++-----------------
.../apache/iotdb/commons/conf/CommonConfig.java | 49 -----
.../iotdb/commons/pipe/config/PipeConfig.java | 22 --
.../iotdb/commons/pipe/config/PipeDescriptor.java | 16 --
4 files changed, 40 insertions(+), 278 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index f1ddff90c67..516ab4d05ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -20,19 +20,16 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
-import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -41,7 +38,6 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Objects;
import java.util.Optional;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
@@ -82,7 +78,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private void extractTabletInsertion(final PipeRealtimeEvent event) {
TsFileEpoch.State state;
- if (canNotUseTabletAnyMore(event)) {
+ if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState ->
TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
.registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
@@ -162,7 +158,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
return TsFileEpoch.State.USING_TSFILE;
case USING_BOTH:
default:
- return canNotUseTabletAnyMore(event)
+ return canNotUseTabletAnymore(event)
? TsFileEpoch.State.USING_TSFILE
: TsFileEpoch.State.USING_BOTH;
}
@@ -171,9 +167,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TABLET:
- // Though the data in tsfile event has been extracted in tablet mode,
we still need to
- // extract the tsfile event to help to determine
isTsFileEventCountInQueueExceededLimit().
- // The extracted tsfile event will be discarded in
supplyTsFileInsertion.
+ // If the state is USING_TABLET, discard the event
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
+ return;
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
@@ -202,17 +199,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
}
- private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
- // In the following 4 cases, we should not extract this tablet event. all
the data
- // represented by the tablet event should be carried by the following
tsfile event:
- // the write operation will be throttled, so we should not extract any
more tablet events.
- // 1. The shallow memory usage of the insert node has reached the
dangerous threshold.
- // 2. Deprecated logics (unused by default)
- return mayInsertNodeMemoryReachDangerousThreshold(event)
- || canNotUseTabletAnymoreDeprecated(event);
- }
-
- private boolean mayInsertNodeMemoryReachDangerousThreshold(final
PipeRealtimeEvent event) {
+ // If the insertNode's memory has reached the dangerous threshold, we should
not extract any
+ // tablets.
+ private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) {
final long floatingMemoryUsageInByte =
PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName);
final long pipeCount = PipeDataNodeAgent.task().getPipeCount();
@@ -224,7 +213,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
final PipeDataNodeRemainingEventAndTimeOperator operator =
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold of single pipe
{}, event count: {}",
+ "Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold of single pipe
{}, event count: {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
@@ -237,83 +226,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
return mayInsertNodeMemoryReachDangerousThreshold;
}
- /**
- * These judgements are deprecated, and are only reserved for manual
operation and compatibility.
- */
- @Deprecated
- private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent
event) {
- // In the following 5 cases, we should not extract any more tablet events.
all the data
- // represented by the tablet events should be carried by the following
tsfile event:
- // 1. The number of historical tsFile events to transfer has exceeded the
limit.
- // 2. The number of realtime tsfile events to transfer has exceeded the
limit.
- // 3. The number of linked tsFiles has reached the dangerous threshold.
- return isHistoricalTsFileEventCountExceededLimit(event)
- || isRealtimeTsFileEventCountExceededLimit(event)
- || mayTsFileLinkedCountReachDangerousThreshold(event);
- }
-
- private boolean isHistoricalTsFileEventCountExceededLimit(final
PipeRealtimeEvent event) {
- if
(PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()
- == Integer.MAX_VALUE) {
- return false;
- }
- final IoTDBDataRegionExtractor extractor =
-
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
- final boolean isHistoricalTsFileEventCountExceededLimit =
- Objects.nonNull(extractor)
- && extractor.getHistoricalTsFileInsertionEventCount()
- >=
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
- if (isHistoricalTsFileEventCountExceededLimit &&
event.mayExtractorUseTablets(this)) {
- LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}:
The number of historical tsFile events {} has exceeded the limit {}",
- pipeName,
- dataRegionId,
- event.getTsFileEpoch().getFilePath(),
- extractor.getHistoricalTsFileInsertionEventCount(),
-
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion());
- }
- return isHistoricalTsFileEventCountExceededLimit;
- }
-
- private boolean isRealtimeTsFileEventCountExceededLimit(final
PipeRealtimeEvent event) {
- if
(PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()
- == Integer.MAX_VALUE) {
- return false;
- }
- final boolean isRealtimeTsFileEventCountExceededLimit =
- pendingQueue.getTsFileInsertionEventCount()
- >=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
- if (isRealtimeTsFileEventCountExceededLimit &&
event.mayExtractorUseTablets(this)) {
- LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}:
The number of realtime tsFile events {} has exceeded the limit {}",
- pipeName,
- dataRegionId,
- event.getTsFileEpoch().getFilePath(),
- pendingQueue.getTsFileInsertionEventCount(),
-
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
- }
- return isRealtimeTsFileEventCountExceededLimit;
- }
-
- private boolean mayTsFileLinkedCountReachDangerousThreshold(final
PipeRealtimeEvent event) {
- if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() ==
Long.MAX_VALUE) {
- return false;
- }
- final boolean mayTsFileLinkedCountReachDangerousThreshold =
- PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName)
- >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
- if (mayTsFileLinkedCountReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
- LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}:
The number of linked tsFiles {} has reached the dangerous threshold {}",
- pipeName,
- dataRegionId,
- event.getTsFileEpoch().getFilePath(),
- PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName),
- PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount());
- }
- return mayTsFileLinkedCountReachDangerousThreshold;
- }
-
@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
@@ -355,103 +267,40 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
- event
- .getTsFileEpoch()
- .migrateState(
- this,
- state -> {
- switch (state) {
- case EMPTY:
- return canNotUseTabletAnyMore(event)
- ? TsFileEpoch.State.USING_TSFILE
- : TsFileEpoch.State.USING_TABLET;
- case USING_TSFILE:
- return canNotUseTabletAnyMore(event)
- ? TsFileEpoch.State.USING_TSFILE
- : TsFileEpoch.State.USING_BOTH;
- case USING_TABLET:
- case USING_BOTH:
- default:
- return state;
- }
- });
-
- final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
- if (state == TsFileEpoch.State.USING_TSFILE) {
- PipeTsFileEpochProgressIndexKeeper.getInstance()
- .registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
- }
-
- switch (state) {
- case USING_TSFILE:
- // If the state is USING_TSFILE, discard the event and poll the next
one.
- return null;
- case EMPTY:
- case USING_TABLET:
- case USING_BOTH:
- default:
- if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
- return event.getEvent();
- } else {
- // If the event's reference count can not be increased, it means the
data represented by
- // this event is not reliable anymore. but the data represented by
this event
- // has been carried by the following tsfile event, so we can just
discard this event.
- event.getTsFileEpoch().migrateState(this, s ->
TsFileEpoch.State.USING_BOTH);
- LOGGER.warn(
- "Discard tablet event {} because it is not reliable anymore. "
- + "Change the state of TsFileEpoch to USING_TSFILE.",
- event);
- return null;
- }
+ if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
+ return event.getEvent();
+ } else {
+ // If the event's reference count can not be increased, it means the
data represented by
+ // this event is not reliable anymore. but the data represented by this
event
+ // has been carried by the following tsfile event, so we can just
discard this event.
+ event.getTsFileEpoch().migrateState(this, s ->
TsFileEpoch.State.USING_BOTH);
+ LOGGER.warn(
+ "Discard tablet event {} because it is not reliable anymore. "
+ + "Change the state of TsFileEpoch to USING_BOTH.",
+ event);
+ return null;
}
}
private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
- event
- .getTsFileEpoch()
- .migrateState(
- this,
- state -> {
- // This would not happen, but just in case.
- if (state.equals(TsFileEpoch.State.EMPTY)) {
- LOGGER.error(
- String.format("EMPTY TsFileEpoch when supplying TsFile
Event %s", event));
- return TsFileEpoch.State.USING_TSFILE;
- }
- return state;
- });
-
- final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
- switch (state) {
- case USING_TABLET:
- // If the state is USING_TABLET, discard the event and poll the next
one.
- PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
- return null;
- case EMPTY:
- case USING_TSFILE:
- case USING_BOTH:
- default:
- if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
- return event.getEvent();
- } else {
- // If the event's reference count can not be increased, it means the
data represented by
- // this event is not reliable anymore. the data has been lost. we
simply discard this
- // event
- // and report the exception to PipeRuntimeAgent.
- final String errorMessage =
- String.format(
- "TsFile Event %s can not be supplied because "
- + "the reference count can not be increased, "
- + "the data represented by this event is lost",
- event.getEvent());
- LOGGER.error(errorMessage);
- PipeDataNodeAgent.runtime()
- .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
- PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
- return null;
- }
+ if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
+ return event.getEvent();
+ } else {
+ // If the event's reference count can not be increased, it means the
data represented by
+ // this event is not reliable anymore. the data has been lost. we simply
discard this
+ // event and report the exception to PipeRuntimeAgent.
+ final String errorMessage =
+ String.format(
+ "TsFile Event %s can not be supplied because "
+ + "the reference count can not be increased, "
+ + "the data represented by this event is lost",
+ event.getEvent());
+ LOGGER.error(errorMessage);
+ PipeDataNodeAgent.runtime()
+ .report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
+ return null;
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a2bd1ee0f28..f9697bd585f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -295,10 +295,6 @@ public class CommonConfig {
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
- private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE;
// Deprecated
- private int pipeMaxAllowedPendingTsFileEpochPerDataRegion =
Integer.MAX_VALUE; // Deprecated
- private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
-
private double pipeMetaReportMaxLogNumPerRound = 0.1;
private int pipeMetaReportMaxLogIntervalRounds = 360;
private int pipeTsFilePinMaxLogNumPerRound = 10;
@@ -1499,51 +1495,6 @@ public class CommonConfig {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}
- public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
- return pipeMaxAllowedHistoricalTsFilePerDataRegion;
- }
-
- public void setPipeMaxAllowedHistoricalTsFilePerDataRegion(
- int pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
- if (this.pipeMaxAllowedHistoricalTsFilePerDataRegion
- == pipeMaxAllowedPendingTsFileEpochPerDataRegion) {
- return;
- }
- this.pipeMaxAllowedHistoricalTsFilePerDataRegion =
- pipeMaxAllowedPendingTsFileEpochPerDataRegion;
- logger.info(
- "pipeMaxAllowedHistoricalTsFilePerDataRegion is set to {}",
- pipeMaxAllowedPendingTsFileEpochPerDataRegion);
- }
-
- public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
- return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
- }
-
- public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
- int pipeExtractorPendingQueueTsfileLimit) {
- if (this.pipeMaxAllowedPendingTsFileEpochPerDataRegion
- == pipeExtractorPendingQueueTsfileLimit) {
- return;
- }
- this.pipeMaxAllowedPendingTsFileEpochPerDataRegion =
pipeExtractorPendingQueueTsfileLimit;
- logger.info(
- "pipeMaxAllowedPendingTsFileEpochPerDataRegion is set to {}.",
- pipeMaxAllowedPendingTsFileEpochPerDataRegion);
- }
-
- public long getPipeMaxAllowedLinkedTsFileCount() {
- return pipeMaxAllowedLinkedTsFileCount;
- }
-
- public void setPipeMaxAllowedLinkedTsFileCount(long
pipeMaxAllowedLinkedTsFileCount) {
- if (this.pipeMaxAllowedLinkedTsFileCount ==
pipeMaxAllowedLinkedTsFileCount) {
- return;
- }
- this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
- logger.info("pipeMaxAllowedLinkedTsFileCount is set to {}",
pipeMaxAllowedLinkedTsFileCount);
- }
-
public double getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index ca99d29a1d0..770e0e959b2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -343,20 +343,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}
- /////////////////////////////// Hybrid Mode ///////////////////////////////
-
- public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
- return COMMON_CONFIG.getPipeMaxAllowedHistoricalTsFilePerDataRegion();
- }
-
- public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
- return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
- }
-
- public long getPipeMaxAllowedLinkedTsFileCount() {
- return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
- }
-
/////////////////////////////// Logger ///////////////////////////////
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -588,14 +574,6 @@ public class PipeConfig {
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());
- LOGGER.info(
- "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
- getPipeMaxAllowedHistoricalTsFilePerDataRegion());
- LOGGER.info(
- "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
- getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
- LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}",
getPipeMaxAllowedLinkedTsFileCount());
-
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}",
getPipeTsFilePinMaxLogNumPerRound());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 3e31864e3d8..7e27838618b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -433,22 +433,6 @@ public class PipeDescriptor {
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
- config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
- Integer.parseInt(
- properties.getProperty(
- "pipe_max_allowed_historical_tsfile_per_data_region",
-
String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion()))));
- config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
- Integer.parseInt(
- properties.getProperty(
- "pipe_max_allowed_pending_tsfile_epoch_per_data_region",
-
String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion()))));
- config.setPipeMaxAllowedLinkedTsFileCount(
- Long.parseLong(
- properties.getProperty(
- "pipe_max_allowed_linked_tsfile_count",
- String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
-
config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(
properties.getProperty(