This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 80d5aea545d [To dev/1.3] PipePlugin: Optimized the errorCode && Fixed 
the case-sensitive semantic (#16851) (#16852)
80d5aea545d is described below

commit 80d5aea545d807554704dea0636bd94e6b901834
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 4 18:38:37 2025 +0800

    [To dev/1.3] PipePlugin: Optimized the errorCode && Fixed the 
case-sensitive semantic (#16851) (#16852)
    
    * PipePlugin: Optimized the errorCode && Fixed the case-sensitive semantic 
(#16851)
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * added
---
 .../pipe/it/autocreate/IoTDBPipeSyntaxIT.java      |  61 +++++++++
 .../config/executor/ClusterConfigTaskExecutor.java | 150 +++++++++++----------
 .../execution/ClusterConfigTaskExecutorTest.java   |  83 ++++++++++++
 3 files changed, 221 insertions(+), 73 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
index b9de28b7158..26cb3e9e733 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -729,4 +730,64 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeDualAutoIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testPipePluginValidation() {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      try {
+        statement.execute(
+            "create pipePlugin TestProcessor as 
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'");
+        fail();
+      } catch (final SQLException e) {
+        Assert.assertEquals(
+            "701: Untrusted uri xxx, current trusted_uri_pattern is file:.*", 
e.getMessage());
+      }
+      try {
+        statement.execute(
+            "create pipePlugin TestProcessor as 
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'file:.*'");
+        fail();
+      } catch (final SQLException e) {
+        Assert.assertEquals("701: URI is not hierarchical", e.getMessage());
+      }
+      try {
+        statement.execute(
+            String.format(
+                "create pipePlugin TestProcessor as 
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'",
+                new File(
+                            System.getProperty("user.dir")
+                                + File.separator
+                                + "target"
+                                + File.separator
+                                + "test-classes"
+                                + File.separator)
+                        .toURI()
+                    + "PipePlugin.jar"));
+        fail();
+      } catch (final SQLException e) {
+        Assert.assertEquals(
+            "1603: Failed to get executable for PipePlugin TestProcessor, 
please check the URI.",
+            e.getMessage());
+      }
+      try {
+        statement.execute("drop pipePlugin test_processor");
+        fail();
+      } catch (final SQLException e) {
+        Assert.assertEquals(
+            "1601: Failed to drop PipePlugin [TEST_PROCESSOR], this PipePlugin 
has not been created",
+            e.getMessage());
+      }
+      try {
+        statement.execute("drop pipePlugin `Do-Nothing-Sink`");
+        fail();
+      } catch (final SQLException e) {
+        Assert.assertEquals(
+            "1601: Failed to drop PipePlugin [DO-NOTHING-SINK], the PipePlugin 
is a built-in PipePlugin",
+            e.getMessage());
+      }
+    } catch (final SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1ca57bea55f..b711b652829 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -821,85 +821,89 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       return future;
     }
 
-    try (final ConfigNodeClient client =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final String libRoot;
-      final ByteBuffer jarFile;
-      final String jarMd5;
+    final String libRoot;
+    final ByteBuffer jarFile;
+    final String jarMd5;
 
-      final String jarFileName = new File(uriString).getName();
-      try {
-        final URI uri = new URI(uriString);
-        if (uri.getScheme() == null) {
-          future.setException(
-              new IoTDBException(
-                  "The scheme of URI is not set, please specify the scheme of 
URI.",
-                  TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
-          return future;
-        }
-        if (!uri.getScheme().equals("file")) {
-          // Download executable
-          final ExecutableResource resource =
-              PipePluginExecutableManager.getInstance()
-                  .request(Collections.singletonList(uriString));
-          final String jarFilePathUnderTempDir =
-              PipePluginExecutableManager.getInstance()
-                      
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
-                  + jarFileName;
-          // libRoot should be the path of the specified jar
-          libRoot = jarFilePathUnderTempDir;
-          jarFile = 
ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
-          jarMd5 = 
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
-        } else {
-          // libRoot should be the path of the specified jar
-          libRoot = new File(new URI(uriString)).getAbsolutePath();
-          // If jarPath is a file path on datanode, we transfer it to 
ByteBuffer and send it to
-          // ConfigNode.
-          jarFile = ExecutableManager.transferToBytebuffer(libRoot);
-          // Set md5 of the jar file
-          jarMd5 = 
DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
-        }
-      } catch (final IOException | URISyntaxException e) {
-        LOGGER.warn(
-            "Failed to get executable for PipePlugin({}) using URI: {}.",
-            createPipePluginStatement.getPluginName(),
-            createPipePluginStatement.getUriString(),
-            e);
+    final String jarFileName = new File(uriString).getName();
+    try {
+      final URI uri = new URI(uriString);
+      if (uri.getScheme() == null) {
         future.setException(
             new IoTDBException(
-                "Failed to get executable for PipePlugin"
-                    + createPipePluginStatement.getPluginName()
-                    + "', please check the URI.",
+                "The scheme of URI is not set, please specify the scheme of 
URI.",
                 TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
         return future;
       }
+      if (!uri.getScheme().equals("file")) {
+        // Download executable
+        final ExecutableResource resource =
+            
PipePluginExecutableManager.getInstance().request(Collections.singletonList(uriString));
+        final String jarFilePathUnderTempDir =
+            PipePluginExecutableManager.getInstance()
+                    
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+                + jarFileName;
+        // libRoot should be the path of the specified jar
+        libRoot = jarFilePathUnderTempDir;
+        jarFile = 
ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
+        jarMd5 = 
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
+      } else {
+        // libRoot should be the path of the specified jar
+        libRoot = new File(new URI(uriString)).getAbsolutePath();
+        // If jarPath is a file path on datanode, we transfer it to ByteBuffer 
and send it to
+        // ConfigNode.
+        jarFile = ExecutableManager.transferToBytebuffer(libRoot);
+        // Set md5 of the jar file
+        jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
+      }
+    } catch (final URISyntaxException | IllegalArgumentException e) {
+      future.setException(
+          new IoTDBException(e.getMessage(), 
TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
+      return future;
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "Failed to get executable for PipePlugin({}) using URI: {}.",
+          createPipePluginStatement.getPluginName(),
+          createPipePluginStatement.getUriString(),
+          e);
+      future.setException(
+          new IoTDBException(
+              "Failed to get executable for PipePlugin "
+                  + createPipePluginStatement.getPluginName()
+                  + ", please check the URI.",
+              TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+      return future;
+    }
 
-      // try to create instance, this request will fail if creation is not 
successful
-      try (final PipePluginClassLoader classLoader = new 
PipePluginClassLoader(libRoot)) {
-        // ensure that jar file contains the class and the class is a pipe 
plugin
-        final Class<?> clazz =
-            Class.forName(createPipePluginStatement.getClassName(), true, 
classLoader);
-        final PipePlugin ignored = (PipePlugin) 
clazz.getDeclaredConstructor().newInstance();
-      } catch (final ClassNotFoundException
-          | NoSuchMethodException
-          | InstantiationException
-          | IllegalAccessException
-          | InvocationTargetException
-          | ClassCastException e) {
-        LOGGER.warn(
-            "Failed to create function when try to create PipePlugin({}) 
instance first.",
-            createPipePluginStatement.getPluginName(),
-            e);
-        future.setException(
-            new IoTDBException(
-                "Failed to load class '"
-                    + createPipePluginStatement.getClassName()
-                    + "', because it's not found in jar file or is invalid: "
-                    + createPipePluginStatement.getUriString(),
-                TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
-        return future;
-      }
+    // try to create instance, this request will fail if creation is not 
successful
+    try (final PipePluginClassLoader classLoader = new 
PipePluginClassLoader(libRoot)) {
+      // ensure that jar file contains the class and the class is a pipe plugin
+      final Class<?> clazz =
+          Class.forName(createPipePluginStatement.getClassName(), true, 
classLoader);
+      final PipePlugin ignored = (PipePlugin) 
clazz.getDeclaredConstructor().newInstance();
+    } catch (final ClassNotFoundException
+        | NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException
+        | ClassCastException
+        | IOException e) {
+      LOGGER.warn(
+          "Failed to create pipePlugin when try to create PipePlugin({}) 
instance first.",
+          createPipePluginStatement.getPluginName(),
+          e);
+      future.setException(
+          new IoTDBException(
+              "Failed to load class '"
+                  + createPipePluginStatement.getClassName()
+                  + "', because it's not found in jar file or is invalid: "
+                  + createPipePluginStatement.getUriString(),
+              TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
+      return future;
+    }
 
+    try (final ConfigNodeClient client =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       final TSStatus executionStatus =
           client.createPipePlugin(
               new TCreatePipePluginReq()
@@ -924,7 +928,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
-    } catch (final ClientManagerException | TException | IOException e) {
+    } catch (final ClientManagerException | TException e) {
       future.setException(e);
     }
     return future;
@@ -939,7 +943,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       final TSStatus executionStatus =
           client.dropPipePlugin(
               new TDropPipePluginReq()
-                  .setPluginName(dropPipePluginStatement.getPluginName())
+                  
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
                   
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
executionStatus.getCode()) {
         LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
new file mode 100644
index 00000000000..fa974463e5c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution;
+
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+public class ClusterConfigTaskExecutorTest {
+
+  @Test
+  public void testPipePlugin() {
+    try {
+      ClusterConfigTaskExecutor.getInstance()
+          .createPipePlugin(
+              new CreatePipePluginStatement("TestProcessor", false, 
"someClass", "uri"))
+          .get();
+      Assert.fail();
+    } catch (final Exception e) {
+      Assert.assertTrue(
+          e.getMessage()
+              .contains("The scheme of URI is not set, please specify the 
scheme of URI."));
+    }
+
+    try {
+      ClusterConfigTaskExecutor.getInstance()
+          .createPipePlugin(
+              new CreatePipePluginStatement("TestProcessor", false, 
"someClass", "file:.*"))
+          .get();
+      Assert.fail();
+    } catch (final Exception e) {
+      Assert.assertTrue(e.getMessage().contains("URI is not hierarchical"));
+    }
+
+    try {
+      ClusterConfigTaskExecutor.getInstance()
+          .createPipePlugin(
+              new CreatePipePluginStatement(
+                  "TestProcessor",
+                  false,
+                  "org.apache.iotdb.db.pipe.example.TestProcessor",
+                  new File(
+                              System.getProperty("user.dir")
+                                  + File.separator
+                                  + "target"
+                                  + File.separator
+                                  + "test-classes"
+                                  + File.separator)
+                          .toURI()
+                      + "PipePlugin.jar"))
+          .get();
+      fail();
+    } catch (final Exception e) {
+      Assert.assertTrue(
+          e.getMessage()
+              .contains(
+                  "Failed to get executable for PipePlugin TestProcessor, 
please check the URI."));
+    }
+  }
+}

Reply via email to