This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 80d5aea545d [To dev/1.3] PipePlugin: Optimized the errorCode && Fixed
the case-sensitive semantic (#16851) (#16852)
80d5aea545d is described below
commit 80d5aea545d807554704dea0636bd94e6b901834
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 4 18:38:37 2025 +0800
[To dev/1.3] PipePlugin: Optimized the errorCode && Fixed the
case-sensitive semantic (#16851) (#16852)
* PipePlugin: Optimized the errorCode && Fixed the case-sensitive semantic
(#16851)
* fix
* fix
* fix
* fix
* added
---
.../pipe/it/autocreate/IoTDBPipeSyntaxIT.java | 61 +++++++++
.../config/executor/ClusterConfigTaskExecutor.java | 150 +++++++++++----------
.../execution/ClusterConfigTaskExecutorTest.java | 83 ++++++++++++
3 files changed, 221 insertions(+), 73 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
index b9de28b7158..26cb3e9e733 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -729,4 +730,64 @@ public class IoTDBPipeSyntaxIT extends
AbstractPipeDualAutoIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testPipePluginValidation() {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ try {
+ statement.execute(
+ "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'xxx'");
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertEquals(
+ "701: Untrusted uri xxx, current trusted_uri_pattern is file:.*",
e.getMessage());
+ }
+ try {
+ statement.execute(
+ "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI 'file:.*'");
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertEquals("701: URI is not hierarchical", e.getMessage());
+ }
+ try {
+ statement.execute(
+ String.format(
+ "create pipePlugin TestProcessor as
'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'",
+ new File(
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator)
+ .toURI()
+ + "PipePlugin.jar"));
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertEquals(
+ "1603: Failed to get executable for PipePlugin TestProcessor,
please check the URI.",
+ e.getMessage());
+ }
+ try {
+ statement.execute("drop pipePlugin test_processor");
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertEquals(
+ "1601: Failed to drop PipePlugin [TEST_PROCESSOR], this PipePlugin
has not been created",
+ e.getMessage());
+ }
+ try {
+ statement.execute("drop pipePlugin `Do-Nothing-Sink`");
+ fail();
+ } catch (final SQLException e) {
+ Assert.assertEquals(
+ "1601: Failed to drop PipePlugin [DO-NOTHING-SINK], the PipePlugin
is a built-in PipePlugin",
+ e.getMessage());
+ }
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 1ca57bea55f..b711b652829 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -821,85 +821,89 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
- try (final ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final String libRoot;
- final ByteBuffer jarFile;
- final String jarMd5;
+ final String libRoot;
+ final ByteBuffer jarFile;
+ final String jarMd5;
- final String jarFileName = new File(uriString).getName();
- try {
- final URI uri = new URI(uriString);
- if (uri.getScheme() == null) {
- future.setException(
- new IoTDBException(
- "The scheme of URI is not set, please specify the scheme of
URI.",
- TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
- return future;
- }
- if (!uri.getScheme().equals("file")) {
- // Download executable
- final ExecutableResource resource =
- PipePluginExecutableManager.getInstance()
- .request(Collections.singletonList(uriString));
- final String jarFilePathUnderTempDir =
- PipePluginExecutableManager.getInstance()
-
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
- + jarFileName;
- // libRoot should be the path of the specified jar
- libRoot = jarFilePathUnderTempDir;
- jarFile =
ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
- jarMd5 =
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
- } else {
- // libRoot should be the path of the specified jar
- libRoot = new File(new URI(uriString)).getAbsolutePath();
- // If jarPath is a file path on datanode, we transfer it to
ByteBuffer and send it to
- // ConfigNode.
- jarFile = ExecutableManager.transferToBytebuffer(libRoot);
- // Set md5 of the jar file
- jarMd5 =
DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
- }
- } catch (final IOException | URISyntaxException e) {
- LOGGER.warn(
- "Failed to get executable for PipePlugin({}) using URI: {}.",
- createPipePluginStatement.getPluginName(),
- createPipePluginStatement.getUriString(),
- e);
+ final String jarFileName = new File(uriString).getName();
+ try {
+ final URI uri = new URI(uriString);
+ if (uri.getScheme() == null) {
future.setException(
new IoTDBException(
- "Failed to get executable for PipePlugin"
- + createPipePluginStatement.getPluginName()
- + "', please check the URI.",
+ "The scheme of URI is not set, please specify the scheme of
URI.",
TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
+ if (!uri.getScheme().equals("file")) {
+ // Download executable
+ final ExecutableResource resource =
+
PipePluginExecutableManager.getInstance().request(Collections.singletonList(uriString));
+ final String jarFilePathUnderTempDir =
+ PipePluginExecutableManager.getInstance()
+
.getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ + jarFileName;
+ // libRoot should be the path of the specified jar
+ libRoot = jarFilePathUnderTempDir;
+ jarFile =
ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
+ jarMd5 =
DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
+ } else {
+ // libRoot should be the path of the specified jar
+ libRoot = new File(new URI(uriString)).getAbsolutePath();
+ // If jarPath is a file path on datanode, we transfer it to ByteBuffer
and send it to
+ // ConfigNode.
+ jarFile = ExecutableManager.transferToBytebuffer(libRoot);
+ // Set md5 of the jar file
+ jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
+ }
+ } catch (final URISyntaxException | IllegalArgumentException e) {
+ future.setException(
+ new IoTDBException(e.getMessage(),
TSStatusCode.SEMANTIC_ERROR.getStatusCode()));
+ return future;
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Failed to get executable for PipePlugin({}) using URI: {}.",
+ createPipePluginStatement.getPluginName(),
+ createPipePluginStatement.getUriString(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to get executable for PipePlugin "
+ + createPipePluginStatement.getPluginName()
+ + ", please check the URI.",
+ TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ return future;
+ }
- // try to create instance, this request will fail if creation is not
successful
- try (final PipePluginClassLoader classLoader = new
PipePluginClassLoader(libRoot)) {
- // ensure that jar file contains the class and the class is a pipe
plugin
- final Class<?> clazz =
- Class.forName(createPipePluginStatement.getClassName(), true,
classLoader);
- final PipePlugin ignored = (PipePlugin)
clazz.getDeclaredConstructor().newInstance();
- } catch (final ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException
- | ClassCastException e) {
- LOGGER.warn(
- "Failed to create function when try to create PipePlugin({})
instance first.",
- createPipePluginStatement.getPluginName(),
- e);
- future.setException(
- new IoTDBException(
- "Failed to load class '"
- + createPipePluginStatement.getClassName()
- + "', because it's not found in jar file or is invalid: "
- + createPipePluginStatement.getUriString(),
- TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
- return future;
- }
+ // try to create instance, this request will fail if creation is not
successful
+ try (final PipePluginClassLoader classLoader = new
PipePluginClassLoader(libRoot)) {
+ // ensure that jar file contains the class and the class is a pipe plugin
+ final Class<?> clazz =
+ Class.forName(createPipePluginStatement.getClassName(), true,
classLoader);
+ final PipePlugin ignored = (PipePlugin)
clazz.getDeclaredConstructor().newInstance();
+ } catch (final ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | ClassCastException
+ | IOException e) {
+ LOGGER.warn(
+ "Failed to create pipePlugin when try to create PipePlugin({})
instance first.",
+ createPipePluginStatement.getPluginName(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to load class '"
+ + createPipePluginStatement.getClassName()
+ + "', because it's not found in jar file or is invalid: "
+ + createPipePluginStatement.getUriString(),
+ TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
+ return future;
+ }
+ try (final ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus executionStatus =
client.createPipePlugin(
new TCreatePipePluginReq()
@@ -924,7 +928,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (final ClientManagerException | TException | IOException e) {
+ } catch (final ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -939,7 +943,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final TSStatus executionStatus =
client.dropPipePlugin(
new TDropPipePluginReq()
- .setPluginName(dropPipePluginStatement.getPluginName())
+
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
new file mode 100644
index 00000000000..fa974463e5c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/ClusterConfigTaskExecutorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution;
+
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+public class ClusterConfigTaskExecutorTest {
+
+ @Test
+ public void testPipePlugin() {
+ try {
+ ClusterConfigTaskExecutor.getInstance()
+ .createPipePlugin(
+ new CreatePipePluginStatement("TestProcessor", false,
"someClass", "uri"))
+ .get();
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains("The scheme of URI is not set, please specify the
scheme of URI."));
+ }
+
+ try {
+ ClusterConfigTaskExecutor.getInstance()
+ .createPipePlugin(
+ new CreatePipePluginStatement("TestProcessor", false,
"someClass", "file:.*"))
+ .get();
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertTrue(e.getMessage().contains("URI is not hierarchical"));
+ }
+
+ try {
+ ClusterConfigTaskExecutor.getInstance()
+ .createPipePlugin(
+ new CreatePipePluginStatement(
+ "TestProcessor",
+ false,
+ "org.apache.iotdb.db.pipe.example.TestProcessor",
+ new File(
+ System.getProperty("user.dir")
+ + File.separator
+ + "target"
+ + File.separator
+ + "test-classes"
+ + File.separator)
+ .toURI()
+ + "PipePlugin.jar"))
+ .get();
+ fail();
+ } catch (final Exception e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains(
+ "Failed to get executable for PipePlugin TestProcessor,
please check the URI."));
+ }
+ }
+}