This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.1 by this push:
     new 542ccdec602 Pipe: Disable unstable features in the distribution
542ccdec602 is described below

commit 542ccdec602c9d79414755ecce6c47424ad341ce
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Apr 16 17:21:05 2024 +0800

    Pipe: Disable unstable features in the distribution
---
 .../PipeDataRegionConnectorConstructor.java        |  9 ----
 .../pipe/extractor/IoTDBDataRegionExtractor.java   | 63 +---------------------
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  7 +--
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |  9 ----
 4 files changed, 2 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
index 73f52c79231..bc7a65c9407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
@@ -23,9 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.PipeConnectorConstructor;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector;
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
-import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
-import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
@@ -54,12 +52,8 @@ public class PipeDataRegionConnectorConstructor extends 
PipeConnectorConstructor
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
         IoTDBLegacyPipeConnector::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(), 
IoTDBAirGapConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), 
WebSocketConnector::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingConnector::new);
     pluginConstructors.put(
@@ -78,11 +72,8 @@ public class PipeDataRegionConnectorConstructor extends 
PipeConnectorConstructor
     pluginConstructors.put(
         BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(),
         IoTDBLegacyPipeConnector::new);
-    pluginConstructors.put(
-        BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), 
IoTDBAirGapConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), 
WebSocketConnector::new);
-    pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), 
OpcUaConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingConnector::new);
     pluginConstructors.put(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c13f4071a67..e9653bf11f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -59,20 +59,12 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
 public class IoTDBDataRegionExtractor implements PipeExtractor {
@@ -128,26 +120,6 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
                     Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
                     EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
 
-    // Validate extractor.realtime.mode
-    if (validator
-            .getParameters()
-            .getBooleanOrDefault(
-                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
-                EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
-        || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, 
SOURCE_END_TIME_KEY)) {
-      validator.validateAttributeValueRange(
-          validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
-              ? EXTRACTOR_REALTIME_MODE_KEY
-              : SOURCE_REALTIME_MODE_KEY,
-          true,
-          EXTRACTOR_REALTIME_MODE_FILE_VALUE,
-          EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
-          EXTRACTOR_REALTIME_MODE_LOG_VALUE,
-          EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
-          EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
-          EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
-    }
-
     // Validate source.start-time and source.end-time
     if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, 
SOURCE_END_TIME_KEY)) {
       if (validator
@@ -227,40 +199,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
       return;
     }
 
-    // Use hybrid mode by default
-    if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
-      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-      LOGGER.info(
-          "Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
-          pipeName,
-          dataRegionId,
-          EXTRACTOR_REALTIME_MODE_KEY);
-      return;
-    }
-
-    switch (parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
-      case EXTRACTOR_REALTIME_MODE_FILE_VALUE:
-      case EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
-        break;
-      case EXTRACTOR_REALTIME_MODE_HYBRID_VALUE:
-      case EXTRACTOR_REALTIME_MODE_LOG_VALUE:
-      case EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-        break;
-      case EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE:
-        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
-        break;
-      default:
-        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-        if (LOGGER.isWarnEnabled()) {
-          LOGGER.warn(
-              "Pipe {}@{}: Unsupported extractor realtime mode: {}, create a 
hybrid extractor.",
-              pipeName,
-              dataRegionId,
-              parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
-        }
-    }
+    realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 60744e6c0e6..1543045298d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -180,12 +180,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       enableSkippingTimeParseByTimePartition.set(false);
     }
 
-    isForwardingPipeRequests =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
-                PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
-            
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+    isForwardingPipeRequests = true;
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index cb55d5c5bc6..72242421328 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -20,13 +20,11 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector;
-import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBLegacyPipeConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector;
-import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.opcua.OpcUaConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.websocket.WebSocketConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.writeback.WriteBackConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.extractor.donothing.DoNothingExtractor;
@@ -59,9 +57,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", 
IoTDBLegacyPipeConnector.class),
-  IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", 
IoTDBAirGapConnector.class),
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
-  OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
   WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
@@ -70,9 +66,7 @@ public enum BuiltinPipePlugin {
   IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", 
IoTDBThriftSyncConnector.class),
   IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", 
IoTDBThriftAsyncConnector.class),
   IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", 
IoTDBLegacyPipeConnector.class),
-  IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
   WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
-  OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
   WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
   ;
 
@@ -116,15 +110,12 @@ public enum BuiltinPipePlugin {
                   
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
-                  IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
                   WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
-                  OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
                   WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
                   // Sinks
                   IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
                   IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
                   WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
-                  OPC_UA_SINK.getPipePluginName().toUpperCase(),
                   WRITE_BACK_SINK.getPipePluginName().toUpperCase())));
 }

Reply via email to