This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rc/2.0.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ce377d9fdd1d07d68c2dbf61fe72f7e828948893 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu May 22 10:42:18 2025 +0800 Pipe: Allow realtime subscriptions --- .../dataregion/IoTDBDataRegionExtractor.java | 30 +++------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 3572d700c92..9dabcb7f799 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -287,32 +287,6 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)); - // Validate extractor.realtime.mode - if (validator - .getParameters() - .getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator - .getParameters() - .hasAnyAttributes( - SOURCE_START_TIME_KEY, - EXTRACTOR_START_TIME_KEY, - SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY)) { - validator.validateAttributeValueRange( - validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) - ? EXTRACTOR_REALTIME_MODE_KEY - : SOURCE_REALTIME_MODE_KEY, - true, - EXTRACTOR_REALTIME_MODE_FILE_VALUE, - EXTRACTOR_REALTIME_MODE_HYBRID_VALUE, - EXTRACTOR_REALTIME_MODE_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE, - EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE, - EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); - } - checkInvalidParameters(validator); constructHistoricalExtractor(); @@ -481,7 +455,9 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { return; } - if (pipeName == null || !pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + if (!(pipeName != null + && (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX) + || pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) { realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); return; }
