This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch IoTDBLocal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 794cc603cc3d06efa5fc7e66fa9f4b939d3b6451
Author: Weihao Li <[email protected]>
AuthorDate: Wed Jun 24 11:48:15 2026 +0800

    modify some
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../java/org/apache/iotdb/udf/api/IoTDBLocal.java  |   5 +-
 .../relational/ColumnTransformerBuilder.java       |  45 ++++---
 .../calc/plan/planner/TableOperatorGenerator.java  |  53 +++++----
 .../udf/UserDefineScalarFunctionTransformer.java   |  34 +++++-
 .../iotdb/db/protocol/session/SessionManager.java  |  17 +++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   2 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  33 +-----
 .../planner/DataNodeTableOperatorGenerator.java    |  40 ++-----
 .../db/queryengine/udf/InternalQueryExecutor.java  | 129 ++++++++++-----------
 .../db/queryengine/udf/InternalQueryResult.java    |  32 ++++-
 .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java   | 111 ++++++------------
 .../udf/ScalarUdfExpressionDetector.java           |  58 ---------
 .../iotdb/db/queryengine/udf/UDFResultSetImpl.java |  28 +++--
 13 files changed, 264 insertions(+), 323 deletions(-)

diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java
index 0c96f919fc1..b84cb45965a 100644
--- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/IoTDBLocal.java
@@ -65,9 +65,6 @@ public interface IoTDBLocal {
   /** Log at ERROR level with exception stack. */
   void error(String msg, Throwable t);
 
-  /**
-   * Release internal session and other resources. Called by the framework 
after beforeDestroy
-   * method.
-   */
+  /** Close internal session. Called by the framework after beforeDestroy 
method. */
   void close();
 }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
index 782222a1394..ba0464b0777 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.calc.execution.relational;
 
 import org.apache.iotdb.calc.i18n.CalcMessages;
+import 
org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory;
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.calc.plan.relational.metadata.ITypeMetadata;
 import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer;
@@ -198,7 +199,6 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.type.TypeNotFoundExc
 import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils;
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
-import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis;
 import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
@@ -1490,16 +1490,10 @@ public class ColumnTransformerBuilder
                     .collect(Collectors.toList()),
                 Collections.emptyMap());
         ScalarFunctionAnalysis analysis = scalarFunction.analyze(parameters);
-        Optional<IoTDBLocal> ioTDBLocal = context.getIoTDBLocal();
-        if (ioTDBLocal.isPresent()) {
-          scalarFunction.beforeStart(parameters, ioTDBLocal.get());
-        } else {
-          scalarFunction.beforeStart(parameters);
-        }
         Type returnType =
             
UDFDataTypeTransformer.transformUDFDataTypeToReadType(analysis.getOutputDataType());
         return new UserDefineScalarFunctionTransformer(
-            returnType, scalarFunction, childrenColumnTransformer, 
ioTDBLocal.orElse(null));
+            returnType, scalarFunction, childrenColumnTransformer, parameters, 
context);
       }
     }
     throw new IllegalArgumentException(
@@ -1960,7 +1954,11 @@ public class ColumnTransformerBuilder
     @SuppressWarnings("unused")
     private final Optional<MemoryReservationManager> memoryReservationManager;
 
-    private final Optional<IoTDBLocal> ioTDBLocal;
+    private final String fragmentInstanceId;
+
+    private final String outerQueryId;
+
+    @Nullable private final IoTDBLocalFactory ioTDBLocalFactory;
 
     public Context(
         SessionInfo sessionInfo,
@@ -1986,7 +1984,9 @@ public class ColumnTransformerBuilder
           typeProvider,
           metadata,
           memoryReservationManager,
-          Optional.empty());
+          null,
+          null,
+          null);
     }
 
     public Context(
@@ -2001,7 +2001,9 @@ public class ColumnTransformerBuilder
         ITableTypeProvider typeProvider,
         ITypeMetadata metadata,
         @Nullable MemoryReservationManager memoryReservationManager,
-        Optional<IoTDBLocal> ioTDBLocal) {
+        String fragmentInstanceId,
+        String outerQueryId,
+        @Nullable IoTDBLocalFactory ioTDBLocalFactory) {
       this.sessionInfo = sessionInfo;
       this.leafList = leafList;
       this.inputLocations = inputLocations;
@@ -2013,11 +2015,26 @@ public class ColumnTransformerBuilder
       this.typeProvider = typeProvider;
       this.metadata = metadata;
       this.memoryReservationManager = 
Optional.ofNullable(memoryReservationManager);
-      this.ioTDBLocal = ioTDBLocal;
+      this.fragmentInstanceId = fragmentInstanceId;
+      this.outerQueryId = outerQueryId;
+      this.ioTDBLocalFactory = ioTDBLocalFactory;
+    }
+
+    public SessionInfo getSessionInfo() {
+      return sessionInfo;
+    }
+
+    public String getFragmentInstanceId() {
+      return fragmentInstanceId;
+    }
+
+    public String getOuterQueryId() {
+      return outerQueryId;
     }
 
-    public Optional<IoTDBLocal> getIoTDBLocal() {
-      return ioTDBLocal;
+    @Nullable
+    public IoTDBLocalFactory getIoTDBLocalFactory() {
+      return ioTDBLocalFactory;
     }
 
     public Type getType(SymbolReference symbolReference) {
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index 8b7f6cb714b..6454bd38d47 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -314,30 +314,10 @@ public abstract class TableOperatorGenerator<
       Map<Symbol, List<InputLocation>> inputLocations,
       PlanNodeId planNodeId,
       C context) {
-    return constructFilterAndProjectOperator(
-        predicate,
-        inputOperator,
-        projectExpressions,
-        inputDataTypes,
-        inputLocations,
-        planNodeId,
-        context,
-        getIoTDBLocal(context, planNodeId));
-  }
 
-  protected Optional<IoTDBLocal> getIoTDBLocal(C context, PlanNodeId 
planNodeId) {
-    return Optional.empty();
-  }
-
-  protected Operator constructFilterAndProjectOperator(
-      Optional<Expression> predicate,
-      Operator inputOperator,
-      Expression[] projectExpressions,
-      List<TSDataType> inputDataTypes,
-      Map<Symbol, List<InputLocation>> inputLocations,
-      PlanNodeId planNodeId,
-      C context,
-      Optional<IoTDBLocal> ioTDBLocal) {
+    String fragmentInstanceId = getFragmentInstanceId(context);
+    String outerQueryId = getQueryId(context);
+    IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context);
 
     final List<TSDataType> filterOutputDataTypes = new 
ArrayList<>(inputDataTypes);
 
@@ -367,7 +347,9 @@ public abstract class TableOperatorGenerator<
                           context.getTableTypeProvider(),
                           metadata,
                           context.getMemoryReservationManager(),
-                          ioTDBLocal);
+                          fragmentInstanceId,
+                          outerQueryId,
+                          ioTDBLocalFactory);
 
                   return visitor.process(p, filterColumnTransformerContext);
                 })
@@ -396,7 +378,9 @@ public abstract class TableOperatorGenerator<
             context.getTableTypeProvider(),
             metadata,
             context.getMemoryReservationManager(),
-            ioTDBLocal);
+            fragmentInstanceId,
+            outerQueryId,
+            ioTDBLocalFactory);
 
     for (Expression expression : projectExpressions) {
       projectOutputTransformerList.add(
@@ -420,6 +404,18 @@ public abstract class TableOperatorGenerator<
         predicate.isPresent());
   }
 
+  protected String getFragmentInstanceId(C context) {
+    return null;
+  }
+
+  protected String getQueryId(C context) {
+    return null;
+  }
+
+  protected IoTDBLocalFactory getIoTDBLocalFactory(C context) {
+    return null;
+  }
+
   @Override
   public Operator visitProject(ProjectNode node, C context) {
     ITableTypeProvider typeProvider = context.getTableTypeProvider();
@@ -2494,4 +2490,11 @@ public abstract class TableOperatorGenerator<
   }
 
   protected abstract SessionInfo getSessionInfo(C context);
+
+  /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. 
*/
+  @FunctionalInterface
+  public interface IoTDBLocalFactory {
+
+    IoTDBLocal create(SessionInfo sessionInfo, String fragmentInstanceId, 
String queryId);
+  }
 }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index dd7c175774d..ab2f99031d2 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -20,9 +20,13 @@
 package org.apache.iotdb.calc.transformation.dag.column.udf;
 
 import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.RecordIterator;
+import org.apache.iotdb.calc.execution.relational.ColumnTransformerBuilder;
+import 
org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory;
 import org.apache.iotdb.calc.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.calc.transformation.dag.column.multi.MultiColumnTransformer;
 import org.apache.iotdb.udf.api.IoTDBLocal;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
+import org.apache.iotdb.udf.api.exception.UDFException;
 import org.apache.iotdb.udf.api.relational.ScalarFunction;
 import org.apache.iotdb.udf.api.relational.access.Record;
 
@@ -43,12 +47,36 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
       Type returnType,
       ScalarFunction scalarFunction,
       List<ColumnTransformer> childrenTransformers,
-      IoTDBLocal ioTDBLocal) {
+      FunctionArguments parameters,
+      ColumnTransformerBuilder.Context context) {
     super(returnType, childrenTransformers);
     this.scalarFunction = scalarFunction;
-    this.ioTDBLocal = ioTDBLocal;
+    this.ioTDBLocal = createIoTDBLocal(context);
     this.inputTypes =
         
childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList());
+    try {
+      if (ioTDBLocal != null) {
+        scalarFunction.beforeStart(parameters, ioTDBLocal);
+      } else {
+        scalarFunction.beforeStart(parameters);
+      }
+    } catch (UDFException e) {
+      throw new RuntimeException(
+          "Error occurs when starting user-defined scalar function "
+              + scalarFunction.getClass().getName(),
+          e);
+    }
+  }
+
+  private static IoTDBLocal createIoTDBLocal(ColumnTransformerBuilder.Context 
context) {
+    IoTDBLocalFactory factory = context.getIoTDBLocalFactory();
+    if (factory == null
+        || context.getFragmentInstanceId() == null
+        || context.getOuterQueryId() == null) {
+      return null;
+    }
+    return factory.create(
+        context.getSessionInfo(), context.getFragmentInstanceId(), 
context.getOuterQueryId());
   }
 
   @Override
@@ -109,8 +137,8 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
   public void close() {
     super.close();
     if (ioTDBLocal != null) {
-      scalarFunction.beforeDestroy(ioTDBLocal);
       ioTDBLocal.close();
+      scalarFunction.beforeDestroy(ioTDBLocal);
     } else {
       scalarFunction.beforeDestroy();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index b3fa3c049f3..04ddf7f4b9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -369,6 +369,23 @@ public class SessionManager implements SessionManagerMBean 
{
     return currSession.get();
   }
 
+  /**
+   * Swap the ThreadLocal session and return the previous one. Used by UDF 
internal queries to
+   * temporarily install an internal session without removing the previous 
session from the session
+   * map.
+   */
+  public IClientSession exchangeCurrSession(IClientSession session) {
+    IClientSession previous = currSession.get();
+    if (session != null) {
+      currSession.set(session);
+      sessions.putIfAbsent(session, placeHolder);
+    } else {
+      currSession.remove();
+      currSessionIdleTime.remove();
+    }
+    return previous;
+  }
+
   /** get current session and update session idle time. */
   public IClientSession getCurrSessionAndUpdateIdleTime() {
     IClientSession clientSession = getCurrSession();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 1aa14e053ea..d940d226096 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -562,7 +562,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
-  private static void clearUp(
+  public static void clearUp(
       IClientSession clientSession,
       Long statementId,
       Long queryId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 45b8643d02c..402424d1f69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -29,7 +30,6 @@ import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.memory.MemoryBlockType;
@@ -318,8 +318,7 @@ public class Coordinator {
       boolean userQuery,
       boolean debug,
       boolean readOnlyInternalQuery,
-      BiFunction<MPPQueryContext, Long, IQueryExecution> 
iQueryExecutionFactory)
-      throws IoTDBException {
+      BiFunction<MPPQueryContext, Long, IQueryExecution> 
iQueryExecutionFactory) {
     long startTime = System.currentTimeMillis();
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     MPPQueryContext queryContext = null;
@@ -504,40 +503,12 @@ public class Coordinator {
       long timeOut,
       boolean userQuery,
       boolean debug) {
-    return executeForTableModel(
-        statement,
-        sqlParser,
-        clientSession,
-        queryId,
-        session,
-        sql,
-        metadata,
-        timeOut,
-        userQuery,
-        debug,
-        false);
-  }
-
-  public ExecutionResult executeForTableModel(
-      org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement 
statement,
-      SqlParser sqlParser,
-      IClientSession clientSession,
-      long queryId,
-      SessionInfo session,
-      String sql,
-      Metadata metadata,
-      long timeOut,
-      boolean userQuery,
-      boolean debug,
-      boolean readOnlyInternalQuery)
-      throws IoTDBException {
     return execution(
         queryId,
         session,
         sql,
         userQuery,
         debug,
-        readOnlyInternalQuery,
         ((queryContext, startTime) ->
             createQueryExecutionForTableModel(
                 statement,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index 9a2e35a8c1e..97383309a48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.La
 import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator;
 import org.apache.iotdb.calc.execution.relational.ColumnTransformerBuilder;
 import org.apache.iotdb.calc.plan.planner.TableOperatorGenerator;
+import 
org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory;
 import 
org.apache.iotdb.calc.transformation.dag.column.leaf.LeafColumnTransformer;
 import 
org.apache.iotdb.calc.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -68,6 +69,7 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.TableIntoOperator;
@@ -132,14 +134,13 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.udf.IoTDBLocalImpl;
-import org.apache.iotdb.db.queryengine.udf.ScalarUdfExpressionDetector;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
-import org.apache.iotdb.udf.api.IoTDBLocal;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor;
@@ -2104,34 +2105,17 @@ public class DataNodeTableOperatorGenerator
   }
 
   @Override
-  protected Optional<IoTDBLocal> getIoTDBLocal(
-      LocalExecutionPlanContext context, PlanNodeId planNodeId) {
-    return Optional.of(
-        IoTDBLocalImpl.create(
-            getSessionInfo(context), context.getFragmentInstanceId(), 
planNodeId));
+  protected String getFragmentInstanceId(LocalExecutionPlanContext context) {
+    return context.getFragmentInstanceId().getFullId();
   }
 
   @Override
-  protected Operator constructFilterAndProjectOperator(
-      Optional<Expression> predicate,
-      Operator inputOperator,
-      Expression[] projectExpressions,
-      List<TSDataType> inputDataTypes,
-      Map<Symbol, List<InputLocation>> inputLocations,
-      PlanNodeId planNodeId,
-      LocalExecutionPlanContext context) {
-    Optional<IoTDBLocal> ioTDBLocal =
-        ScalarUdfExpressionDetector.contains(predicate, projectExpressions)
-            ? getIoTDBLocal(context, planNodeId)
-            : Optional.empty();
-    return constructFilterAndProjectOperator(
-        predicate,
-        inputOperator,
-        projectExpressions,
-        inputDataTypes,
-        inputLocations,
-        planNodeId,
-        context,
-        ioTDBLocal);
+  protected String getQueryId(LocalExecutionPlanContext context) {
+    return context.getFragmentInstanceId().getQueryId().getId();
+  }
+
+  @Override
+  protected IoTDBLocalFactory getIoTDBLocalFactory(LocalExecutionPlanContext 
context) {
+    return IoTDBLocalImpl.FACTORY;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
index 5a31c5910f7..328542d05c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryExecutor.java
@@ -20,12 +20,14 @@
 package org.apache.iotdb.db.queryengine.udf;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.QueryTimeoutException;
 import org.apache.iotdb.commons.exception.SemanticException;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
 import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement;
 import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -45,82 +47,79 @@ public final class InternalQueryExecutor {
 
   private InternalQueryExecutor() {}
 
-  public static long computeRemainingTimeoutMs(
-      long outerQueryStartTimeMs, long outerQueryTimeoutMs) {
-    return outerQueryTimeoutMs - (System.currentTimeMillis() - 
outerQueryStartTimeMs);
-  }
-
   public static InternalQueryResult executeInternalQuery(
-      IClientSession internalSession,
       SessionInfo sessionInfo,
+      String fragmentInstanceId,
+      QueryId outerQueryId,
       String sql,
-      long outerQueryStartTimeMs,
-      long outerQueryTimeoutMs)
+      long timeoutMs)
       throws IoTDBException {
 
-    long timeoutMs = computeRemainingTimeoutMs(outerQueryStartTimeMs, 
outerQueryTimeoutMs);
-    if (timeoutMs <= 0) {
-      throw new QueryTimeoutException(
-          "Outer query timeout exceeded before UDF internal query starts");
-    }
+    IClientSession previousSession = SESSION_MANAGER.getCurrSession();
 
-    Statement parsedStatement = parseTableStatement(internalSession, 
sessionInfo, sql);
-
-    long statementId = SESSION_MANAGER.requestStatementId(internalSession);
-    long queryId = SESSION_MANAGER.requestQueryId(internalSession, 
statementId);
-
-    ExecutionResult result =
-        COORDINATOR.executeForTableModel(
-            parsedStatement,
-            RELATION_SQL_PARSER,
-            internalSession,
-            queryId,
-            sessionInfo,
-            sql,
-            METADATA,
-            timeoutMs,
-            false,
-            false,
-            true);
-
-    if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-      COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
-      internalSession.removeQueryId(statementId, queryId);
-      throw new IoTDBException(result.status.message, result.status.code);
-    }
+    InternalClientSession internalSession =
+        new InternalClientSession(formatInternalClientId(fragmentInstanceId, 
outerQueryId));
+    internalSession.setSqlDialect(sessionInfo.getSqlDialect());
+    sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName);
 
-    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
-    if (queryExecution == null) {
-      COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
-      internalSession.removeQueryId(statementId, queryId);
-      throw new IoTDBException(
-          "Internal query execution not found", 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-    }
+    SESSION_MANAGER.supplySession(
+        internalSession,
+        sessionInfo.getUserId(),
+        sessionInfo.getUserName(),
+        sessionInfo.getZoneId(),
+        sessionInfo.getVersion());
 
-    return new InternalQueryResult(
-        queryExecution,
-        () -> {
-          COORDINATOR.cleanupQueryExecution(queryId, () -> sql, null);
-          internalSession.removeQueryId(statementId, queryId);
-        });
-  }
+    long statementId = -1;
+    long queryId = -1;
+    try {
+      SESSION_MANAGER.exchangeCurrSession(internalSession);
 
-  static void validateReadOnlyQuery(IQueryExecution execution) throws 
IoTDBException {
-    if (execution.getQueryType() != QueryType.READ) {
-      execution.stopAndCleanup(null);
-      throw new SemanticException("Only query is allowed when used IoTDBLocal 
in UDF");
-    }
-  }
+      statementId = SESSION_MANAGER.requestStatementId(internalSession);
+      queryId = SESSION_MANAGER.requestQueryId(internalSession, statementId);
 
-  private static Statement parseTableStatement(
-      IClientSession internalSession, SessionInfo sessionInfo, String sql) 
throws IoTDBException {
-    try {
-      Statement statement =
+      Statement parsedStatement =
           RELATION_SQL_PARSER.createStatement(sql, sessionInfo.getZoneId(), 
internalSession);
-      return statement;
+
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              parsedStatement,
+              RELATION_SQL_PARSER,
+              internalSession,
+              queryId,
+              sessionInfo,
+              sql,
+              METADATA,
+              timeoutMs,
+              false,
+              false);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new IoTDBException(result.status.message, result.status.code);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+      if (queryExecution == null) {
+        throw new IoTDBException(
+            "Internal query execution not found",
+            TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+
+      return new InternalQueryResult(queryExecution, internalSession, 
statementId, queryId, sql);
     } catch (Exception e) {
-      throw new IoTDBException(e.getMessage(), 
TSStatusCode.SQL_PARSE_ERROR.getStatusCode());
+      ClientRPCServiceImpl.clearUp(internalSession, statementId, queryId, () 
-> sql, e);
+      throw new IoTDBException(e.getMessage(), 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    } finally {
+      SESSION_MANAGER.exchangeCurrSession(previousSession);
+    }
+  }
+
+  static String formatInternalClientId(String fragmentInstanceId, QueryId 
outerQueryId) {
+    return String.format("udf-local-%s-%s", fragmentInstanceId, outerQueryId);
+  }
+
+  public static void validateReadOnlyQuery(IQueryExecution execution) {
+    if (execution.getQueryType() != QueryType.READ) {
+      throw new SemanticException("Only query is supported for IoTDBLocal 
query interface");
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
index 9f35fa95618..419ea00eb00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/InternalQueryResult.java
@@ -19,25 +19,47 @@
 
 package org.apache.iotdb.db.queryengine.udf;
 
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 
-/** Internal query result holding {@link IQueryExecution} and a cleanup 
action. */
+/** Internal query result holding {@link IQueryExecution} and cleanup 
metadata. */
 public final class InternalQueryResult implements AutoCloseable {
 
   private final IQueryExecution queryExecution;
-  private final Runnable releaseAction;
+  private final IClientSession internalSession;
+  private final long statementId;
+  private final long queryId;
+  private final String sql;
 
-  public InternalQueryResult(IQueryExecution queryExecution, Runnable 
releaseAction) {
+  public InternalQueryResult(
+      IQueryExecution queryExecution,
+      IClientSession internalSession,
+      long statementId,
+      long queryId,
+      String sql) {
     this.queryExecution = queryExecution;
-    this.releaseAction = releaseAction;
+    this.internalSession = internalSession;
+    this.statementId = statementId;
+    this.queryId = queryId;
+    this.sql = sql;
   }
 
   public IQueryExecution getQueryExecution() {
     return queryExecution;
   }
 
+  public long getQueryId() {
+    return queryId;
+  }
+
+  public DatasetHeader getDatasetHeader() {
+    return queryExecution.getDatasetHeader();
+  }
+
   @Override
   public void close() {
-    releaseAction.run();
+    ClientRPCServiceImpl.clearUp(internalSession, statementId, queryId, () -> 
sql, null);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
index 32540b20e06..47ee7d07f27 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
@@ -19,13 +19,12 @@
 
 package org.apache.iotdb.db.queryengine.udf;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import 
org.apache.iotdb.calc.plan.planner.TableOperatorGenerator.IoTDBLocalFactory;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.QueryTimeoutException;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
-import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.protocol.session.InternalClientSession;
-import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.udf.api.IoTDBLocal;
@@ -41,74 +40,35 @@ import java.util.List;
 /** Server-side implementation of {@link IoTDBLocal}. */
 public class IoTDBLocalImpl implements IoTDBLocal {
 
+  public static final IoTDBLocalFactory FACTORY = IoTDBLocalImpl::new;
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBLocalImpl.class);
-  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
   private static final Coordinator COORDINATOR = Coordinator.getInstance();
 
   private final SessionInfo sessionInfo;
-  private final InternalClientSession internalSession;
-  private final long outerQueryStartTimeMs;
-  private final long outerQueryTimeoutMs;
+  private final String fragmentInstanceId;
+  private final QueryId outerQueryId;
   private final List<UDFResultSetImpl> openResultSets = new ArrayList<>();
-  private boolean closed;
 
-  public IoTDBLocalImpl(
-      SessionInfo sessionInfo,
-      String internalClientId,
-      long outerQueryStartTimeMs,
-      long outerQueryTimeoutMs) {
+  public IoTDBLocalImpl(SessionInfo sessionInfo, String fragmentInstanceId, 
String outerQueryId) {
     this.sessionInfo = sessionInfo;
-    this.outerQueryStartTimeMs = outerQueryStartTimeMs;
-    this.outerQueryTimeoutMs = outerQueryTimeoutMs;
-    this.internalSession = new InternalClientSession(internalClientId);
-    internalSession.setSqlDialect(sessionInfo.getSqlDialect());
-    sessionInfo.getDatabaseName().ifPresent(internalSession::setDatabaseName);
-    SESSION_MANAGER.supplySession(
-        internalSession,
-        sessionInfo.getUserId(),
-        sessionInfo.getUserName(),
-        sessionInfo.getZoneId(),
-        ClientVersion.V_1_0);
-  }
-
-  public static String formatInternalClientId(
-      FragmentInstanceId fragmentInstanceId, PlanNodeId planNodeId) {
-    return "udf-local-" + fragmentInstanceId + "-" + planNodeId;
-  }
-
-  public static IoTDBLocalImpl create(
-      SessionInfo sessionInfo, FragmentInstanceId fragmentInstanceId, 
PlanNodeId planNodeId) {
-    long outerStart = System.currentTimeMillis();
-    long outerTimeout =
-        org.apache.iotdb.db.conf.IoTDBDescriptor.getInstance()
-            .getConfig()
-            .getQueryTimeoutThreshold();
-    String globalQueryId = fragmentInstanceId.getQueryId().getId();
-    for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) {
-      if (globalQueryId.equals(execution.getQueryId())) {
-        outerStart = execution.getStartExecutionTime();
-        outerTimeout = execution.getTimeout();
-        break;
-      }
-    }
-    return new IoTDBLocalImpl(
-        sessionInfo,
-        formatInternalClientId(fragmentInstanceId, planNodeId),
-        outerStart,
-        outerTimeout);
+    this.fragmentInstanceId = fragmentInstanceId;
+    this.outerQueryId = QueryId.valueOf(outerQueryId);
   }
 
   @Override
   public UDFResultSet query(String sql) throws UDFException {
-    if (closed) {
-      throw new UDFException("IoTDBLocal is already closed");
-    }
     try {
+      long timeoutMs = computeRemainingTimeoutMs();
+      if (timeoutMs <= 0) {
+        throw new QueryTimeoutException(
+            "Outer query timeout exceeded before IoTDBLocal query starts");
+      }
       InternalQueryResult result =
           InternalQueryExecutor.executeInternalQuery(
-              internalSession, sessionInfo, sql, outerQueryStartTimeMs, 
outerQueryTimeoutMs);
+              sessionInfo, fragmentInstanceId, outerQueryId, sql, timeoutMs);
       int index = openResultSets.size();
-      UDFResultSetImpl rs = new UDFResultSetImpl(result, this, index);
+      UDFResultSetImpl rs = new UDFResultSetImpl(openResultSets, index, 
result);
       openResultSets.add(rs);
       return rs;
     } catch (IoTDBException e) {
@@ -116,29 +76,32 @@ public class IoTDBLocalImpl implements IoTDBLocal {
     }
   }
 
-  void markResultSetClosed(int index) {
-    if (index >= 0 && index < openResultSets.size()) {
-      openResultSets.set(index, null);
-    }
-  }
-
-  public void closeAllResultSets() {
-    for (UDFResultSetImpl rs : openResultSets) {
+  @Override
+  public void close() {
+    for (int i = 0; i < openResultSets.size(); i++) {
+      UDFResultSetImpl rs = openResultSets.get(i);
       if (rs != null) {
-        rs.close();
+        try {
+          rs.close();
+        } catch (UDFException e) {
+          LOGGER.warn("Failed to close UDF result set at index {}", i, e);
+        }
       }
     }
     openResultSets.clear();
   }
 
-  @Override
-  public void close() {
-    if (closed) {
-      return;
+  private long computeRemainingTimeoutMs() {
+    long outerStart = System.currentTimeMillis();
+    long outerTimeout = 
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+    for (IQueryExecution execution : COORDINATOR.getAllQueryExecutions()) {
+      if (outerQueryId.getId().equals(execution.getQueryId())) {
+        outerStart = execution.getStartExecutionTime();
+        outerTimeout = execution.getTimeout();
+        break;
+      }
     }
-    closed = true;
-    closeAllResultSets();
-    SESSION_MANAGER.closeSession(internalSession, 
COORDINATOR::cleanupQueryExecution);
+    return outerTimeout - (System.currentTimeMillis() - outerStart);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
deleted file mode 100644
index cf87bee846a..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/ScalarUdfExpressionDetector.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.udf;
-
-import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
-import 
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall;
-import org.apache.iotdb.commons.queryengine.plan.udf.TableUDFUtils;
-import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor;
-
-import java.util.Optional;
-
-/** Detects whether filter or project expressions contain a user-defined 
scalar function. */
-public final class ScalarUdfExpressionDetector extends 
DefaultTraversalVisitor<Void> {
-
-  private boolean found;
-
-  private ScalarUdfExpressionDetector() {}
-
-  public static boolean contains(Optional<Expression> predicate, Expression[] 
projectExpressions) {
-    ScalarUdfExpressionDetector detector = new ScalarUdfExpressionDetector();
-    if (predicate.isPresent()) {
-      detector.process(predicate.get(), null);
-    }
-    for (Expression expression : projectExpressions) {
-      if (detector.found) {
-        return true;
-      }
-      detector.process(expression, null);
-    }
-    return detector.found;
-  }
-
-  @Override
-  public Void visitFunctionCall(FunctionCall node, Void context) {
-    if (TableUDFUtils.isScalarFunction(node.getName().getSuffix())) {
-      found = true;
-      return null;
-    }
-    return super.visitFunctionCall(node, context);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
index 647e0115a64..bc3f163eee8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/UDFResultSetImpl.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Re
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.schema.column.ColumnHeader;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
-import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.udf.api.UDFResultSet;
 import org.apache.iotdb.udf.api.exception.UDFException;
 import org.apache.iotdb.udf.api.relational.access.Record;
@@ -41,19 +40,20 @@ import java.util.stream.Collectors;
 /** Server-side implementation of {@link UDFResultSet}. */
 public class UDFResultSetImpl implements UDFResultSet {
 
-  private final InternalQueryResult queryResult;
-  private final IoTDBLocalImpl owner;
+  private final List<UDFResultSetImpl> openResultSets;
   private final int index;
+  private final InternalQueryResult queryResult;
+  private final List<Type> columnTypes;
 
   private Iterator<Record> rowIterator;
-  private final List<Type> columnTypes;
   private boolean closed;
 
-  public UDFResultSetImpl(InternalQueryResult queryResult, IoTDBLocalImpl 
owner, int index) {
-    this.queryResult = queryResult;
-    this.owner = owner;
+  public UDFResultSetImpl(
+      List<UDFResultSetImpl> openResultSets, int index, InternalQueryResult 
queryResult) {
+    this.openResultSets = openResultSets;
     this.index = index;
-    this.columnTypes = buildColumnTypes(queryResult.getQueryExecution());
+    this.queryResult = queryResult;
+    this.columnTypes = 
buildColumnTypes(queryResult.getDatasetHeader().getColumnHeaders());
   }
 
   @Override
@@ -68,7 +68,7 @@ public class UDFResultSetImpl implements UDFResultSet {
       try {
         batch = queryResult.getQueryExecution().getBatchResult();
       } catch (IoTDBException e) {
-        throw new UDFException(e.getMessage());
+        throw new UDFException(e.getMessage(), e);
       }
       if (!batch.isPresent()) {
         return false;
@@ -85,9 +85,7 @@ public class UDFResultSetImpl implements UDFResultSet {
 
   @Override
   public Record next() throws UDFException {
-    ensureOpen();
-
-    if (rowIterator == null || !rowIterator.hasNext()) {
+    if (!hasNext()) {
       throw new NoSuchElementException();
     }
     return rowIterator.next();
@@ -99,12 +97,12 @@ public class UDFResultSetImpl implements UDFResultSet {
       return;
     }
     closed = true;
+    openResultSets.set(index, null);
     try {
       queryResult.close();
     } catch (RuntimeException e) {
       throw new UDFException("Failed to close internal query result", e);
     }
-    owner.markResultSetClosed(index);
   }
 
   private void ensureOpen() throws UDFException {
@@ -113,8 +111,8 @@ public class UDFResultSetImpl implements UDFResultSet {
     }
   }
 
-  private static List<Type> buildColumnTypes(IQueryExecution queryExecution) {
-    return queryExecution.getDatasetHeader().getColumnHeaders().stream()
+  private static List<Type> buildColumnTypes(List<ColumnHeader> columnHeaders) 
{
+    return columnHeaders.stream()
         .map(ColumnHeader::getColumnType)
         .map(UDFDataTypeTransformer::transformToUDFDataType)
         .map(UDFDataTypeTransformer::transformUDFDataTypeToReadType)


Reply via email to