This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch test-4
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16faa06890cdc8f42b7657624108041d61564a94
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 20 13:20:30 2025 +0800

    execute in a thread sandbox
---
 .../config/executor/ClusterConfigTaskExecutor.java | 56 ++++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++
 3 files changed, 71 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 5c0d3e967d1..d1139585822 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -256,6 +256,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.trigger.api.Trigger;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
 import org.apache.iotdb.udf.api.UDF;
+import org.apache.iotdb.udf.api.exception.UDFException;
 
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.codec.digest.DigestUtils;
@@ -285,6 +286,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.protocol.client.ConfigNodeClient.MSG_RECONNECTION_FAIL;
@@ -559,6 +566,55 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       String libRoot,
       CreateFunctionStatement createFunctionStatement,
       SettableFuture<ConfigTaskResult> future) {
+    try {
+      // Execute in a thread sandbox to prevent UDF from blocking the main 
thread
+      return executeInThreadSandboxWithTimeout(
+          () -> tryReflectAndValidateUDF(libRoot, createFunctionStatement, 
future),
+          
CommonDescriptor.getInstance().getConfig().getUdfValidationTimeoutMs(),
+          TimeUnit.MILLISECONDS);
+    } catch (final Exception e) {
+      // If the validation fails, we set the exception to the future
+      LOGGER.warn(
+          "Failed to validate UDF class [{}] for function [{}] in sandbox, 
reason: {}",
+          createFunctionStatement.getClassName(),
+          createFunctionStatement.getUdfName(),
+          e.getMessage(),
+          e);
+      future.setException(
+          new IoTDBException(
+              "Failed to validate UDF class '"
+                  + createFunctionStatement.getClassName()
+                  + "' for function '"
+                  + createFunctionStatement.getUdfName()
+                  + "', reason: "
+                  + e.getMessage(),
+              TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()));
+      return Optional.of(future);
+    }
+  }
+
+  private static Optional<SettableFuture<ConfigTaskResult>> 
executeInThreadSandboxWithTimeout(
+      Callable<Optional<SettableFuture<ConfigTaskResult>>> task, long timeout, 
TimeUnit unit) {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<Optional<SettableFuture<ConfigTaskResult>>> future = 
executor.submit(task);
+
+    try {
+      return future.get(timeout, unit);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      throw new UDFException(
+          "UDF validation timed out after " + timeout + " " + 
unit.toString().toLowerCase(), e);
+    } catch (Exception e) {
+      throw new UDFException("UDF validation failed", e);
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  private Optional<SettableFuture<ConfigTaskResult>> tryReflectAndValidateUDF(
+      String libRoot,
+      CreateFunctionStatement createFunctionStatement,
+      SettableFuture<ConfigTaskResult> future) {
     // try to create instance, this request will fail if creation is not 
successful
     try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) {
       // ensure that jar file contains the class and the class is a UDF
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ef289d75c19..99ad5fc5b20 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -362,6 +362,8 @@ public class CommonConfig {
 
   private int log2SizeClassGroup = 3;
 
+  private int udfValidationTimeoutMs = 5000;
+
   // time in nanosecond precision when starting up
   private final long startUpNanosecond = System.nanoTime();
 
@@ -1784,6 +1786,14 @@ public class CommonConfig {
     this.log2SizeClassGroup = log2SizeClassGroup;
   }
 
+  public long getUdfValidationTimeoutMs() {
+    return udfValidationTimeoutMs;
+  }
+
+  public void setUdfValidationTimeoutMs(int udfValidationTimeoutMs) {
+    this.udfValidationTimeoutMs = udfValidationTimeoutMs;
+  }
+
   /**
    * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec
    */
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 676458299b7..b8e45724d07 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -248,6 +248,11 @@ public class CommonDescriptor {
                 "cluster_device_limit_threshold",
                 String.valueOf(config.getDeviceLimitThreshold()))));
 
+    config.setUdfValidationTimeoutMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "udf_validation_timeout_ms", 
String.valueOf(config.getUdfValidationTimeoutMs()))));
+
     loadRetryProperties(properties);
     loadBinaryAllocatorProps(properties);
   }

Reply via email to