This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-rename in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e1e620dfb5b2dd36971ff292a4ca5b63e8b168d0 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Oct 27 18:28:15 2023 +0800 rename: extractor -> source --- .../api/customizer/parameter/PipeParameters.java | 135 ++++++++++++++++++--- .../persistence/pipe/PipePluginInfo.java | 3 +- .../db/pipe/agent/plugin/PipePluginAgent.java | 4 +- .../config/constant/PipeConnectorConstant.java | 18 +++ .../config/constant/PipeExtractorConstant.java | 32 +++-- .../pipe/extractor/IoTDBDataRegionExtractor.java | 81 ++++++++----- .../PipeHistoricalDataRegionTsFileExtractor.java | 48 ++++++-- .../realtime/PipeRealtimeDataRegionExtractor.java | 10 +- .../db/pipe/task/stage/PipeTaskExtractorStage.java | 5 +- .../extractor/IoTDBDataRegionExtractorTest.java | 8 +- 10 files changed, 270 insertions(+), 74 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index cfea33aa3b9..2d00ce717b8 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import java.util.List; import java.util.Map; /** @@ -51,33 +52,73 @@ public class PipeParameters { return attributes.containsKey(key); } - public String getString(String key) { - return attributes.get(key); + public boolean hasAnyAttributes(String... keys) { + for (final String key : keys) { + if (attributes.containsKey(key)) { + return true; + } + } + return false; } - public Boolean getBoolean(String key) { - String value = attributes.get(key); - return value == null ? null : Boolean.parseBoolean(value); + public String getString(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return value; + } + } + return null; } - public Integer getInt(String key) { - String value = attributes.get(key); - return value == null ? null : Integer.parseInt(value); + public Boolean getBoolean(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Boolean.parseBoolean(value); + } + } + return null; } - public Long getLong(String key) { - String value = attributes.get(key); - return value == null ? null : Long.parseLong(value); + public Integer getInt(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Integer.parseInt(value); + } + } + return null; } - public Float getFloat(String key) { - String value = attributes.get(key); - return value == null ? null : Float.parseFloat(value); + public Long getLong(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Long.parseLong(value); + } + } + return null; } - public Double getDouble(String key) { - String value = attributes.get(key); - return value == null ? null : Double.parseDouble(value); + public Float getFloat(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Float.parseFloat(value); + } + } + return null; + } + + public Double getDouble(String... keys) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Double.parseDouble(value); + } + } + return null; } public String getStringOrDefault(String key, String defaultValue) { @@ -110,6 +151,66 @@ public class PipeParameters { return value == null ? defaultValue : Double.parseDouble(value); } + public String getStringOrDefault(List<String> keys, String defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return value; + } + } + return defaultValue; + } + + public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Boolean.parseBoolean(value); + } + } + return defaultValue; + } + + public int getIntOrDefault(List<String> keys, int defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Integer.parseInt(value); + } + } + return defaultValue; + } + + public long getLongOrDefault(List<String> keys, long defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Long.parseLong(value); + } + } + return defaultValue; + } + + public float getFloatOrDefault(List<String> keys, float defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Float.parseFloat(value); + } + } + return defaultValue; + } + + public double getDoubleOrDefault(List<String> keys, double defaultValue) { + for (final String key : keys) { + final String value = attributes.get(key); + if (value != null) { + return Double.parseDouble(value); + } + } + return defaultValue; + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index bdc30ddd683..5b1510957c7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -126,7 +126,8 @@ public class PipePluginInfo implements SnapshotProcessor { new PipeParameters(createPipeRequest.getExtractorAttributes()); final String extractorPluginName = extractorParameters.getStringOrDefault( - PipeExtractorConstant.EXTRACTOR_KEY, IOTDB_EXTRACTOR.getPipePluginName()); + Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), + IOTDB_EXTRACTOR.getPipePluginName()); if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) { final String exceptionMessage = String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java index 59b18f77014..b1914c2bc61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; public class PipePluginAgent { @@ -203,7 +204,8 @@ public class PipePluginAgent { return (PipeExtractor) reflect( extractorParameters.getStringOrDefault( - PipeExtractorConstant.EXTRACTOR_KEY, + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java index 8ed5af7ed56..93e28a0a040 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java @@ -29,51 +29,69 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MB; public class PipeConnectorConstant { public static final String CONNECTOR_KEY = "connector"; + public static final String SINK_KEY = "sink"; public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip"; + public static final String SINK_IOTDB_IP_KEY = "sink.ip"; public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port"; + public static final String SINK_IOTDB_PORT_KEY = "sink.port"; public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; + public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls"; public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = "connector.parallel.tasks"; + public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = "sink.parallel.tasks"; public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE = PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(); public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable"; + public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = "sink.batch.enable"; public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true; public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = "connector.batch.max-delay-seconds"; + public static final String SINK_IOTDB_BATCH_DELAY_KEY = "sink.batch.max-delay-seconds"; public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1; public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes"; + public static final String SINK_IOTDB_BATCH_SIZE_KEY = "sink.batch.size-bytes"; public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB; public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; + public static final String SINK_IOTDB_USER_KEY = "sink.user"; public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; public static final String CONNECTOR_IOTDB_PASSWORD_KEY = "connector.password"; + public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password"; public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root"; public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY = "connector.air-gap.e-language.enable"; + public static final String SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY = "sink.air-gap.e-language.enable"; public static final boolean CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false; public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY = "connector.air-gap.handshake-timeout-ms"; + public static final String SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY = + "sink.air-gap.handshake-timeout-ms"; public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE = 5000; public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "connector.version"; + public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY = "sink.version"; public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1"; public static final String CONNECTOR_WEBSOCKET_PORT_KEY = "connector.websocket.port"; + public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port"; public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080; public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = "connector.opcua.tcp.port"; + public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY = "sink.opcua.tcp.port"; public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686; public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY = "connector.opcua.https.port"; + public static final String SINK_OPC_UA_HTTPS_BIND_PORT_KEY = "sink.opcua.https.port"; public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE = 8443; public static final String CONNECTOR_OPC_UA_SECURITY_DIR_KEY = "connector.opcua.security.dir"; + public static final String SINK_OPC_UA_SECURITY_DIR_KEY = "sink.opcua.security.dir"; public static final String CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE = IoTDBDescriptor.getInstance().getConfDir() != null ? IoTDBDescriptor.getInstance().getConfDir() + File.separatorChar + "opc_security" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java index f432e9838cd..a1eead9e6f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java @@ -22,25 +22,37 @@ package org.apache.iotdb.db.pipe.config.constant; public class PipeExtractorConstant { public static final String EXTRACTOR_KEY = "extractor"; + public static final String SOURCE_KEY = "source"; public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern"; + public static final String SOURCE_PATTERN_KEY = "source.pattern"; public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root"; public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY = "extractor.forwarding-pipe-requests"; + public static final String SOURCE_FORWARDING_PIPE_REQUESTS_KEY = + "source.forwarding-pipe-requests"; public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE = true; public static final String EXTRACTOR_HISTORY_ENABLE_KEY = "extractor.history.enable"; - public static final String EXTRACTOR_HISTORY_START_TIME = "extractor.history.start-time"; - public static final String EXTRACTOR_HISTORY_END_TIME = "extractor.history.end-time"; - public static final String EXTRACTOR_HISTORY_LOOSE_RANGE = "extractor.history.loose-range"; - - public static final String EXTRACTOR_REALTIME_ENABLE = "extractor.realtime.enable"; - public static final String EXTRACTOR_REALTIME_MODE = "extractor.realtime.mode"; - public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid"; - public static final String EXTRACTOR_REALTIME_MODE_FILE = "file"; - public static final String EXTRACTOR_REALTIME_MODE_LOG = "log"; - public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG = "forced-log"; + public static final String SOURCE_HISTORY_ENABLE_KEY = "source.history.enable"; + public static final boolean EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE = true; + public static final String EXTRACTOR_HISTORY_START_TIME_KEY = "extractor.history.start-time"; + public static final String SOURCE_HISTORY_START_TIME_KEY = "source.history.start-time"; + public static final String EXTRACTOR_HISTORY_END_TIME_KEY = "extractor.history.end-time"; + public static final String SOURCE_HISTORY_END_TIME_KEY = "source.history.end-time"; + public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY = "extractor.history.loose-range"; + public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY = "source.history.loose-range"; + + public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable"; + public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable"; + public static final boolean EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE = true; + public static final String EXTRACTOR_REALTIME_MODE_KEY = "extractor.realtime.mode"; + public static final String SOURCE_REALTIME_MODE_KEY = "source.realtime.mode"; + public static final String EXTRACTOR_REALTIME_MODE_HYBRID_VALUE = "hybrid"; + public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file"; + public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log"; + public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE = "forced-log"; private PipeExtractorConstant() { throw new IllegalStateException("Utility class"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index c0f120a0d40..f84020760b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -43,17 +43,23 @@ import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; public class IoTDBDataRegionExtractor implements PipeExtractor { @@ -78,24 +84,40 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { .validateAttributeValueRange( EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validateAttributeValueRange( - EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .validateAttributeValueRange( + SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .validateAttributeValueRange( + SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validate( args -> (boolean) args[0] || (boolean) args[1], - String.format( - "Should not set both %s and %s to false.", - EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE), - validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true), - validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)); + "Should not set both history.enable and realtime.enable to false.", + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), + EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE), + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)); // Validate extractor.realtime.mode - if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) { + if (validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { validator.validateAttributeValueRange( - EXTRACTOR_REALTIME_MODE, + validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) + ? EXTRACTOR_REALTIME_MODE_KEY + : SOURCE_REALTIME_MODE_KEY, true, - EXTRACTOR_REALTIME_MODE_FILE, - EXTRACTOR_REALTIME_MODE_HYBRID, - EXTRACTOR_REALTIME_MODE_LOG, - EXTRACTOR_REALTIME_MODE_FORCED_LOG); + EXTRACTOR_REALTIME_MODE_FILE_VALUE, + EXTRACTOR_REALTIME_MODE_HYBRID_VALUE, + EXTRACTOR_REALTIME_MODE_LOG_VALUE, + EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE); } constructHistoricalExtractor(); @@ -112,28 +134,31 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { private void constructRealtimeExtractor(PipeParameters parameters) { // Enable realtime extractor by default - if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) { + if (!parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor(); - LOGGER.info("'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE); + LOGGER.info( + "'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE_KEY); return; } // Use hybrid mode by default - if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) { + if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); - LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE); + LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY); return; } - switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) { - case EXTRACTOR_REALTIME_MODE_FILE: + switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { + case EXTRACTOR_REALTIME_MODE_FILE_VALUE: realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); break; - case EXTRACTOR_REALTIME_MODE_HYBRID: - case EXTRACTOR_REALTIME_MODE_LOG: + case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE: + case EXTRACTOR_REALTIME_MODE_LOG_VALUE: realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); break; - case EXTRACTOR_REALTIME_MODE_FORCED_LOG: + case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE: realtimeExtractor = new PipeRealtimeDataRegionLogExtractor(); break; default: @@ -141,7 +166,7 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { if (LOGGER.isWarnEnabled()) { LOGGER.warn( "Unsupported extractor realtime mode: {}, create a hybrid extractor.", - parameters.getString(EXTRACTOR_REALTIME_MODE)); + parameters.getString(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 0b00632ff0f..009a9f510c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -49,12 +49,18 @@ import java.util.Map; import java.util.Queue; import java.util.stream.Collectors; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE; -import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY; public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor { @@ -100,7 +106,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L); } - pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE); + pattern = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), + EXTRACTOR_PATTERN_DEFAULT_VALUE); final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(environment.getRegionId())); if (dataRegion != null) { @@ -115,21 +124,32 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa // User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without // enabling the historical data extraction, which may affect the realtime data extraction. final boolean isHistoricalExtractorEnabledByUser = - parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true); + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), + EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); historicalDataExtractionStartTime = - isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME) + isHistoricalExtractorEnabledByUser + && parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY) ? DateTimeUtils.convertDatetimeStrToLong( - parameters.getString(EXTRACTOR_HISTORY_START_TIME), ZoneId.systemDefault()) + parameters.getString( + EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY), + ZoneId.systemDefault()) : Long.MIN_VALUE; historicalDataExtractionEndTime = - isHistoricalExtractorEnabledByUser && parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME) + isHistoricalExtractorEnabledByUser + && parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY) ? DateTimeUtils.convertDatetimeStrToLong( - parameters.getString(EXTRACTOR_HISTORY_END_TIME), ZoneId.systemDefault()) + parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY), + ZoneId.systemDefault()) : Long.MAX_VALUE; // Enable historical extractor by default historicalDataExtractionTimeLowerBound = - parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true) + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), + EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE) ? Long.MIN_VALUE // We define the realtime data as the data generated after the creation time // of the pipe from user's perspective. But we still need to use @@ -168,7 +188,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa } sloppyTimeRange = - Arrays.stream(parameters.getStringOrDefault(EXTRACTOR_HISTORY_LOOSE_RANGE, "").split(",")) + Arrays.stream( + parameters + .getStringOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), + "") + .split(",")) .map(String::trim) .map(String::toLowerCase) .collect(Collectors.toSet()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java index 2634424ed0d..fb88a91fb93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java @@ -36,9 +36,13 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY; +import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY; + public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { protected String pipeName; @@ -78,7 +82,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { pattern = parameters.getStringOrDefault( - PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, + Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY), PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(environment.getRegionId())); @@ -93,7 +97,9 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor { isForwardingPipeRequests = parameters.getBooleanOrDefault( - PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY, + PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java index 75c5955c491..2798d18fa61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java @@ -33,6 +33,8 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import java.util.Arrays; + public class PipeTaskExtractorStage extends PipeTaskStage { private final PipeExtractor pipeExtractor; @@ -46,7 +48,8 @@ public class PipeTaskExtractorStage extends PipeTaskStage { pipeExtractor = extractorParameters .getStringOrDefault( - PipeExtractorConstant.EXTRACTOR_KEY, + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_KEY, PipeExtractorConstant.SOURCE_KEY), BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) .equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) ? new IoTDBDataRegionExtractor() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java index 3d4bbbbb9b3..1914316fd28 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java @@ -41,10 +41,12 @@ public class IoTDBDataRegionExtractorTest { put( PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.TRUE.toString()); - put(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, Boolean.TRUE.toString()); put( - PipeExtractorConstant.EXTRACTOR_REALTIME_MODE, - PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID); + PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY, + Boolean.TRUE.toString()); + put( + PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY, + PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE); } }))); } catch (Exception e) {
