This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2699ed1ae8c Pipe: Set WAL to uncompressed when using real-time sync (#15733) (#15757)
2699ed1ae8c is described below
commit 2699ed1ae8cd02e8b80ba6057581b4456028f44e
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 18 15:07:28 2025 +0800
Pipe: Set WAL to uncompressed when using real-time sync (#15733) (#15757)
---
.../dataregion/IoTDBDataRegionExtractor.java | 23 +++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index b0ccea5cc43..808df12dfac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
LOGGER.info(
"Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY);
@@ -284,15 +285,15 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
break;
case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
break;
default:
- checkWalEnable(parameters);
+ checkWalEnableAndSetUncompressed(parameters);
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
@@ -302,7 +303,8 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
}
}
- private void checkWalEnable(final PipeParameters parameters) throws IllegalPathException {
+ private void checkWalEnableAndSetUncompressed(final PipeParameters parameters)
+ throws IllegalPathException {
if (Boolean.TRUE.equals(
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
.getLeft())
@@ -310,6 +312,17 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
throw new PipeException(
"The pipe cannot transfer realtime insertion if data region disables wal. Please set 'realtime.mode'='batch' in source parameters when enabling realtime transmission.");
}
+
+ if (!IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getWALCompressionAlgorithm()
+ .equals(CompressionType.UNCOMPRESSED)) {
+ LOGGER.info(
+ "The pipe prefers uncompressed wal, and may introduce certain delay in realtime insert syncing without it. Hence, we change it to uncompressed.");
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+ }
}
@Override