This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch test-4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac1ac4864a3113744f291fe7bdcefb32a2e3abe2 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jun 20 12:46:25 2025 +0800 Update ClusterConfigTaskExecutor.java --- .../config/executor/ClusterConfigTaskExecutor.java | 109 ++++++++++++--------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 4f4e7c7a27f..5c0d3e967d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -282,6 +282,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -531,54 +532,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } } - // try to create instance, this request will fail if creation is not successful - try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { - // ensure that jar file contains the class and the class is a UDF - Class<?> clazz = Class.forName(createFunctionStatement.getClassName(), true, classLoader); - UDF udf = (UDF) clazz.getDeclaredConstructor().newInstance(); - - final TSStatus maybeValidationFailed = - udf.validate() - .map( - e -> - RpcUtils.getStatus( - TSStatusCode.UDF_LOAD_CLASS_ERROR, - String.format( - "Fail to validate UDF class [%s] for function [%s] in sandbox, reason: %s", - createFunctionStatement.getClassName(), - createFunctionStatement.getUdfName(), - e.getMessage()))) - .orElse(RpcUtils.SUCCESS_STATUS); - if (maybeValidationFailed.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn( - "Failed to validate UDF class [{}] for function [{}] in sandbox, reason: {}", - createFunctionStatement.getClassName(), - createFunctionStatement.getUdfName(), - maybeValidationFailed.getMessage()); - future.setException( - new IoTDBException( - "Failed to validate UDF class '" - + createFunctionStatement.getClassName() - + "' for function '" - + createFunctionStatement.getUdfName() - + "', reason: " - + maybeValidationFailed.getMessage(), - maybeValidationFailed.getCode())); - return future; - } - } catch (Exception e) { - LOGGER.warn( - "Failed to create function when try to create UDF({}) instance first.", - createFunctionStatement.getUdfName(), - e); - future.setException( - new IoTDBException( - "Failed to load class '" - + createFunctionStatement.getClassName() - + "', because it's not found in jar file or is invalid: " - + createFunctionStatement.getUriString(), - TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); - return future; + final Optional<SettableFuture<ConfigTaskResult>> maybeValidationFailed = + tryReflectAndValidateUDFInSandbox(libRoot, createFunctionStatement, future); + if (maybeValidationFailed.isPresent()) { + return maybeValidationFailed.get(); } final TSStatus executionStatus = client.createFunction(tCreateFunctionReq); @@ -598,6 +555,62 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { return future; } + private Optional<SettableFuture<ConfigTaskResult>> tryReflectAndValidateUDFInSandbox( + String libRoot, + CreateFunctionStatement createFunctionStatement, + SettableFuture<ConfigTaskResult> future) { + // try to create instance, this request will fail if creation is not successful + try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { + // ensure that jar file contains the class and the class is a UDF + Class<?> clazz = Class.forName(createFunctionStatement.getClassName(), true, classLoader); + UDF udf = (UDF) clazz.getDeclaredConstructor().newInstance(); + + final TSStatus maybeValidationFailed = + udf.validate() + .map( + e -> + RpcUtils.getStatus( + TSStatusCode.UDF_LOAD_CLASS_ERROR, + String.format( + "Fail to validate UDF class [%s] for function [%s] in sandbox, reason: %s", + createFunctionStatement.getClassName(), + createFunctionStatement.getUdfName(), + e.getMessage()))) + .orElse(RpcUtils.SUCCESS_STATUS); + if (maybeValidationFailed.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "Failed to validate UDF class [{}] for function [{}] in sandbox, reason: {}", + createFunctionStatement.getClassName(), + createFunctionStatement.getUdfName(), + maybeValidationFailed.getMessage()); + future.setException( + new IoTDBException( + "Failed to validate UDF class '" + + createFunctionStatement.getClassName() + + "' for function '" + + createFunctionStatement.getUdfName() + + "', reason: " + + maybeValidationFailed.getMessage(), + maybeValidationFailed.getCode())); + return Optional.of(future); + } + } catch (Exception e) { + LOGGER.warn( + "Failed to create function when try to create UDF({}) instance first.", + createFunctionStatement.getUdfName(), + e); + future.setException( + new IoTDBException( + "Failed to load class '" + + createFunctionStatement.getClassName() + + "', because it's not found in jar file or is invalid: " + + createFunctionStatement.getUriString(), + TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); + return Optional.of(future); + } + return Optional.empty(); + } + @Override public SettableFuture<ConfigTaskResult> dropFunction(String udfName) { SettableFuture<ConfigTaskResult> future = SettableFuture.create();
