This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.1 by this push:
new 542ccdec602 Pipe: Disable unstable features in the distribution
542ccdec602 is described below
commit 542ccdec602c9d79414755ecce6c47424ad341ce
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 16 17:21:05 2024 +0800
Pipe: Disable unstable features in the distribution
---
.../PipeDataRegionConnectorConstructor.java | 9 ----
.../pipe/extractor/IoTDBDataRegionExtractor.java | 63 +---------------------
.../realtime/PipeRealtimeDataRegionExtractor.java | 7 +--
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 9 ----
4 files changed, 2 insertions(+), 86 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
index 73f52c79231..bc7a65c9407 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
@@ -23,9 +23,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.PipeConnectorConstructor;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
-import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
@@ -54,12 +52,8 @@ public class PipeDataRegionConnectorConstructor extends
PipeConnectorConstructor
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
IoTDBLegacyPipeConnector::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
IoTDBAirGapConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(),
WebSocketConnector::new);
- pluginConstructors.put(
- BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
OpcUaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingConnector::new);
pluginConstructors.put(
@@ -78,11 +72,8 @@ public class PipeDataRegionConnectorConstructor extends
PipeConnectorConstructor
pluginConstructors.put(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(),
IoTDBLegacyPipeConnector::new);
- pluginConstructors.put(
- BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
IoTDBAirGapConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(),
WebSocketConnector::new);
- pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
OpcUaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingConnector::new);
pluginConstructors.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c13f4071a67..e9653bf11f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -59,20 +59,12 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
public class IoTDBDataRegionExtractor implements PipeExtractor {
@@ -128,26 +120,6 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
- // Validate extractor.realtime.mode
- if (validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
- EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
- || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY,
SOURCE_END_TIME_KEY)) {
- validator.validateAttributeValueRange(
- validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
- ? EXTRACTOR_REALTIME_MODE_KEY
- : SOURCE_REALTIME_MODE_KEY,
- true,
- EXTRACTOR_REALTIME_MODE_FILE_VALUE,
- EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
- EXTRACTOR_REALTIME_MODE_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
- EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
- EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
- }
-
// Validate source.start-time and source.end-time
if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY,
SOURCE_END_TIME_KEY)) {
if (validator
@@ -227,40 +199,7 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
return;
}
- // Use hybrid mode by default
- if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
- realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
- LOGGER.info(
- "Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
- pipeName,
- dataRegionId,
- EXTRACTOR_REALTIME_MODE_KEY);
- return;
- }
-
- switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
- case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
- case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
- break;
- case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
- case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
- case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
- break;
- case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
- realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
- break;
- default:
- realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(
- "Pipe {}@{}: Unsupported extractor realtime mode: {}, create a
hybrid extractor.",
- pipeName,
- dataRegionId,
- parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY));
- }
- }
+ realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 60744e6c0e6..1543045298d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -180,12 +180,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
enableSkippingTimeParseByTimePartition.set(false);
}
- isForwardingPipeRequests =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
- PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+ isForwardingPipeRequests = true;
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index cb55d5c5bc6..72242421328 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -20,13 +20,11 @@
package org.apache.iotdb.commons.pipe.plugin.builtin;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector;
-import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBLegacyPipeConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector;
-import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.opcua.OpcUaConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.websocket.WebSocketConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.writeback.WriteBackConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.donothing.DoNothingExtractor;
@@ -59,9 +57,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector",
IoTDBThriftSyncConnector.class),
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector",
IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector",
IoTDBLegacyPipeConnector.class),
- IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector",
IoTDBAirGapConnector.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
- OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
@@ -70,9 +66,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink",
IoTDBThriftSyncConnector.class),
IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink",
IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink",
IoTDBLegacyPipeConnector.class),
- IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
- OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
;
@@ -116,15 +110,12 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
- IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
- OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
// Sinks
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
- OPC_UA_SINK.getPipePluginName().toUpperCase(),
WRITE_BACK_SINK.getPipePluginName().toUpperCase())));
}