This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch test-4 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16faa06890cdc8f42b7657624108041d61564a94 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jun 20 13:20:30 2025 +0800 execute in a thread sandbox --- .../config/executor/ClusterConfigTaskExecutor.java | 56 ++++++++++++++++++++++ .../apache/iotdb/commons/conf/CommonConfig.java | 10 ++++ .../iotdb/commons/conf/CommonDescriptor.java | 5 ++ 3 files changed, 71 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 5c0d3e967d1..d1139585822 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -256,6 +256,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; import org.apache.iotdb.trigger.api.enums.FailureStrategy; import org.apache.iotdb.udf.api.UDF; +import org.apache.iotdb.udf.api.exception.UDFException; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.codec.digest.DigestUtils; @@ -285,6 +286,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.apache.iotdb.db.protocol.client.ConfigNodeClient.MSG_RECONNECTION_FAIL; @@ -559,6 +566,55 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { String libRoot, CreateFunctionStatement createFunctionStatement, SettableFuture<ConfigTaskResult> future) { + try { + // Execute in a thread sandbox to prevent UDF from blocking the main thread + return executeInThreadSandboxWithTimeout( + () -> tryReflectAndValidateUDF(libRoot, createFunctionStatement, future), + CommonDescriptor.getInstance().getConfig().getUdfValidationTimeoutMs(), + TimeUnit.MILLISECONDS); + } catch (final Exception e) { + // If the validation fails, we set the exception to the future + LOGGER.warn( + "Failed to validate UDF class [{}] for function [{}] in sandbox, reason: {}", + createFunctionStatement.getClassName(), + createFunctionStatement.getUdfName(), + e.getMessage(), + e); + future.setException( + new IoTDBException( + "Failed to validate UDF class '" + + createFunctionStatement.getClassName() + + "' for function '" + + createFunctionStatement.getUdfName() + + "', reason: " + + e.getMessage(), + TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); + return Optional.of(future); + } + } + + private static Optional<SettableFuture<ConfigTaskResult>> executeInThreadSandboxWithTimeout( + Callable<Optional<SettableFuture<ConfigTaskResult>>> task, long timeout, TimeUnit unit) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<Optional<SettableFuture<ConfigTaskResult>>> future = executor.submit(task); + + try { + return future.get(timeout, unit); + } catch (TimeoutException e) { + future.cancel(true); + throw new UDFException( + "UDF validation timed out after " + timeout + " " + unit.toString().toLowerCase(), e); + } catch (Exception e) { + throw new UDFException("UDF validation failed", e); + } finally { + executor.shutdownNow(); + } + } + + private Optional<SettableFuture<ConfigTaskResult>> tryReflectAndValidateUDF( + String libRoot, + CreateFunctionStatement createFunctionStatement, + SettableFuture<ConfigTaskResult> future) { // try to create instance, this request will fail if creation is not successful try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { // ensure that jar file contains the class and the class is a UDF diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ef289d75c19..99ad5fc5b20 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -362,6 +362,8 @@ public class CommonConfig { private int log2SizeClassGroup = 3; + private int udfValidationTimeoutMs = 5000; + // time in nanosecond precision when starting up private final long startUpNanosecond = System.nanoTime(); @@ -1784,6 +1786,14 @@ public class CommonConfig { this.log2SizeClassGroup = log2SizeClassGroup; } + public long getUdfValidationTimeoutMs() { + return udfValidationTimeoutMs; + } + + public void setUdfValidationTimeoutMs(int udfValidationTimeoutMs) { + this.udfValidationTimeoutMs = udfValidationTimeoutMs; + } + /** * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec */ diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 676458299b7..b8e45724d07 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -248,6 +248,11 @@ public class CommonDescriptor { "cluster_device_limit_threshold", String.valueOf(config.getDeviceLimitThreshold())))); + config.setUdfValidationTimeoutMs( + Integer.parseInt( + properties.getProperty( + "udf_validation_timeout_ms", String.valueOf(config.getUdfValidationTimeoutMs())))); + loadRetryProperties(properties); loadBinaryAllocatorProps(properties); }
