This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c0a642892f7 Pipe: Added memory control for aligned chunk reader in
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator
(#15222)
c0a642892f7 is described below
commit c0a642892f71e0dd9d48a6926f57ea0d182e80de
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 1 19:01:40 2025 +0800
Pipe: Added memory control for aligned chunk reader in
TsFileInsertionEventScanParser & TsFileInsertionEventTableParserTabletIterator
(#15222)
Co-authored-by: luoluoyuyu <[email protected]>
---
.../scan/TsFileInsertionEventScanParser.java | 10 +-
...ileInsertionEventTableParserTabletIterator.java | 178 +++++++++++----------
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +
5 files changed, 120 insertions(+), 87 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 1559d472e04..83dbdf39d05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -63,6 +64,8 @@ import java.util.Objects;
public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser
{
+ private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+ PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
private final long startTime;
private final long endTime;
private final Filter filter;
@@ -451,13 +454,16 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
chunkHeader.getMeasurementID(),
(measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
- // Emit when encountered non-sequential value chunk
+ // Emit when encountered non-sequential value chunk, or the chunk
list size exceeds
+ // certain value to avoid OOM
// Do not record or end current value chunks when there are empty
chunks
if (chunkHeader.getDataSize() == 0) {
break;
}
boolean needReturn = false;
- if (lastIndex >= 0 && valueIndex != lastIndex) {
+ if (lastIndex >= 0
+ && (valueIndex != lastIndex
+ || valueChunkList.size() >=
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH)) {
needReturn = recordAlignedChunk(valueChunkList, marker);
}
lastIndex = valueIndex;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 32140edee8e..c080e1dc152 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -48,17 +49,16 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class TsFileInsertionEventTableParserTabletIterator implements
Iterator<Tablet> {
-
+ private static final int PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH =
+ PipeConfig.getInstance().getPipeMaxAlignedSeriesNumInOneBatch();
private final long startTime;
private final long endTime;
@@ -80,10 +80,13 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private BatchData batchData;
// Record the metadata information of the currently read Table
- private Set<String> measurementNames;
private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
- private Iterator<List<IChunkMetadata>> chunkMetadataList;
+ private Iterator<AbstractAlignedChunkMetadata> chunkMetadataList;
private Iterator<IChunkMetadata> chunkMetadata;
+ private AbstractAlignedChunkMetadata currentChunkMetadata;
+ private Chunk timeChunk;
+ private long timeChunkSize;
+ private int offset;
// Record the information of the currently read Table
private String tableName;
@@ -91,13 +94,12 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private List<Tablet.ColumnCategory> columnTypes;
private List<String> measurementList;
private List<TSDataType> dataTypeList;
-
- private List<Pair<String, Integer>> measurementColumIndexList;
- private List<Integer> measurementIdIndexList;
+ private int deviceIdSize;
// Used to record whether the same Tablet is generated when parsing starts.
Different table
// information cannot be placed in the same Tablet.
private boolean isSameTableName;
+ private boolean isSameDeviceID;
public TsFileInsertionEventTableParserTabletIterator(
final TsFileSequenceReader tsFileSequenceReader,
@@ -161,38 +163,35 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
break;
}
case INIT_CHUNK_READER:
- if (chunkMetadata != null && chunkMetadata.hasNext()) {
- initChunkReader((AbstractAlignedChunkMetadata)
chunkMetadata.next());
+ if (currentChunkMetadata != null
+ || (chunkMetadataList != null && chunkMetadataList.hasNext()))
{
+ if (currentChunkMetadata == null) {
+ currentChunkMetadata = chunkMetadataList.next();
+ timeChunk = null;
+ offset = 0;
+ }
+ initChunkReader(currentChunkMetadata);
state = State.INIT_DATA;
break;
}
- case INIT_CHUNK_META:
- if (chunkMetadataList != null && chunkMetadataList.hasNext()) {
- chunkMetadata = chunkMetadataList.next().iterator();
- state = State.INIT_CHUNK_READER;
- break;
- }
- case INIT_CHUNK_META_LIST:
+ case INIT_CHUNK_METADATA:
if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) {
final Pair<IDeviceID, MetadataIndexNode> pair =
deviceMetaIterator.next();
long size = 0;
- List<List<IChunkMetadata>> iChunkMetadataList =
- reader.getIChunkMetadataList(pair.left, measurementNames,
pair.right);
+ List<AbstractAlignedChunkMetadata> iChunkMetadataList =
+ reader.getAlignedChunkMetadata(pair.left, true);
- Iterator<List<IChunkMetadata>> chunkMetadataIterator =
iChunkMetadataList.iterator();
+ Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
+ iChunkMetadataList.iterator();
while (chunkMetadataIterator.hasNext()) {
- final List<IChunkMetadata> chunkMetadata =
chunkMetadataIterator.next();
- if (chunkMetadata == null
- || chunkMetadata.isEmpty()
- || !(chunkMetadata.get(0) instanceof
AbstractAlignedChunkMetadata)) {
+ final AbstractAlignedChunkMetadata alignedChunkMetadata =
+ chunkMetadataIterator.next();
+ if (alignedChunkMetadata == null) {
throw new PipeException(
"Table model tsfile parsing does not support this type
of ChunkMeta");
}
- final AbstractAlignedChunkMetadata alignedChunkMetadata =
- (AbstractAlignedChunkMetadata) chunkMetadata.get(0);
-
// Reduce the number of times Chunks are read
if (alignedChunkMetadata.getEndTime() < startTime
|| alignedChunkMetadata.getStartTime() > endTime) {
@@ -209,7 +208,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
deviceID = pair.getLeft();
chunkMetadataList = iChunkMetadataList.iterator();
- state = State.INIT_CHUNK_META;
+ state = State.INIT_CHUNK_READER;
break;
}
case INIT_DEVICE_META:
@@ -224,34 +223,26 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
deviceMetaIterator = metadataQuerier.deviceIterator(tableRoot,
null);
final int columnSchemaSize =
tableSchema.getColumnSchemas().size();
- dataTypeList = new ArrayList<>(columnSchemaSize);
- columnTypes = new ArrayList<>(columnSchemaSize);
- measurementList = new ArrayList<>(columnSchemaSize);
- measurementNames = new HashSet<>();
-
- measurementColumIndexList = new ArrayList<>(columnSchemaSize);
- measurementIdIndexList = new ArrayList<>(columnSchemaSize);
+ dataTypeList = new
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+ columnTypes = new
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+ measurementList = new
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
- for (int i = 0, j = 0; i < columnSchemaSize; i++) {
+ for (int i = 0; i < columnSchemaSize; i++) {
final IMeasurementSchema schema =
tableSchema.getColumnSchemas().get(i);
final Tablet.ColumnCategory columnCategory =
tableSchema.getColumnTypes().get(i);
if (schema != null
&& schema.getMeasurementName() != null
&& !schema.getMeasurementName().isEmpty()) {
final String measurementName = schema.getMeasurementName();
- columnTypes.add(columnCategory);
- measurementList.add(measurementName);
- dataTypeList.add(schema.getType());
- if (!Tablet.ColumnCategory.TAG.equals(columnCategory)) {
- measurementNames.add(measurementName);
- measurementColumIndexList.add(new Pair<>(measurementName,
j));
- } else {
- measurementIdIndexList.add(j);
+ if (Tablet.ColumnCategory.TAG.equals(columnCategory)) {
+ columnTypes.add(Tablet.ColumnCategory.TAG);
+ measurementList.add(measurementName);
+ dataTypeList.add(schema.getType());
}
- j++;
}
}
- state = State.INIT_CHUNK_META_LIST;
+ deviceIdSize = dataTypeList.size();
+ state = State.INIT_CHUNK_METADATA;
break;
}
return false;
@@ -266,8 +257,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
CHECK_DATA,
INIT_DATA,
INIT_CHUNK_READER,
- INIT_CHUNK_META,
- INIT_CHUNK_META_LIST,
+ INIT_CHUNK_METADATA,
INIT_DEVICE_META
}
@@ -280,12 +270,13 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
Tablet tablet = null;
boolean isFirstRow = true;
- while (hasNext() && (isFirstRow || isSameTableName)) {
+ while (hasNext() && (isFirstRow || (isSameTableName && isSameDeviceID))) {
if (batchData.currentTime() >= startTime && batchData.currentTime() <=
endTime) {
if (isFirstRow) {
// Record the name of the table when the tablet is started.
Different table data cannot be
// in the same tablet.
isSameTableName = true;
+ isSameDeviceID = true;
// Calculate row count and memory size of the tablet based on the
first row
final Pair<Integer, Integer> rowCountAndMemorySize =
@@ -319,7 +310,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
}
if (isFirstRow) {
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunkMeta,
0);
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet,
0);
tablet = new Tablet(tableName, measurementList, dataTypeList,
columnTypes, 0);
tablet.initBitMaps();
}
@@ -327,37 +318,56 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
return tablet;
}
- private void initChunkReader(AbstractAlignedChunkMetadata
alignedChunkMetadata)
+ private void initChunkReader(final AbstractAlignedChunkMetadata
alignedChunkMetadata)
throws IOException {
- final Chunk timeChunk =
- reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
- long size = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
-
- final List<Chunk> valueChunkList =
- new
ArrayList<>(alignedChunkMetadata.getValueChunkMetadataList().size());
- final Map<String, ChunkMetadata> metadataMap = new HashMap<>();
- for (IChunkMetadata metadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
- if (metadata != null) {
- metadataMap.put(metadata.getMeasurementUid(), (ChunkMetadata)
metadata);
- }
+ if (Objects.isNull(timeChunk)) {
+ timeChunk = reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ timeChunkSize =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
timeChunkSize);
+ }
+ timeChunk.getData().rewind();
+ long size = timeChunkSize;
+
+ final List<Chunk> valueChunkList = new
ArrayList<>(PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+
+ // To ensure that the Tablet has the same alignedChunk column as the
current one,
+ // you need to create a new Tablet to fill in the data.
+ isSameDeviceID = false;
+
+ // Need to ensure that columnTypes recreates an array
+ final List<Tablet.ColumnCategory> categories =
+ new ArrayList<>(deviceIdSize +
PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH);
+ for (int i = 0; i < deviceIdSize; i++) {
+ categories.add(Tablet.ColumnCategory.TAG);
}
+ columnTypes = categories;
- // The metadata obtained by alignedChunkMetadata.getValueChunkMetadataList
may not be continuous
- // when reading TSFile Chunks, so reordering the metadata here has little
effect on the
- // efficiency of reading chunks.
- for (Pair<String, Integer> m : measurementColumIndexList) {
- final ChunkMetadata metadata = metadataMap.get(m.getLeft());
+ // Clean up the remaining non-DeviceID column information
+ measurementList.subList(deviceIdSize, measurementList.size()).clear();
+ dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
+
+ final int startOffset = offset;
+ for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size();
++offset) {
+ final IChunkMetadata metadata =
alignedChunkMetadata.getValueChunkMetadataList().get(offset);
if (metadata != null) {
- final Chunk chunk = reader.readMemChunk(metadata);
+ // Record the column information corresponding to Meta to fill in
Tablet
+ columnTypes.add(Tablet.ColumnCategory.FIELD);
+ measurementList.add(metadata.getMeasurementUid());
+ dataTypeList.add(metadata.getDataType());
+ final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
valueChunkList.add(chunk);
- continue;
}
- valueChunkList.add(null);
+ if (offset - startOffset >= PIPE_MAX_ALIGNED_SERIES_NUM_IN_ONE_BATCH) {
+ break;
+ }
+ }
+
+ if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
+ currentChunkMetadata = null;
}
this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
@@ -366,39 +376,37 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
private void fillMeasurementValueColumns(
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes = data.getVector();
- final List<IMeasurementSchema> measurementSchemas = tablet.getSchemas();
- for (int i = 0, size = measurementColumIndexList.size(); i < size; i++) {
- final TsPrimitiveType primitiveType = primitiveTypes[i];
+ for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
+ final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null) {
continue;
}
- final int index = measurementColumIndexList.get(i).getRight();
- switch (measurementSchemas.get(index).getType()) {
+ switch (dataTypeList.get(i)) {
case BOOLEAN:
- tablet.addValue(rowIndex, index, primitiveType.getBoolean());
+ tablet.addValue(rowIndex, i, primitiveType.getBoolean());
break;
case INT32:
- tablet.addValue(rowIndex, index, primitiveType.getInt());
+ tablet.addValue(rowIndex, i, primitiveType.getInt());
break;
case DATE:
- tablet.addValue(rowIndex, index,
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
+ tablet.addValue(rowIndex, i,
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
break;
case INT64:
case TIMESTAMP:
- tablet.addValue(rowIndex, index, primitiveType.getLong());
+ tablet.addValue(rowIndex, i, primitiveType.getLong());
break;
case FLOAT:
- tablet.addValue(rowIndex, index, primitiveType.getFloat());
+ tablet.addValue(rowIndex, i, primitiveType.getFloat());
break;
case DOUBLE:
- tablet.addValue(rowIndex, index, primitiveType.getDouble());
+ tablet.addValue(rowIndex, i, primitiveType.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, index,
primitiveType.getBinary().getValues());
+ tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
break;
default:
throw new UnSupportedDataTypeException("UnSupported" +
primitiveType.getDataType());
@@ -413,7 +421,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
if (deviceIdSegments[i] == null) {
continue;
}
- tablet.addValue(rowIndex, measurementIdIndexList.get(i - 1),
deviceIdSegments[i]);
+ tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6a78fce213a..9da31b3cbd7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -288,6 +288,7 @@ public class CommonConfig {
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+ private int pipeMaxAlignedSeriesNumInOneBatch = 15;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
@@ -1284,6 +1285,14 @@ public class CommonConfig {
this.pipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
}
+ public int getPipeMaxAlignedSeriesNumInOneBatch() {
+ return pipeMaxAlignedSeriesNumInOneBatch;
+ }
+
+ public void setPipeMaxAlignedSeriesNumInOneBatch(int
pipeMaxAlignedSeriesNumInOneBatch) {
+ this.pipeMaxAlignedSeriesNumInOneBatch = pipeMaxAlignedSeriesNumInOneBatch;
+ }
+
public long getPipeListeningQueueTransferSnapshotThreshold() {
return pipeListeningQueueTransferSnapshotThreshold;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 36eb46b73a5..ac087dc2afd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -670,6 +670,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+ config.setPipeMaxAlignedSeriesNumInOneBatch(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_aligned_series_num_in_one_batch",
+
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
config.setPipeListeningQueueTransferSnapshotThreshold(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 3c13ae02cb7..03d8653b4dd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -192,6 +192,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
}
+ public int getPipeMaxAlignedSeriesNumInOneBatch() {
+ return COMMON_CONFIG.getPipeMaxAlignedSeriesNumInOneBatch();
+ }
+
public long getPipeListeningQueueTransferSnapshotThreshold() {
return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
}
@@ -443,6 +447,7 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
+ LOGGER.info("PipeMaxAlignedSeriesNumInOneBatch: {}",
getPipeMaxAlignedSeriesNumInOneBatch());
LOGGER.info(
"PipeListeningQueueTransferSnapshotThreshold: {}",
getPipeListeningQueueTransferSnapshotThreshold());