This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fd3911f4ca6 Pipe: convert the value of source, processor, sink key to 
lowercase & convert `SHOW_PIPE_PLUGINS_BLACKLIST` keys to uppercase (#11487)
fd3911f4ca6 is described below

commit fd3911f4ca61ccb7537e571b1a1b181da4d8a88f
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 7 14:41:45 2023 +0800

    Pipe: convert the value of source, processor, sink key to lowercase & 
convert `SHOW_PIPE_PLUGINS_BLACKLIST` keys to uppercase (#11487)
    
    This commit fixes two minor bugs related to plugin names case-sensitive in 
Pipe.
    
    1. When a user creates a pipe like the following:
    
    ```sql
    create pipe a2b
    with connector (
        'connector'='IOTDB-THRIFT-CONNECTOR',
        'connector.ip'='127.0.0.1',
        'connector.port'='6668'
    )
    ```
    Because the connector key here is in **uppercase**, it fails to match the 
built-in plugin instance in 
`org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager#CONNECTOR_CONSTRUCTORS`
 (stored in **lowercase**).
    
    2. `SHOW_PIPE_PLUGINS_BLACKLIST` do not take effect in `show pipeplugins` 
statement, this is because plugin names in Pipe are stored in **uppercase** in 
`org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper#pipePluginNameToMetaMap`.
    
    ---
    
    After this PR, when parsing Pipe plugin from the parameters, it goes 
through the following two stages:
    
    1. Obtain the plugin name from the parameters and convert it to 
**lowercase**. This is done to facilitate matching and retrieval of the 
built-in plugin instance.
    
    2. If the initial match fails, the plugin name is converted to 
**uppercase** and matched against 
`org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper#pipePluginNameToMetaMap`.
 This allows for the reflection of custom plugin instances.
    
    ---
    
    * fix: convert the value of source, processor, sink key to lowercase for 
matching
    
    * fix: convert SHOW_PIPE_PLUGINS_BLACKLIST keys to upper case
---
 .../db/pipe/task/stage/PipeTaskExtractorStage.java | 11 +++++---
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  2 ++
 .../connector/PipeConnectorSubtaskManager.java     | 11 +++++---
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     | 31 +++++++++++-----------
 4 files changed, 34 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 83fd4d8e098..1b4ed57251b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -45,10 +45,15 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
       PipeParameters extractorParameters,
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta) {
+    // Convert the value of `EXTRACTOR_KEY` or `SOURCE_KEY` to lowercase for 
matching
+    // `IOTDB_EXTRACTOR`
     final String pluginName =
-        extractorParameters.getStringOrDefault(
-            Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
-            BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+        extractorParameters
+            .getStringOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_KEY, 
PipeExtractorConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
     pipeExtractor =
         
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
                 || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 66d48365b04..6face1a58fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -62,11 +62,13 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       TConsensusGroupId dataRegionId,
       EventSupplier pipeExtractorInputEventSupplier,
       BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+    // Convert the value of `PROCESSOR_KEY` to lowercase for matching 
`DO_NOTHING_PROCESSOR`
     final PipeProcessor pipeProcessor =
         pipeProcessorParameters
                 .getStringOrDefault(
                     PipeProcessorConstant.PROCESSOR_KEY,
                     BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+                .toLowerCase()
                 
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
             ? new DoNothingProcessor()
             : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 76fa00cd652..c1c75b49ba4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -77,10 +77,15 @@ public class PipeConnectorSubtaskManager {
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
           new ArrayList<>(connectorNum);
 
+      // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase for 
matching in
+      // `CONNECTOR_CONSTRUCTORS`
       final String connectorKey =
-          pipeConnectorParameters.getStringOrDefault(
-              Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
-              BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+          pipeConnectorParameters
+              .getStringOrDefault(
+                  Arrays.asList(
+                      PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
+                  BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+              .toLowerCase();
       // Shared pending queue for all subtasks
       final BoundedBlockingPendingQueue<Event> pendingQueue =
           new BoundedBlockingPendingQueue<>(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 70814c34b45..96c6bbae38f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -91,20 +91,21 @@ public enum BuiltinPipePlugin {
   public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST = new 
HashSet<>();
 
   static {
-    SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName());
-
-    SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
-    SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName());
-    SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName());
-
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName());
-    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName());
-    SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase());
+
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase());
+
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase());
+    
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase());
   }
 }

Reply via email to