This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd3911f4ca6 Pipe: convert the value of source, processor, sink key to
lowercase & convert `SHOW_PIPE_PLUGINS_BLACKLIST` keys to uppercase (#11487)
fd3911f4ca6 is described below
commit fd3911f4ca61ccb7537e571b1a1b181da4d8a88f
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 7 14:41:45 2023 +0800
Pipe: convert the value of source, processor, sink key to lowercase &
convert `SHOW_PIPE_PLUGINS_BLACKLIST` keys to uppercase (#11487)
This commit fixes two minor bugs related to the case sensitivity of plugin
names in Pipe.
1. When a user creates a pipe like the following:
```sql
create pipe a2b
with connector (
'connector'='IOTDB-THRIFT-CONNECTOR',
'connector.ip'='127.0.0.1',
'connector.port'='6668'
)
```
Because the value of the `connector` key here is in **uppercase**, it fails to
match the built-in plugin instance in
`org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager#CONNECTOR_CONSTRUCTORS`
(whose keys are stored in **lowercase**).
2. `SHOW_PIPE_PLUGINS_BLACKLIST` does not take effect in the `show pipeplugins`
statement. This is because the blacklist entries were added in **lowercase**,
whereas plugin names in Pipe are stored in **uppercase** in
`org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper#pipePluginNameToMetaMap`.
---
After this PR, parsing a Pipe plugin from the parameters goes
through the following two stages:
1. Obtain the plugin name from the parameters and convert it to
**lowercase**. This is done to facilitate matching and retrieval of the
built-in plugin instance.
2. If the initial match fails, the plugin name is converted to
**uppercase** and matched against
`org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMetaKeeper#pipePluginNameToMetaMap`.
This allows for the reflection of custom plugin instances.
---
* fix: convert the value of source, processor, sink key to lowercase for
matching
* fix: convert SHOW_PIPE_PLUGINS_BLACKLIST keys to upper case
---
.../db/pipe/task/stage/PipeTaskExtractorStage.java | 11 +++++---
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 ++
.../connector/PipeConnectorSubtaskManager.java | 11 +++++---
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 31 +++++++++++-----------
4 files changed, 34 insertions(+), 21 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
index 83fd4d8e098..1b4ed57251b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskExtractorStage.java
@@ -45,10 +45,15 @@ public class PipeTaskExtractorStage extends PipeTaskStage {
PipeParameters extractorParameters,
TConsensusGroupId dataRegionId,
PipeTaskMeta pipeTaskMeta) {
+ // Convert the value of `EXTRACTOR_KEY` or `SOURCE_KEY` to lowercase for
matching
+ // `IOTDB_EXTRACTOR`
final String pluginName =
- extractorParameters.getStringOrDefault(
- Arrays.asList(PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
- BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName());
+ extractorParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_KEY,
PipeExtractorConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
pipeExtractor =
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 66d48365b04..6face1a58fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -62,11 +62,13 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
TConsensusGroupId dataRegionId,
EventSupplier pipeExtractorInputEventSupplier,
BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+ // Convert the value of `PROCESSOR_KEY` to lowercase for matching
`DO_NOTHING_PROCESSOR`
final PipeProcessor pipeProcessor =
pipeProcessorParameters
.getStringOrDefault(
PipeProcessorConstant.PROCESSOR_KEY,
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+ .toLowerCase()
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
? new DoNothingProcessor()
: PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 76fa00cd652..c1c75b49ba4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -77,10 +77,15 @@ public class PipeConnectorSubtaskManager {
final List<PipeConnectorSubtaskLifeCycle>
pipeConnectorSubtaskLifeCycleList =
new ArrayList<>(connectorNum);
+ // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase for
matching in
+ // `CONNECTOR_CONSTRUCTORS`
final String connectorKey =
- pipeConnectorParameters.getStringOrDefault(
- Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
- BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
+ pipeConnectorParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeConnectorConstant.CONNECTOR_KEY,
PipeConnectorConstant.SINK_KEY),
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+ .toLowerCase();
// Shared pending queue for all subtasks
final BoundedBlockingPendingQueue<Event> pendingQueue =
new BoundedBlockingPendingQueue<>(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 70814c34b45..96c6bbae38f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -91,20 +91,21 @@ public enum BuiltinPipePlugin {
public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST = new
HashSet<>();
static {
- SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName());
-
- SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
- SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName());
- SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName());
-
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName());
- SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase());
+
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase());
+
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase());
+
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase());
}
}