This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 00fc9b5708b Pipe: rename extractor -> source, connector -> sink, sloppy-time-range -> loose-range (#11406)
00fc9b5708b is described below
commit 00fc9b5708bd5cfbbcd971055c53b1141b8c3f32
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Oct 29 17:35:23 2023 +0800
Pipe: rename extractor -> source, connector -> sink, sloppy-time-range -> loose-range (#11406)
---
.../api/customizer/parameter/PipeParameters.java | 135 ++++++++++++++++++---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 2 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 4 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 ++
.../persistence/pipe/PipePluginInfo.java | 9 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 11 +-
.../config/constant/PipeConnectorConstant.java | 18 +++
.../config/constant/PipeExtractorConstant.java | 33 +++--
.../builder/PipeTransferBatchReqBuilder.java | 9 +-
.../db/pipe/connector/protocol/IoTDBConnector.java | 40 +++++-
.../protocol/airgap/IoTDBAirGapConnector.java | 8 +-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 37 ++++--
.../connector/protocol/opcua/OpcUaConnector.java | 22 +++-
.../protocol/websocket/WebSocketConnector.java | 5 +-
.../pipe/extractor/IoTDBDataRegionExtractor.java | 81 ++++++++-----
.../PipeHistoricalDataRegionTsFileExtractor.java | 62 ++++++++--
.../realtime/PipeRealtimeDataRegionExtractor.java | 10 +-
.../db/pipe/task/stage/PipeTaskExtractorStage.java | 5 +-
.../connector/PipeConnectorSubtaskManager.java | 7 +-
.../plan/analyze/LoadTsfileAnalyzer.java | 5 +-
.../extractor/IoTDBDataRegionExtractorTest.java | 8 +-
21 files changed, 411 insertions(+), 108 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index cfea33aa3b9..2d00ce717b8 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.PipeProcessor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import java.util.List;
import java.util.Map;
/**
@@ -51,33 +52,73 @@ public class PipeParameters {
return attributes.containsKey(key);
}
- public String getString(String key) {
- return attributes.get(key);
+ public boolean hasAnyAttributes(String... keys) {
+ for (final String key : keys) {
+ if (attributes.containsKey(key)) {
+ return true;
+ }
+ }
+ return false;
}
- public Boolean getBoolean(String key) {
- String value = attributes.get(key);
- return value == null ? null : Boolean.parseBoolean(value);
+ public String getString(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return value;
+ }
+ }
+ return null;
}
- public Integer getInt(String key) {
- String value = attributes.get(key);
- return value == null ? null : Integer.parseInt(value);
+ public Boolean getBoolean(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Boolean.parseBoolean(value);
+ }
+ }
+ return null;
}
- public Long getLong(String key) {
- String value = attributes.get(key);
- return value == null ? null : Long.parseLong(value);
+ public Integer getInt(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Integer.parseInt(value);
+ }
+ }
+ return null;
}
- public Float getFloat(String key) {
- String value = attributes.get(key);
- return value == null ? null : Float.parseFloat(value);
+ public Long getLong(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Long.parseLong(value);
+ }
+ }
+ return null;
}
- public Double getDouble(String key) {
- String value = attributes.get(key);
- return value == null ? null : Double.parseDouble(value);
+ public Float getFloat(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Float.parseFloat(value);
+ }
+ }
+ return null;
+ }
+
+ public Double getDouble(String... keys) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Double.parseDouble(value);
+ }
+ }
+ return null;
}
public String getStringOrDefault(String key, String defaultValue) {
@@ -110,6 +151,66 @@ public class PipeParameters {
return value == null ? defaultValue : Double.parseDouble(value);
}
+ public String getStringOrDefault(List<String> keys, String defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return value;
+ }
+ }
+ return defaultValue;
+ }
+
+ public boolean getBooleanOrDefault(List<String> keys, boolean defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Boolean.parseBoolean(value);
+ }
+ }
+ return defaultValue;
+ }
+
+ public int getIntOrDefault(List<String> keys, int defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Integer.parseInt(value);
+ }
+ }
+ return defaultValue;
+ }
+
+ public long getLongOrDefault(List<String> keys, long defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Long.parseLong(value);
+ }
+ }
+ return defaultValue;
+ }
+
+ public float getFloatOrDefault(List<String> keys, float defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Float.parseFloat(value);
+ }
+ }
+ return defaultValue;
+ }
+
+ public double getDoubleOrDefault(List<String> keys, double defaultValue) {
+ for (final String key : keys) {
+ final String value = attributes.get(key);
+ if (value != null) {
+ return Double.parseDouble(value);
+ }
+ }
+ return defaultValue;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index af97d6e2047..e92b7b99dfa 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -198,8 +198,10 @@ keyWords
| SETTLE
| SGLEVEL
| SHOW
+ | SINK
| SLIMIT
| SOFFSET
+ | SOURCE
| SPACE
| STORAGE
| START
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index dc4235e4013..fefed050b27 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -521,7 +521,7 @@ createPipe
;
extractorAttributesClause
- : WITH EXTRACTOR
+ : WITH (EXTRACTOR | SOURCE)
LR_BRACKET
(extractorAttributeClause COMMA)* extractorAttributeClause?
RR_BRACKET
@@ -543,7 +543,7 @@ processorAttributeClause
;
connectorAttributesClause
- : WITH CONNECTOR
+ : WITH (CONNECTOR | SINK)
LR_BRACKET
(connectorAttributeClause COMMA)* connectorAttributeClause?
RR_BRACKET
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index a1cf3cf6ad2..d98d9b1cbcd 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -690,6 +690,10 @@ SHOW
: S H O W
;
+SINK
+ : S I N K
+ ;
+
SLIMIT
: S L I M I T
;
@@ -698,6 +702,10 @@ SOFFSET
: S O F F S E T
;
+SOURCE
+ : S O U R C E
+ ;
+
SPACE
: S P A C E
;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index bdc30ddd683..85ab9f69adc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -126,7 +126,8 @@ public class PipePluginInfo implements SnapshotProcessor {
new PipeParameters(createPipeRequest.getExtractorAttributes());
final String extractorPluginName =
extractorParameters.getStringOrDefault(
- PipeExtractorConstant.EXTRACTOR_KEY,
IOTDB_EXTRACTOR.getPipePluginName());
+ Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
+ IOTDB_EXTRACTOR.getPipePluginName());
if (!pipePluginMetaKeeper.containsPipePlugin(extractorPluginName)) {
final String exceptionMessage =
String.format(
@@ -152,14 +153,16 @@ public class PipePluginInfo implements SnapshotProcessor {
final PipeParameters connectorParameters =
new PipeParameters(createPipeRequest.getConnectorAttributes());
- if
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+ if (!connectorParameters.hasAnyAttributes(
+ PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
final String exceptionMessage =
"Failed to create pipe, the pipe connector plugin is not specified";
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
final String connectorPluginName =
- connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
+ connectorParameters.getString(
+ PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY);
if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
final String exceptionMessage =
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 59b18f77014..a7edab29de8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
public class PipePluginAgent {
@@ -203,7 +204,8 @@ public class PipePluginAgent {
return (PipeExtractor)
reflect(
extractorParameters.getStringOrDefault(
- PipeExtractorConstant.EXTRACTOR_KEY,
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()));
}
@@ -216,13 +218,16 @@ public class PipePluginAgent {
}
public PipeConnector reflectConnector(PipeParameters connectorParameters) {
- if
(!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+ if (!connectorParameters.hasAnyAttributes(
+ PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY)) {
throw new PipeException(
"Failed to reflect PipeConnector instance because "
+ "'connector' is not specified in the parameters.");
}
return (PipeConnector)
-
reflect(connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY));
+ reflect(
+ connectorParameters.getString(
+ PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY));
}
private PipePlugin reflect(String pluginName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8ed5af7ed56..93e28a0a040 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -29,51 +29,69 @@ import static
org.apache.iotdb.commons.conf.IoTDBConstant.MB;
public class PipeConnectorConstant {
public static final String CONNECTOR_KEY = "connector";
+ public static final String SINK_KEY = "sink";
public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
+ public static final String SINK_IOTDB_IP_KEY = "sink.ip";
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+ public static final String SINK_IOTDB_PORT_KEY = "sink.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
+ public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls";
public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY =
"connector.parallel.tasks";
+ public static final String SINK_IOTDB_PARALLEL_TASKS_KEY =
"sink.parallel.tasks";
public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY =
"connector.batch.enable";
+ public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY =
"sink.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE
= true;
public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY =
"connector.batch.max-delay-seconds";
+ public static final String SINK_IOTDB_BATCH_DELAY_KEY =
"sink.batch.max-delay-seconds";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
+ public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";
public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
+ public static final String SINK_IOTDB_USER_KEY = "sink.user";
public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
public static final String CONNECTOR_IOTDB_PASSWORD_KEY =
"connector.password";
+ public static final String SINK_IOTDB_PASSWORD_KEY = "sink.password";
public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
public static final String CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY =
"connector.air-gap.e-language.enable";
+ public static final String SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY =
"sink.air-gap.e-language.enable";
public static final boolean
CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE = false;
public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
"connector.air-gap.handshake-timeout-ms";
+ public static final String SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
+ "sink.air-gap.handshake-timeout-ms";
public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE
= 5000;
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY =
"connector.version";
+ public static final String SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY =
"sink.version";
public static final String
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
public static final String CONNECTOR_WEBSOCKET_PORT_KEY =
"connector.websocket.port";
+ public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port";
public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY =
"connector.opcua.tcp.port";
+ public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY =
"sink.opcua.tcp.port";
public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
public static final String CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY =
"connector.opcua.https.port";
+ public static final String SINK_OPC_UA_HTTPS_BIND_PORT_KEY =
"sink.opcua.https.port";
public static final int CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE =
8443;
public static final String CONNECTOR_OPC_UA_SECURITY_DIR_KEY =
"connector.opcua.security.dir";
+ public static final String SINK_OPC_UA_SECURITY_DIR_KEY =
"sink.opcua.security.dir";
public static final String CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE =
IoTDBDescriptor.getInstance().getConfDir() != null
? IoTDBDescriptor.getInstance().getConfDir() + File.separatorChar +
"opc_security"
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
index b4be2ed34b9..a1eead9e6f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeExtractorConstant.java
@@ -22,26 +22,37 @@ package org.apache.iotdb.db.pipe.config.constant;
public class PipeExtractorConstant {
public static final String EXTRACTOR_KEY = "extractor";
+ public static final String SOURCE_KEY = "source";
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
+ public static final String SOURCE_PATTERN_KEY = "source.pattern";
public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
"extractor.forwarding-pipe-requests";
+ public static final String SOURCE_FORWARDING_PIPE_REQUESTS_KEY =
+ "source.forwarding-pipe-requests";
public static final boolean EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE
= true;
public static final String EXTRACTOR_HISTORY_ENABLE_KEY =
"extractor.history.enable";
- public static final String EXTRACTOR_HISTORY_START_TIME =
"extractor.history.start-time";
- public static final String EXTRACTOR_HISTORY_END_TIME =
"extractor.history.end-time";
- public static final String EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE =
- "extractor.history.sloppy-time-range";
-
- public static final String EXTRACTOR_REALTIME_ENABLE =
"extractor.realtime.enable";
- public static final String EXTRACTOR_REALTIME_MODE =
"extractor.realtime.mode";
- public static final String EXTRACTOR_REALTIME_MODE_HYBRID = "hybrid";
- public static final String EXTRACTOR_REALTIME_MODE_FILE = "file";
- public static final String EXTRACTOR_REALTIME_MODE_LOG = "log";
- public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG = "forced-log";
+ public static final String SOURCE_HISTORY_ENABLE_KEY =
"source.history.enable";
+ public static final boolean EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE = true;
+ public static final String EXTRACTOR_HISTORY_START_TIME_KEY =
"extractor.history.start-time";
+ public static final String SOURCE_HISTORY_START_TIME_KEY =
"source.history.start-time";
+ public static final String EXTRACTOR_HISTORY_END_TIME_KEY =
"extractor.history.end-time";
+ public static final String SOURCE_HISTORY_END_TIME_KEY =
"source.history.end-time";
+ public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY =
"extractor.history.loose-range";
+ public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY =
"source.history.loose-range";
+
+ public static final String EXTRACTOR_REALTIME_ENABLE_KEY =
"extractor.realtime.enable";
+ public static final String SOURCE_REALTIME_ENABLE_KEY =
"source.realtime.enable";
+ public static final boolean EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE = true;
+ public static final String EXTRACTOR_REALTIME_MODE_KEY =
"extractor.realtime.mode";
+ public static final String SOURCE_REALTIME_MODE_KEY = "source.realtime.mode";
+ public static final String EXTRACTOR_REALTIME_MODE_HYBRID_VALUE = "hybrid";
+ public static final String EXTRACTOR_REALTIME_MODE_FILE_VALUE = "file";
+ public static final String EXTRACTOR_REALTIME_MODE_LOG_VALUE = "log";
+ public static final String EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE =
"forced-log";
private PipeExtractorConstant() {
throw new IllegalStateException("Utility class");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 7d3d3b38c23..9521e6a97aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -32,12 +32,15 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
public abstract class PipeTransferBatchReqBuilder {
@@ -55,11 +58,13 @@ public abstract class PipeTransferBatchReqBuilder {
protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
maxDelayInMs =
parameters.getIntOrDefault(
- CONNECTOR_IOTDB_BATCH_DELAY_KEY,
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
+ CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
* 1000;
maxBatchSizeInBytes =
parameters.getLongOrDefault(
- CONNECTOR_IOTDB_BATCH_SIZE_KEY,
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
}
public List<TPipeTransferReq> getTPipeTransferReqs() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 1ca5db5ba15..3a2bae03068 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -40,6 +40,10 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
public abstract class IoTDBConnector implements PipeConnector {
@@ -53,13 +57,25 @@ public abstract class IoTDBConnector implements
PipeConnector {
public void validate(PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
validator.validate(
- args -> (boolean) args[0] || ((boolean) args[1] && (boolean) args[2]),
+ args ->
+ (boolean) args[0]
+ || ((boolean) args[1] && (boolean) args[2])
+ || (boolean) args[3]
+ || ((boolean) args[4] && (boolean) args[5]),
String.format(
- "Either %s or %s:%s must be specified",
- CONNECTOR_IOTDB_NODE_URLS_KEY, CONNECTOR_IOTDB_IP_KEY,
CONNECTOR_IOTDB_PORT_KEY),
+ "One of %s, %s:%s, %s, %s:%s must be specified",
+ CONNECTOR_IOTDB_NODE_URLS_KEY,
+ CONNECTOR_IOTDB_IP_KEY,
+ CONNECTOR_IOTDB_PORT_KEY,
+ SINK_IOTDB_NODE_URLS_KEY,
+ SINK_IOTDB_IP_KEY,
+ SINK_IOTDB_PORT_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
- parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY));
+ parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+ parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
+ parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+ parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
}
@Override
@@ -75,19 +91,33 @@ public abstract class IoTDBConnector implements
PipeConnector {
parameters.getInt(CONNECTOR_IOTDB_PORT_KEY)));
}
+ if (parameters.hasAttribute(SINK_IOTDB_IP_KEY)
+ && parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) {
+ givenNodeUrls.add(
+ new TEndPoint(
+ parameters.getString(SINK_IOTDB_IP_KEY),
parameters.getInt(SINK_IOTDB_PORT_KEY)));
+ }
+
if (parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY)) {
givenNodeUrls.addAll(
SessionUtils.parseSeedNodeUrls(
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(","))));
}
+ if (parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY)) {
+ givenNodeUrls.addAll(
+ SessionUtils.parseSeedNodeUrls(
+
Arrays.asList(parameters.getString(SINK_IOTDB_NODE_URLS_KEY).split(","))));
+ }
+
nodeUrls.clear();
nodeUrls.addAll(givenNodeUrls);
LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
- CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index add1c166687..8beb6f4d9bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -64,6 +64,8 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
public class IoTDBAirGapConnector extends IoTDBConnector {
@@ -98,14 +100,16 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
handshakeTimeoutMs =
parameters.getIntOrDefault(
- CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
+ Arrays.asList(
+ CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
SINK_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY),
CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
LOGGER.info(
"IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.",
handshakeTimeoutMs);
eLanguageEnable =
parameters.getBooleanOrDefault(
- CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY,
+ Arrays.asList(
+ CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_KEY,
SINK_AIR_GAP_E_LANGUAGE_ENABLE_KEY),
CONNECTOR_AIR_GAP_E_LANGUAGE_ENABLE_DEFAULT_VALUE);
LOGGER.info("IoTDBAirGapConnector is customized with eLanguageEnable:
{}.", eLanguageEnable);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 2fa42762897..f8a05de6f42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -58,6 +58,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -67,6 +68,11 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
public class IoTDBLegacyPipeConnector implements PipeConnector {
@@ -91,26 +97,41 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- validator
- .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
- .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
+ final PipeParameters parameters = validator.getParameters();
+ validator.validate(
+ args ->
+ ((boolean) args[0] && (boolean) args[1]) || ((boolean) args[2] &&
(boolean) args[3]),
+ String.format(
+ "Either %s:%s or %s:%s must be specified",
+ CONNECTOR_IOTDB_IP_KEY,
+ CONNECTOR_IOTDB_PORT_KEY,
+ SINK_IOTDB_IP_KEY,
+ SINK_IOTDB_PORT_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
+ parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
+ parameters.hasAttribute(SINK_IOTDB_IP_KEY),
+ parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
}
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
- ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
- port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
+ ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY,
SINK_IOTDB_IP_KEY);
+ port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);
user =
- parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY,
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+ CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
password =
parameters.getStringOrDefault(
- CONNECTOR_IOTDB_PASSWORD_KEY,
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY),
+ CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
syncConnectorVersion =
parameters.getStringOrDefault(
- CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY,
+ Arrays.asList(
+ CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY,
SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY),
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE);
pipeName = configuration.getRuntimeEnvironment().getPipeName();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index cdfcb044392..dad75907cd9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -45,6 +45,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -60,6 +61,11 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data
are converted into
@@ -86,19 +92,25 @@ public class OpcUaConnector implements PipeConnector {
throws Exception {
int tcpBindPort =
parameters.getIntOrDefault(
- CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY,
CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY,
SINK_OPC_UA_TCP_BIND_PORT_KEY),
+ CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE);
int httpsBindPort =
parameters.getIntOrDefault(
- CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY,
CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY,
SINK_OPC_UA_HTTPS_BIND_PORT_KEY),
+ CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE);
String user =
- parameters.getStringOrDefault(CONNECTOR_IOTDB_USER_KEY,
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
+ CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
String password =
parameters.getStringOrDefault(
- CONNECTOR_IOTDB_PASSWORD_KEY,
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY),
+ CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
String securityDir =
parameters.getStringOrDefault(
- CONNECTOR_OPC_UA_SECURITY_DIR_KEY,
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
+ Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY,
SINK_OPC_UA_SECURITY_DIR_KEY),
+ CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 9ee87b1df54..3329fd3bcf1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
@@ -68,7 +69,9 @@ public class WebSocketConnector implements PipeConnector {
throws Exception {
port =
parameters.getIntOrDefault(
- PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+ Arrays.asList(
+ PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
+ PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY),
PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index ee7adb96601..c68e3047404 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -43,17 +43,23 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
public class IoTDBDataRegionExtractor implements PipeExtractor {
@@ -78,24 +84,40 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
.validateAttributeValueRange(
EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
.validateAttributeValueRange(
- EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
+ EXTRACTOR_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
+ .validateAttributeValueRange(
+ SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
+ .validateAttributeValueRange(
+ SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(),
Boolean.FALSE.toString())
.validate(
args -> (boolean) args[0] || (boolean) args[1],
- String.format(
- "Should not set both %s and %s to false.",
- EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
-
validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY,
true),
-
validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
+ "Should not set both history.enable and realtime.enable to false.",
+ validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE),
+ validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
// Validate extractor.realtime.mode
- if
(validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE,
true)) {
+ if (validator
+ .getParameters()
+ .getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
validator.validateAttributeValueRange(
- EXTRACTOR_REALTIME_MODE,
+ validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
+ ? EXTRACTOR_REALTIME_MODE_KEY
+ : SOURCE_REALTIME_MODE_KEY,
true,
- EXTRACTOR_REALTIME_MODE_FILE,
- EXTRACTOR_REALTIME_MODE_HYBRID,
- EXTRACTOR_REALTIME_MODE_LOG,
- EXTRACTOR_REALTIME_MODE_FORCED_LOG);
+ EXTRACTOR_REALTIME_MODE_FILE_VALUE,
+ EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
+ EXTRACTOR_REALTIME_MODE_LOG_VALUE,
+ EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE);
}
constructHistoricalExtractor();
@@ -112,28 +134,31 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
private void constructRealtimeExtractor(PipeParameters parameters) {
// Enable realtime extractor by default
- if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
+ if (!parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
+ EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
- LOGGER.info("'{}' is set to false, use fake realtime extractor.",
EXTRACTOR_REALTIME_ENABLE);
+ LOGGER.info(
+ "'{}' is set to false, use fake realtime extractor.",
EXTRACTOR_REALTIME_ENABLE_KEY);
return;
}
// Use hybrid mode by default
- if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
+ if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
- LOGGER.info("'{}' is not set, use hybrid mode by default.",
EXTRACTOR_REALTIME_MODE);
+ LOGGER.info("'{}' is not set, use hybrid mode by default.",
EXTRACTOR_REALTIME_MODE_KEY);
return;
}
- switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
- case EXTRACTOR_REALTIME_MODE_FILE:
+ switch (parameters.getString(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
+ case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
break;
- case EXTRACTOR_REALTIME_MODE_HYBRID:
- case EXTRACTOR_REALTIME_MODE_LOG:
+ case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
+ case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
break;
- case EXTRACTOR_REALTIME_MODE_FORCED_LOG:
+ case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
break;
default:
@@ -141,7 +166,7 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Unsupported extractor realtime mode: {}, create a hybrid
extractor.",
- parameters.getString(EXTRACTOR_REALTIME_MODE));
+ parameters.getString(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 37da7db751b..009a9f510c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,18 +42,25 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE;
-import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
public class PipeHistoricalDataRegionTsFileExtractor implements
PipeHistoricalDataRegionExtractor {
@@ -99,7 +106,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
}
- pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY,
EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ pattern =
+ parameters.getStringOrDefault(
+ Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
+ EXTRACTOR_PATTERN_DEFAULT_VALUE);
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
if (dataRegion != null) {
@@ -114,21 +124,32 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
// User may set the EXTRACTOR_HISTORY_START_TIME and
EXTRACTOR_HISTORY_END_TIME without
// enabling the historical data extraction, which may affect the realtime
data extraction.
final boolean isHistoricalExtractorEnabledByUser =
- parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true);
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
historicalDataExtractionStartTime =
- isHistoricalExtractorEnabledByUser &&
parameters.hasAttribute(EXTRACTOR_HISTORY_START_TIME)
+ isHistoricalExtractorEnabledByUser
+ && parameters.hasAnyAttributes(
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getString(EXTRACTOR_HISTORY_START_TIME),
ZoneId.systemDefault())
+ parameters.getString(
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY),
+ ZoneId.systemDefault())
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
- isHistoricalExtractorEnabledByUser &&
parameters.hasAttribute(EXTRACTOR_HISTORY_END_TIME)
+ isHistoricalExtractorEnabledByUser
+ && parameters.hasAnyAttributes(
+ EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
? DateTimeUtils.convertDatetimeStrToLong(
- parameters.getString(EXTRACTOR_HISTORY_END_TIME),
ZoneId.systemDefault())
+ parameters.getString(EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY),
+ ZoneId.systemDefault())
: Long.MAX_VALUE;
// Enable historical extractor by default
historicalDataExtractionTimeLowerBound =
- parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true)
+ parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)
? Long.MIN_VALUE
// We define the realtime data as the data generated after the
creation time
// of the pipe from user's perspective. But we still need to use
@@ -166,7 +187,26 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
}
- sloppyTimeRange =
parameters.getBooleanOrDefault(EXTRACTOR_HISTORY_SLOPPY_TIME_RANGE, false);
+ sloppyTimeRange =
+ Arrays.stream(
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY),
+ "")
+ .split(","))
+ .map(String::trim)
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet())
+ .contains("time");
+
+ LOGGER.info(
+ "historical data extraction time range, start time {}({}), end time
{}({}), sloppy time range {}",
+ DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
+ historicalDataExtractionStartTime,
+ DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
+ historicalDataExtractionEndTime,
+ sloppyTimeRange);
}
private void flushDataRegionAllTsFiles() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 3087a6ef41f..fb4e6e521c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -36,9 +36,13 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor
{
protected String pipeName;
@@ -80,7 +84,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
pattern =
parameters.getStringOrDefault(
- PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
+ Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
@@ -95,7 +99,9 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
isForwardingPipeRequests =
parameters.getBooleanOrDefault(
- PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
+ PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
// Metrics related to TsFileEpoch are managed in PipeExtractorMetrics.
These metrics are
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 75c5955c491..2798d18fa61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -33,6 +33,8 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import java.util.Arrays;
+
public class PipeTaskExtractorStage extends PipeTaskStage {
private final PipeExtractor pipeExtractor;
@@ -46,7 +48,8 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
pipeExtractor =
extractorParameters
.getStringOrDefault(
- PipeExtractorConstant.EXTRACTOR_KEY,
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
? new IoTDBDataRegionExtractor()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index fbff28fed45..6450247a53f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,14 +68,16 @@ public class PipeConnectorSubtaskManager {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final int connectorNum =
pipeConnectorParameters.getIntOrDefault(
- PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+ Arrays.asList(
+ PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+ PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
final List<PipeConnectorSubtaskLifeCycle>
pipeConnectorSubtaskLifeCycleList =
new ArrayList<>(connectorNum);
final String connectorKey =
pipeConnectorParameters.getStringOrDefault(
- PipeConnectorConstant.CONNECTOR_KEY,
+ Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
// Shared pending queue for all subtasks
final BoundedBlockingPendingQueue<Event> pendingQueue =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 52776115030..cbad758cc55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -605,9 +605,10 @@ public class LoadTsfileAnalyzer {
}
// check encoding
- if
(!tsFileSchema.getEncodingType().equals(iotdbSchema.getEncodingType())) {
+ if (LOGGER.isDebugEnabled()
+ &&
!tsFileSchema.getEncodingType().equals(iotdbSchema.getEncodingType())) {
// we allow a measurement to have different encodings in different
chunks
- LOGGER.warn(
+ LOGGER.debug(
"Encoding type not match, measurement: {}{}{}, "
+ "TsFile encoding: {}, IoTDB encoding: {}",
device,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
index 3d4bbbbb9b3..1914316fd28 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractorTest.java
@@ -41,10 +41,12 @@ public class IoTDBDataRegionExtractorTest {
put(
PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.TRUE.toString());
- put(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE,
Boolean.TRUE.toString());
put(
- PipeExtractorConstant.EXTRACTOR_REALTIME_MODE,
-
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID);
+ PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.TRUE.toString());
+ put(
+ PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY,
+
PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE);
}
})));
} catch (Exception e) {