This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch IoTDBLocal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a5e0539e3c7d61c06c9d262e2b1446d4d597cd00
Author: Weihao Li <[email protected]>
AuthorDate: Fri Jun 26 02:55:38 2026 +0800

    fix IT and add more
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../iotdblocal/IoTDBLocalLogAggregateFunction.java | 133 +++++++++++++++++
 .../iotdblocal/IoTDBLocalLogFunction.java          |  46 +++---
 .../relational/iotdblocal/IoTDBLocalLogHelper.java |  93 ++++++++++++
 .../iotdblocal/IoTDBLocalLogTableFunction.java     | 134 +++++++++++++++++
 .../relational/it/db/it/udf/IoTDBLocalIT.java      |  62 +++-----
 .../relational/it/db/it/udf/IoTDBLocalLogIT.java   | 159 +++++++++++++++++++++
 .../relational/it/db/it/udf/SQLFunctionUtils.java  |   1 -
 .../relational/ColumnTransformerBuilder.java       |  10 +-
 .../calc/plan/planner/TableOperatorGenerator.java  |  46 +++---
 .../udf/UserDefineScalarFunctionTransformer.java   |  26 +---
 .../fragment/FragmentInstanceContext.java          |  16 +--
 .../fragment/FragmentInstanceManager.java          |   2 +-
 .../planner/DataNodeTableOperatorGenerator.java    |   4 +-
 .../plan/planner/LocalExecutionPlanContext.java    |   4 +-
 .../SimpleFragmentParallelPlanner.java             |   3 +-
 .../plan/planner/plan/FragmentInstance.java        |  18 ++-
 .../distribute/TableModelQueryFragmentPlanner.java |   3 +-
 .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java   |  17 +--
 18 files changed, 631 insertions(+), 146 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java
new file mode 100644
index 00000000000..7b7630bf5b0
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational.iotdblocal;
+
+import org.apache.iotdb.udf.api.IoTDBLocal;
+import org.apache.iotdb.udf.api.State;
+import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
+import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
+import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
+import org.apache.iotdb.udf.api.relational.AggregateFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.type.Type;
+import org.apache.iotdb.udf.api.utils.ResultValue;
+
+import java.nio.ByteBuffer;
+
+/** Exercises IoTDBLocal log APIs at each aggregate-function lifecycle hook. */
+public class IoTDBLocalLogAggregateFunction implements AggregateFunction {
+
+  private static class CountState implements State {
+    long count;
+    boolean addInputLogged;
+    boolean combineStateLogged;
+    boolean outputFinalLogged;
+    boolean destroyLogged;
+
+    @Override
+    public void reset() {
+      count = 0;
+    }
+
+    @Override
+    public byte[] serialize() {
+      ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+      buffer.putLong(count);
+      return buffer.array();
+    }
+
+    @Override
+    public void deserialize(byte[] bytes) {
+      count = ByteBuffer.wrap(bytes).getLong();
+    }
+  }
+
+  @Override
+  public AggregateFunctionAnalysis analyze(FunctionArguments arguments)
+      throws UDFArgumentNotValidException {
+    if (arguments.getArgumentsSize() != 1) {
+      throw new UDFArgumentNotValidException("IoTDBLocalLogAggregateFunction 
accepts one column");
+    }
+    return new 
AggregateFunctionAnalysis.Builder().outputDataType(Type.INT64).build();
+  }
+
+  @Override
+  public void beforeStart(FunctionArguments arguments, IoTDBLocal local) {
+    IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.UDAF_BEFORE_START);
+  }
+
+  @Override
+  public State createState() {
+    return new CountState();
+  }
+
+  @Override
+  public void addInput(State state, Record input) {
+    CountState countState = (CountState) state;
+    countState.count++;
+  }
+
+  @Override
+  public void addInput(State state, Record input, IoTDBLocal local) {
+    CountState countState = (CountState) state;
+    if (!countState.addInputLogged) {
+      countState.addInputLogged = true;
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.UDAF_ADD_INPUT);
+    }
+    addInput(state, input);
+  }
+
+  @Override
+  public void combineState(State state, State rhs) {
+    CountState left = (CountState) state;
+    CountState right = (CountState) rhs;
+    left.count += right.count;
+  }
+
+  @Override
+  public void combineState(State state, State rhs, IoTDBLocal local) {
+    CountState countState = (CountState) state;
+    if (!countState.combineStateLogged) {
+      countState.combineStateLogged = true;
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.UDAF_COMBINE_STATE);
+    }
+    combineState(state, rhs);
+  }
+
+  @Override
+  public void outputFinal(State state, ResultValue resultValue) {
+    resultValue.setLong(((CountState) state).count);
+  }
+
+  @Override
+  public void outputFinal(State state, ResultValue resultValue, IoTDBLocal 
local) {
+    CountState countState = (CountState) state;
+    if (!countState.outputFinalLogged) {
+      countState.outputFinalLogged = true;
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.UDAF_OUTPUT_FINAL);
+    }
+    outputFinal(state, resultValue);
+  }
+
+  @Override
+  public void beforeDestroy(IoTDBLocal local) {
+    IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.UDAF_BEFORE_DESTROY);
+  }
+}
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java
index b22c5a1841f..720828dcf1b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java
@@ -27,18 +27,14 @@ import org.apache.iotdb.udf.api.relational.ScalarFunction;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.type.Type;
 
-/** Exercises all IoTDBLocal log APIs with distinctive markers for integration 
tests. */
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+
+/** Exercises IoTDBLocal log APIs at each scalar-function lifecycle hook. */
 public class IoTDBLocalLogFunction implements ScalarFunction {
 
-  public static final String INFO_PLAIN = "IOTDB_LOCAL_IT_INFO_PLAIN";
-  public static final String INFO_FORMAT = "IOTDB_LOCAL_IT_INFO_FORMAT loaded 
3 rows";
-  public static final String INFO_CAUSE = "IOTDB_LOCAL_IT_INFO_CAUSE";
-  public static final String WARN_PLAIN = "IOTDB_LOCAL_IT_WARN_PLAIN";
-  public static final String WARN_FORMAT = "IOTDB_LOCAL_IT_WARN_FORMAT warn 
ab";
-  public static final String WARN_CAUSE = "IOTDB_LOCAL_IT_WARN_CAUSE";
-  public static final String ERROR_PLAIN = "IOTDB_LOCAL_IT_ERROR_PLAIN";
-  public static final String ERROR_FORMAT = "IOTDB_LOCAL_IT_ERROR_FORMAT error 
code=500";
-  public static final String ERROR_CAUSE = "IOTDB_LOCAL_IT_ERROR_CAUSE";
+  private boolean evaluateLogged;
+  private boolean destroyLogged;
 
   @Override
   public ScalarFunctionAnalysis analyze(FunctionArguments arguments)
@@ -48,20 +44,28 @@ public class IoTDBLocalLogFunction implements 
ScalarFunction {
 
   @Override
   public void beforeStart(FunctionArguments arguments, IoTDBLocal local) {
-    RuntimeException cause = new RuntimeException("iotdb-local-it-log-cause");
-    local.info(INFO_PLAIN);
-    local.info("IOTDB_LOCAL_IT_INFO_FORMAT loaded {} rows", 3);
-    local.info(INFO_CAUSE, cause);
-    local.warn(WARN_PLAIN);
-    local.warn("IOTDB_LOCAL_IT_WARN_FORMAT warn {} {}", "a", "b");
-    local.warn(WARN_CAUSE, cause);
-    local.error(ERROR_PLAIN);
-    local.error("IOTDB_LOCAL_IT_ERROR_FORMAT error code={}", 500);
-    local.error(ERROR_CAUSE, cause);
+    IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.SCALAR_BEFORE_START);
   }
 
   @Override
   public Object evaluate(Record input) {
-    return "ok";
+    return new Binary("ok", TSFileConfig.STRING_CHARSET);
+  }
+
+  @Override
+  public Object evaluate(Record input, IoTDBLocal local) {
+    if (!evaluateLogged) {
+      evaluateLogged = true;
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.SCALAR_EVALUATE);
+    }
+    return evaluate(input);
+  }
+
+  @Override
+  public void beforeDestroy(IoTDBLocal local) {
+    if (!destroyLogged) {
+      destroyLogged = true;
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.SCALAR_BEFORE_DESTROY);
+    }
   }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java
new file mode 100644
index 00000000000..970b25c3e84
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational.iotdblocal;
+
+import org.apache.iotdb.udf.api.IoTDBLocal;
+
+/** Shared IoTDBLocal log markers for integration tests. */
+public final class IoTDBLocalLogHelper {
+
+  public static final String CAUSE_MESSAGE = "iotdb-local-it-log-cause";
+
+  public static final String SCALAR_BEFORE_START = 
"IOTDB_LOCAL_SCALAR_BEFORE_START";
+  public static final String SCALAR_EVALUATE = "IOTDB_LOCAL_SCALAR_EVALUATE";
+  public static final String SCALAR_BEFORE_DESTROY = 
"IOTDB_LOCAL_SCALAR_BEFORE_DESTROY";
+
+  public static final String UDAF_BEFORE_START = 
"IOTDB_LOCAL_UDAF_BEFORE_START";
+  public static final String UDAF_ADD_INPUT = "IOTDB_LOCAL_UDAF_ADD_INPUT";
+  public static final String UDAF_COMBINE_STATE = 
"IOTDB_LOCAL_UDAF_COMBINE_STATE";
+  public static final String UDAF_OUTPUT_FINAL = 
"IOTDB_LOCAL_UDAF_OUTPUT_FINAL";
+  public static final String UDAF_BEFORE_DESTROY = 
"IOTDB_LOCAL_UDAF_BEFORE_DESTROY";
+
+  public static final String TVF_BEFORE_START = "IOTDB_LOCAL_TVF_BEFORE_START";
+  public static final String TVF_PROCESS = "IOTDB_LOCAL_TVF_PROCESS";
+  public static final String TVF_BEFORE_DESTROY = 
"IOTDB_LOCAL_TVF_BEFORE_DESTROY";
+
+  private IoTDBLocalLogHelper() {}
+
+  public static void logAllApis(IoTDBLocal local, String phase) {
+    RuntimeException cause = new RuntimeException(CAUSE_MESSAGE);
+    local.info(phase + "_INFO_PLAIN");
+    local.info(phase + "_INFO_FORMAT loaded {} rows", 3);
+    local.info(phase + "_INFO_CAUSE", cause);
+    local.warn(phase + "_WARN_PLAIN");
+    local.warn(phase + "_WARN_FORMAT warn {} {}", "a", "b");
+    local.warn(phase + "_WARN_CAUSE", cause);
+    local.error(phase + "_ERROR_PLAIN");
+    local.error(phase + "_ERROR_FORMAT error code={}", 500);
+    local.error(phase + "_ERROR_CAUSE", cause);
+  }
+
+  public static String infoPlain(String phase) {
+    return phase + "_INFO_PLAIN";
+  }
+
+  public static String infoFormat(String phase) {
+    return phase + "_INFO_FORMAT loaded 3 rows";
+  }
+
+  public static String infoCause(String phase) {
+    return phase + "_INFO_CAUSE";
+  }
+
+  public static String warnPlain(String phase) {
+    return phase + "_WARN_PLAIN";
+  }
+
+  public static String warnFormat(String phase) {
+    return phase + "_WARN_FORMAT warn a b";
+  }
+
+  public static String warnCause(String phase) {
+    return phase + "_WARN_CAUSE";
+  }
+
+  public static String errorPlain(String phase) {
+    return phase + "_ERROR_PLAIN";
+  }
+
+  public static String errorFormat(String phase) {
+    return phase + "_ERROR_FORMAT error code=500";
+  }
+
+  public static String errorCause(String phase) {
+    return phase + "_ERROR_CAUSE";
+  }
+}
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java
new file mode 100644
index 00000000000..39dfa057751
--- /dev/null
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example.relational.iotdblocal;
+
+import org.apache.iotdb.udf.api.IoTDBLocal;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Exercises IoTDBLocal log APIs at each table-function lifecycle hook. */
+public class IoTDBLocalLogTableFunction implements TableFunction {
+
+  private static final String INPUT_PARAMETER_NAME = "input";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Collections.singletonList(
+        ScalarParameterSpecification.builder()
+            .name(INPUT_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    DescribedSchema schema = DescribedSchema.builder().addField("output", 
Type.STRING).build();
+    MapTableFunctionHandle handle =
+        new MapTableFunctionHandle.Builder()
+            .addProperty(
+                INPUT_PARAMETER_NAME,
+                ((ScalarArgument) 
arguments.get(INPUT_PARAMETER_NAME)).getValue())
+            .build();
+    return 
TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build();
+  }
+
+  @Override
+  public TableFunctionHandle createTableFunctionHandle() {
+    return new MapTableFunctionHandle();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(
+      TableFunctionHandle tableFunctionHandle) {
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionLeafProcessor getSplitProcessor() {
+        return new LogSplitProcessor(
+            (String)
+                ((MapTableFunctionHandle) 
tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME));
+      }
+    };
+  }
+
+  private static class LogSplitProcessor implements TableFunctionLeafProcessor 
{
+    private final String input;
+    private boolean finish;
+    private boolean processLogged;
+    private boolean destroyLogged;
+
+    LogSplitProcessor(String input) {
+      this.input = input;
+    }
+
+    @Override
+    public void beforeStart(IoTDBLocal local) {
+      IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.TVF_BEFORE_START);
+    }
+
+    @Override
+    public void process(List<ColumnBuilder> columnBuilders) {
+      for (String value : input.split(",")) {
+        columnBuilders.get(0).writeBinary(new Binary(value, 
Charset.defaultCharset()));
+      }
+      finish = true;
+    }
+
+    @Override
+    public void process(List<ColumnBuilder> columnBuilders, IoTDBLocal local) {
+      if (!processLogged) {
+        processLogged = true;
+        IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.TVF_PROCESS);
+      }
+      process(columnBuilders);
+    }
+
+    @Override
+    public boolean isFinish() {
+      return finish;
+    }
+
+    @Override
+    public void beforeDestroy(IoTDBLocal local) {
+      if (!destroyLogged) {
+        destroyLogged = true;
+        IoTDBLocalLogHelper.logAllApis(local, 
IoTDBLocalLogHelper.TVF_BEFORE_DESTROY);
+      }
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java
index c50d360ba66..0cebd2afe7a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.relational.it.db.it.udf;
 
-import 
org.apache.iotdb.db.query.udf.example.relational.iotdblocal.IoTDBLocalLogFunction;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.TableClusterIT;
@@ -27,13 +26,11 @@ import 
org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
 
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
 
@@ -43,7 +40,7 @@ import static org.junit.Assert.fail;
 
 /**
  * Integration tests for {@link org.apache.iotdb.udf.api.IoTDBLocal} in 
table-model UDF, covering
- * compatibility, embedded query, logging, permission inheritance and auto 
resource cleanup.
+ * compatibility, embedded query, permission inheritance and auto resource 
cleanup.
  */
 @RunWith(IoTDBTestRunner.class)
 @Category({TableLocalStandaloneIT.class, TableClusterIT.class})
@@ -118,8 +115,8 @@ public class IoTDBLocalIT {
         new String[] {
           "1970-01-01T00:00:00.001Z,false,false,",
           "1970-01-01T00:00:00.002Z,true,true,",
-          "1970-01-01T00:00:00.003Z,true,false,",
-          "1970-01-01T00:00:00.005Z,true,true,"
+          "1970-01-01T00:00:00.003Z,false,false,",
+          "1970-01-01T00:00:00.005Z,true,false,"
         },
         DATABASE_NAME);
     tableResultSetEqualTest(
@@ -128,8 +125,8 @@ public class IoTDBLocalIT {
         new String[] {
           "1970-01-01T00:00:00.001Z,2,",
           "1970-01-01T00:00:00.002Z,2,",
-          "1970-01-01T00:00:00.003Z,3,",
-          "5,4,"
+          "1970-01-01T00:00:00.003Z,6,",
+          "1970-01-01T00:00:00.005Z,4,"
         },
         DATABASE_NAME);
   }
@@ -140,7 +137,7 @@ public class IoTDBLocalIT {
     tableResultSetEqualTest(
         "SELECT device_id, my_avg(s1) AS avg_s1 FROM vehicle GROUP BY 
device_id",
         new String[] {"device_id", "avg_s1"},
-        new String[] {"d0,2.0,"},
+        new String[] {"d0,2.6666666666666665,"},
         DATABASE_NAME);
   }
 
@@ -149,12 +146,12 @@ public class IoTDBLocalIT {
     SQLFunctionUtils.createUDF("my_split", PKG + ".MySplit");
     SQLFunctionUtils.createUDF("my_repeat", PKG + ".MyRepeatWithoutIndex");
     tableResultSetEqualTest(
-        "SELECT * FROM TABLE(my_split('a,b,c'))",
+        "select * from my_split('a,b,c')",
         new String[] {"output"},
         new String[] {"a,", "b,", "c,"},
         DATABASE_NAME);
     tableResultSetEqualTest(
-        "SELECT * FROM my_repeat(vehicle, 2) ORDER BY time",
+        "select * from my_repeat((select time, device_id, s1 from vehicle), 2) 
order by time",
         new String[] {"time", "device_id", "s1"},
         new String[] {
           "1970-01-01T00:00:00.001Z,d0,1,",
@@ -178,7 +175,9 @@ public class IoTDBLocalIT {
         "SELECT time, device_id, device_name(device_id) AS name, temperature 
FROM readings ORDER BY time",
         new String[] {"time", "device_id", "name", "temperature"},
         new String[] {
-          "1000,d1,一号车间温度传感器,25.5,", "1001,d2,二号车间温度传感器,32.0,", 
"1002,d3,未知设备,20.0,",
+          "1970-01-01T00:00:01.000Z,d1,一号车间温度传感器,25.5,",
+          "1970-01-01T00:00:01.001Z,d2,二号车间温度传感器,32.0,",
+          "1970-01-01T00:00:01.002Z,d3,未知设备,20.0,",
         },
         DATABASE_NAME);
   }
@@ -190,9 +189,9 @@ public class IoTDBLocalIT {
         "SELECT time, device_id, temperature, device_summary(device_id) AS 
summary FROM readings ORDER BY time",
         new String[] {"time", "device_id", "temperature", "summary"},
         new String[] {
-          "1000,d1,25.5,一号车间温度传感器(上限:30.0),",
-          "1001,d2,32.0,二号车间温度传感器(上限:35.0),",
-          "1002,d3,20.0,未知设备(上限:未知),",
+          "1970-01-01T00:00:01.000Z,d1,25.5,一号车间温度传感器(上限:30.0),",
+          "1970-01-01T00:00:01.001Z,d2,32.0,二号车间温度传感器(上限:35.0),",
+          "1970-01-01T00:00:01.002Z,d3,20.0,未知设备(上限:未知),",
         },
         DATABASE_NAME);
   }
@@ -213,35 +212,6 @@ public class IoTDBLocalIT {
         DATABASE_NAME);
   }
 
-  // ── IoTDBLocal logging 
──────────────────────────────────────────────────────
-
-  @Test
-  public void testIoTDBLocalLogApis() throws IOException {
-    SQLFunctionUtils.createUDF("iotdb_local_log", IOTDB_LOCAL_PKG + 
".IoTDBLocalLogFunction");
-    EnvFactory.getEnv().getDataNodeWrapper(0).clearLogContent();
-    tableResultSetEqualTest(
-        "SELECT iotdb_local_log(device_id) AS log_ok FROM readings WHERE 
device_id = 'd1'",
-        new String[] {"log_ok"},
-        new String[] {"ok,"},
-        DATABASE_NAME);
-    assertLogContains(IoTDBLocalLogFunction.INFO_PLAIN);
-    assertLogContains(IoTDBLocalLogFunction.INFO_FORMAT);
-    assertLogContains(IoTDBLocalLogFunction.INFO_CAUSE);
-    assertLogContains(IoTDBLocalLogFunction.WARN_PLAIN);
-    assertLogContains(IoTDBLocalLogFunction.WARN_FORMAT);
-    assertLogContains(IoTDBLocalLogFunction.WARN_CAUSE);
-    assertLogContains(IoTDBLocalLogFunction.ERROR_PLAIN);
-    assertLogContains(IoTDBLocalLogFunction.ERROR_FORMAT);
-    assertLogContains(IoTDBLocalLogFunction.ERROR_CAUSE);
-    assertLogContains("iotdb-local-it-log-cause");
-  }
-
-  private static void assertLogContains(String content) throws IOException {
-    Assert.assertTrue(
-        "Expected log to contain: " + content,
-        EnvFactory.getEnv().getDataNodeWrapper(0).logContains(content));
-  }
-
   // ── permission inheritance 
──────────────────────────────────────────────────
 
   @Test
@@ -252,9 +222,9 @@ public class IoTDBLocalIT {
         "SELECT device_name(device_id) AS name FROM readings WHERE device_id = 
'd1'",
         new String[] {"name"},
         new String[] {"一号车间温度传感器,"},
-        DATABASE_NAME,
         LIMITED_USER,
-        LIMITED_PASSWORD);
+        LIMITED_PASSWORD,
+        DATABASE_NAME);
     dropLimitedUser();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java
new file mode 100644
index 00000000000..c045357296a
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it.udf;
+
+import 
org.apache.iotdb.db.query.udf.example.relational.iotdblocal.IoTDBLocalLogHelper;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+/**
+ * Verifies {@link org.apache.iotdb.udf.api.IoTDBLocal} logging APIs on 
DataNode log output. Runs
+ * only in 1C1D ({@link TableLocalStandaloneIT}) because log inspection uses 
{@code
+ * DataNodeWrapper(0)}.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class})
+public class IoTDBLocalLogIT {
+
+  private static final String DATABASE_NAME = "iotdb_local_log_it";
+  private static final String IOTDB_LOCAL_PKG =
+      "org.apache.iotdb.db.query.udf.example.relational.iotdblocal";
+
+  private static DataNodeWrapper dataNodeWrapper;
+
+  private static final String[] SETUP_SQLS =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        "CREATE TABLE readings (device_id STRING TAG, temperature DOUBLE 
FIELD)",
+        "CREATE TABLE vehicle (device_id STRING TAG, s1 INT32 FIELD)",
+        "INSERT INTO readings(time, device_id, temperature) VALUES (1000, 
'd1', 25.5)",
+        "INSERT INTO vehicle(time, device_id, s1) VALUES (1, 'd0', 1), (2, 
'd0', 2), (3, 'd0', 3)",
+        "FLUSH",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    executeAsRoot(SETUP_SQLS);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @After
+  public void dropFunctions() {
+    SQLFunctionUtils.dropAllUDF();
+  }
+
+  @Test
+  public void testScalarFunctionLogApis() throws IOException {
+    SQLFunctionUtils.createUDF(
+        "iotdb_local_log_scalar", IOTDB_LOCAL_PKG + ".IoTDBLocalLogFunction");
+    dataNodeWrapper.clearLogContent();
+    tableResultSetEqualTest(
+        "SELECT iotdb_local_log_scalar(device_id) AS log_ok FROM readings 
WHERE device_id = 'd1'",
+        new String[] {"log_ok"},
+        new String[] {"ok,"},
+        DATABASE_NAME);
+    assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_BEFORE_START);
+    assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_EVALUATE);
+    assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_BEFORE_DESTROY);
+  }
+
+  @Test
+  public void testAggregateFunctionLogApis() throws IOException {
+    SQLFunctionUtils.createUDF(
+        "iotdb_local_log_udaf", IOTDB_LOCAL_PKG + 
".IoTDBLocalLogAggregateFunction");
+    dataNodeWrapper.clearLogContent();
+    tableResultSetEqualTest(
+        "SELECT iotdb_local_log_udaf(s1) AS row_count FROM vehicle",
+        new String[] {"row_count"},
+        new String[] {"3,"},
+        DATABASE_NAME);
+    assertPhaseLogs(IoTDBLocalLogHelper.UDAF_BEFORE_START);
+    assertPhaseLogs(IoTDBLocalLogHelper.UDAF_ADD_INPUT);
+    assertPhaseLogs(IoTDBLocalLogHelper.UDAF_OUTPUT_FINAL);
+    assertPhaseLogs(IoTDBLocalLogHelper.UDAF_BEFORE_DESTROY);
+  }
+
+  @Test
+  public void testTableFunctionLogApis() throws IOException {
+    SQLFunctionUtils.createUDF(
+        "iotdb_local_log_split", IOTDB_LOCAL_PKG + 
".IoTDBLocalLogTableFunction");
+    dataNodeWrapper.clearLogContent();
+    tableResultSetEqualTest(
+        "SELECT * FROM iotdb_local_log_split('a,b')",
+        new String[] {"output"},
+        new String[] {"a,", "b,"},
+        DATABASE_NAME);
+    assertPhaseLogs(IoTDBLocalLogHelper.TVF_BEFORE_START);
+    assertPhaseLogs(IoTDBLocalLogHelper.TVF_PROCESS);
+    assertPhaseLogs(IoTDBLocalLogHelper.TVF_BEFORE_DESTROY);
+  }
+
+  private static void assertPhaseLogs(String phase) throws IOException {
+    assertLogContains(IoTDBLocalLogHelper.infoPlain(phase));
+    assertLogContains(IoTDBLocalLogHelper.infoFormat(phase));
+    assertLogContains(IoTDBLocalLogHelper.infoCause(phase));
+    assertLogContains(IoTDBLocalLogHelper.warnPlain(phase));
+    assertLogContains(IoTDBLocalLogHelper.warnFormat(phase));
+    assertLogContains(IoTDBLocalLogHelper.warnCause(phase));
+    assertLogContains(IoTDBLocalLogHelper.errorPlain(phase));
+    assertLogContains(IoTDBLocalLogHelper.errorFormat(phase));
+    assertLogContains(IoTDBLocalLogHelper.errorCause(phase));
+    assertLogContains(IoTDBLocalLogHelper.CAUSE_MESSAGE);
+  }
+
+  private static void assertLogContains(String content) throws IOException {
+    Assert.assertTrue("Expected log to contain: " + content, 
dataNodeWrapper.logContains(content));
+  }
+
+  private static void executeAsRoot(String... sqls) {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : sqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      fail("executeAsRoot failed: " + e.getMessage());
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
index 34f57b2940b..6174f06bb5b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java
@@ -41,7 +41,6 @@ public class SQLFunctionUtils {
   public static void createUDF(String udfName, String classPath) {
     try (Connection connection = EnvFactory.getEnv().getTableConnection();
         Statement statement = connection.createStatement()) {
-      // create
       statement.execute(String.format("create function %s as '%s'", udfName, 
classPath));
       // check
       try (ResultSet resultSet = statement.executeQuery("show functions")) {
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
index 20d35fe0ab3..7d3c1ab9bc5 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java
@@ -1958,7 +1958,7 @@ public class ColumnTransformerBuilder
 
     private final String outerGlobalQueryId;
 
-    private final long outerLocalQueryId;
+    private final long outerQueryDeadlineMs;
 
     @Nullable private final IoTDBLocalFactory ioTDBLocalFactory;
 
@@ -2006,7 +2006,7 @@ public class ColumnTransformerBuilder
         @Nullable MemoryReservationManager memoryReservationManager,
         String fragmentInstanceId,
         String outerGlobalQueryId,
-        long outerLocalQueryId,
+        long outerQueryDeadlineMs,
         @Nullable IoTDBLocalFactory ioTDBLocalFactory) {
       this.sessionInfo = sessionInfo;
       this.leafList = leafList;
@@ -2021,7 +2021,7 @@ public class ColumnTransformerBuilder
       this.memoryReservationManager = 
Optional.ofNullable(memoryReservationManager);
       this.fragmentInstanceId = fragmentInstanceId;
       this.outerGlobalQueryId = outerGlobalQueryId;
-      this.outerLocalQueryId = outerLocalQueryId;
+      this.outerQueryDeadlineMs = outerQueryDeadlineMs;
       this.ioTDBLocalFactory = ioTDBLocalFactory;
     }
 
@@ -2037,8 +2037,8 @@ public class ColumnTransformerBuilder
       return outerGlobalQueryId;
     }
 
-    public long getOuterLocalQueryId() {
-      return outerLocalQueryId;
+    public long getOuterQueryDeadlineMs() {
+      return outerQueryDeadlineMs;
     }
 
     @Nullable
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index f12dadbb87a..86786404bc4 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -318,7 +318,7 @@ public abstract class TableOperatorGenerator<
 
     String fragmentInstanceId = getFragmentInstanceId(context);
     String outerGlobalQueryId = getQueryId(context);
-    long outerLocalQueryId = getLocalQueryId(context);
+    long outerQueryDeadlineMs = getOuterQueryDeadlineMs(context);
     IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context);
 
     final List<TSDataType> filterOutputDataTypes = new 
ArrayList<>(inputDataTypes);
@@ -351,7 +351,7 @@ public abstract class TableOperatorGenerator<
                           context.getMemoryReservationManager(),
                           fragmentInstanceId,
                           outerGlobalQueryId,
-                          outerLocalQueryId,
+                          outerQueryDeadlineMs,
                           ioTDBLocalFactory);
 
                   return visitor.process(p, filterColumnTransformerContext);
@@ -383,7 +383,7 @@ public abstract class TableOperatorGenerator<
             context.getMemoryReservationManager(),
             fragmentInstanceId,
             outerGlobalQueryId,
-            outerLocalQueryId,
+            outerQueryDeadlineMs,
             ioTDBLocalFactory);
 
     for (Expression expression : projectExpressions) {
@@ -416,7 +416,7 @@ public abstract class TableOperatorGenerator<
     return null;
   }
 
-  protected long getLocalQueryId(C context) {
+  protected long getOuterQueryDeadlineMs(C context) {
     return -1L;
   }
 
@@ -2520,18 +2520,12 @@ public abstract class TableOperatorGenerator<
   protected abstract SessionInfo getSessionInfo(C context);
 
   protected IoTDBLocal createIoTDBLocal(C context) {
-    IoTDBLocalFactory factory = getIoTDBLocalFactory(context);
-    String fragmentInstanceId = getFragmentInstanceId(context);
-    String outerGlobalQueryId = getQueryId(context);
-    long outerLocalQueryId = getLocalQueryId(context);
-    checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF 
execution");
-    checkArgument(
-        fragmentInstanceId != null, "fragmentInstanceId must not be null for 
UDF execution");
-    checkArgument(outerGlobalQueryId != null, "queryId must not be null for 
UDF execution");
-    checkArgument(
-        outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for 
UDF execution");
-    return factory.create(
-        getSessionInfo(context), fragmentInstanceId, outerLocalQueryId, 
outerGlobalQueryId);
+    return IoTDBLocalFactory.createIoTDBLocal(
+        getIoTDBLocalFactory(context),
+        getSessionInfo(context),
+        getFragmentInstanceId(context),
+        getQueryId(context),
+        getOuterQueryDeadlineMs(context));
   }
 
   /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. 
*/
@@ -2541,7 +2535,23 @@ public abstract class TableOperatorGenerator<
     IoTDBLocal create(
         SessionInfo sessionInfo,
         String fragmentInstanceId,
-        long outerLocalQueryId,
-        String outerGlobalQueryId);
+        String outerGlobalQueryId,
+        long outerQueryDeadlineMs);
+
+    static IoTDBLocal createIoTDBLocal(
+        IoTDBLocalFactory factory,
+        SessionInfo sessionInfo,
+        String fragmentInstanceId,
+        String outerGlobalQueryId,
+        long outerQueryDeadlineMs) {
+      checkArgument(factory != null, "IoTDBLocalFactory must not be null for 
UDF execution");
+      checkArgument(
+          fragmentInstanceId != null, "fragmentInstanceId must not be null for 
UDF execution");
+      checkArgument(outerGlobalQueryId != null, "queryId must not be null for 
UDF execution");
+      checkArgument(
+          outerQueryDeadlineMs > 0, "outerQueryDeadlineMs must be positive for 
UDF execution");
+      return factory.create(
+          sessionInfo, fragmentInstanceId, outerGlobalQueryId, 
outerQueryDeadlineMs);
+    }
   }
 }
diff --git 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 6955dd56663..0a50cc19492 100644
--- 
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ 
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -37,8 +37,6 @@ import org.apache.tsfile.read.common.type.Type;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer {
 
   private final ScalarFunction scalarFunction;
@@ -56,27 +54,17 @@ public class UserDefineScalarFunctionTransformer extends 
MultiColumnTransformer
     super(returnType, childrenTransformers);
     this.scalarFunction = scalarFunction;
     this.parameters = parameters;
-    this.ioTDBLocal = createIoTDBLocal(context);
+    this.ioTDBLocal =
+        IoTDBLocalFactory.createIoTDBLocal(
+            context.getIoTDBLocalFactory(),
+            context.getSessionInfo(),
+            context.getFragmentInstanceId(),
+            context.getOuterGlobalQueryId(),
+            context.getOuterQueryDeadlineMs());
     this.inputTypes =
         
childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList());
   }
 
-  private static IoTDBLocal createIoTDBLocal(ColumnTransformerBuilder.Context 
context) {
-    IoTDBLocalFactory factory = context.getIoTDBLocalFactory();
-    String fragmentInstanceId = context.getFragmentInstanceId();
-    String outerGlobalQueryId = context.getOuterGlobalQueryId();
-    long outerLocalQueryId = context.getOuterLocalQueryId();
-    checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF 
execution");
-    checkArgument(
-        fragmentInstanceId != null, "fragmentInstanceId must not be null for 
UDF execution");
-    checkArgument(
-        outerGlobalQueryId != null, "outerGlobalQueryId must not be null for 
UDF execution");
-    checkArgument(
-        outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for 
UDF execution");
-    return factory.create(
-        context.getSessionInfo(), fragmentInstanceId, outerLocalQueryId, 
outerGlobalQueryId);
-  }
-
   private void initIfNeeded() {
     if (init) {
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a41660ef556..595e9da7065 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -152,8 +152,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
-  // Coordinator-local query id of the outer query, used by IoTDBLocal UDF
-  private long localQueryId = -1L;
+  // Outer query deadline (startTime + timeout) for IoTDBLocal UDF
+  private long outerQueryDeadlineMs = -1L;
 
   private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap;
   private DataNodeQueryContext dataNodeQueryContext;
@@ -209,7 +209,7 @@ public class FragmentInstanceContext extends QueryContext {
       IDataRegionForQuery dataRegion,
       TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap,
-      long localQueryId,
+      long outerQueryDeadlineMs,
       boolean debug,
       boolean isVerbose) {
     FragmentInstanceContext instanceContext =
@@ -220,7 +220,7 @@ public class FragmentInstanceContext extends QueryContext {
             dataRegion,
             globalTimePredicate,
             dataNodeQueryContextMap,
-            localQueryId,
+            outerQueryDeadlineMs,
             debug,
             isVerbose);
     instanceContext.initialize();
@@ -276,7 +276,7 @@ public class FragmentInstanceContext extends QueryContext {
       IDataRegionForQuery dataRegion,
       TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap,
-      long localQueryId,
+      long outerQueryDeadlineMs,
       boolean debug,
       boolean verbose) {
     super(debug, verbose);
@@ -284,7 +284,7 @@ public class FragmentInstanceContext extends QueryContext {
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
-    this.localQueryId = localQueryId;
+    this.outerQueryDeadlineMs = outerQueryDeadlineMs;
     this.dataRegion = dataRegion;
     this.globalTimeFilter =
         globalTimePredicate == null
@@ -574,8 +574,8 @@ public class FragmentInstanceContext extends QueryContext {
     return sessionInfo;
   }
 
-  public long getLocalQueryId() {
-    return localQueryId;
+  public long getOuterQueryDeadlineMs() {
+    return outerQueryDeadlineMs;
   }
 
   public Optional<Throwable> getFailureCause() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 75e447de586..18042372b80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -167,7 +167,7 @@ public class FragmentInstanceManager {
                               dataRegion,
                               instance.getGlobalTimePredicate(),
                               dataNodeQueryContextMap,
-                              instance.getLocalQueryId(),
+                              instance.getOuterQueryDeadlineMs(),
                               instance.isDebug(),
                               instance.isVerbose());
                         });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
index aa748cde854..7d628e78d79 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java
@@ -2116,8 +2116,8 @@ public class DataNodeTableOperatorGenerator
   }
 
   @Override
-  protected long getLocalQueryId(LocalExecutionPlanContext context) {
-    return context.getLocalQueryId();
+  protected long getOuterQueryDeadlineMs(LocalExecutionPlanContext context) {
+    return context.getOuterQueryDeadlineMs();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
index c9172bb44ca..4d78236f38d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
@@ -198,8 +198,8 @@ public class LocalExecutionPlanContext implements 
ITableOperatorGeneratorContext
     return driverContext.getFragmentInstanceContext().getId();
   }
 
-  public long getLocalQueryId() {
-    return driverContext.getFragmentInstanceContext().getLocalQueryId();
+  public long getOuterQueryDeadlineMs() {
+    return 
driverContext.getFragmentInstanceContext().getOuterQueryDeadlineMs();
   }
 
   public List<PipelineDriverFactory> getPipelineDriverFactories() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index dec7c1917b6..a39aaff6fb4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -148,7 +148,8 @@ public class SimpleFragmentParallelPlanner extends 
AbstractFragmentParallelPlann
             queryContext.isDebug(),
             fragment.isRoot(),
             queryContext.isVerbose());
-    fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId());
+    fragmentInstance.setOuterQueryDeadlineMs(
+        queryContext.getStartTime() + queryContext.getTimeOut());
 
     selectExecutorAndHost(
         fragment,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 4cc98ee7d06..ff03aa006d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -83,8 +83,8 @@ public class FragmentInstance implements IConsensusRequest {
   private final boolean debug;
   private final boolean verbose;
 
-  // Coordinator-local query id, used to look up IQueryExecution on DataNode
-  private long localQueryId = -1L;
+  // Outer query deadline (startTime + timeout) for IoTDBLocal on remote 
DataNodes.
+  private long outerQueryDeadlineMs = -1L;
 
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
@@ -272,9 +272,7 @@ public class FragmentInstance implements IConsensusRequest {
         hasHostDataNode ? 
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
     fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer));
-    if (buffer.hasRemaining()) {
-      fragmentInstance.setLocalQueryId(ReadWriteIOUtils.readLong(buffer));
-    }
+    
fragmentInstance.setOuterQueryDeadlineMs(ReadWriteIOUtils.readLong(buffer));
     return fragmentInstance;
   }
 
@@ -302,7 +300,7 @@ public class FragmentInstance implements IConsensusRequest {
       }
       ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
       ReadWriteIOUtils.write(isHighestPriority, outputStream);
-      ReadWriteIOUtils.write(localQueryId, outputStream);
+      ReadWriteIOUtils.write(outerQueryDeadlineMs, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
       LOGGER.error(
@@ -349,12 +347,12 @@ public class FragmentInstance implements 
IConsensusRequest {
     return sessionInfo;
   }
 
-  public long getLocalQueryId() {
-    return localQueryId;
+  public long getOuterQueryDeadlineMs() {
+    return outerQueryDeadlineMs;
   }
 
-  public void setLocalQueryId(long localQueryId) {
-    this.localQueryId = localQueryId;
+  public void setOuterQueryDeadlineMs(long outerQueryDeadlineMs) {
+    this.outerQueryDeadlineMs = outerQueryDeadlineMs;
   }
 
   public boolean isExplainAnalyze() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index e9695ec73f6..586bb6d61dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -186,7 +186,8 @@ public class TableModelQueryFragmentPlanner extends 
AbstractFragmentParallelPlan
             queryContext.isDebug(),
             fragment.isRoot(),
             queryContext.isVerbose());
-    fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId());
+    fragmentInstance.setOuterQueryDeadlineMs(
+        queryContext.getStartTime() + queryContext.getTimeOut());
 
     selectExecutorAndHost(
         fragment,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
index 940288a83ac..630360f4858 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.QueryTimeoutException;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.QueryId;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.udf.api.IoTDBLocal;
 import org.apache.iotdb.udf.api.UDFResultSet;
 import org.apache.iotdb.udf.api.exception.UDFException;
@@ -42,23 +40,22 @@ public class IoTDBLocalImpl implements IoTDBLocal {
   public static final IoTDBLocalFactory FACTORY = IoTDBLocalImpl::new;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBLocalImpl.class);
-  private static final Coordinator COORDINATOR = Coordinator.getInstance();
 
   private final SessionInfo sessionInfo;
   private final String fragmentInstanceId;
-  private final long outerLocalQueryId;
   private final QueryId outerGlobalQueryId;
+  private final long outerQueryDeadlineMs;
   private final List<UDFResultSetImpl> openResultSets = new ArrayList<>();
 
   public IoTDBLocalImpl(
       SessionInfo sessionInfo,
       String fragmentInstanceId,
-      long outerLocalQueryId,
-      String outerGlobalQueryId) {
+      String outerGlobalQueryId,
+      long outerQueryDeadlineMs) {
     this.sessionInfo = sessionInfo;
     this.fragmentInstanceId = fragmentInstanceId;
-    this.outerLocalQueryId = outerLocalQueryId;
     this.outerGlobalQueryId = QueryId.valueOf(outerGlobalQueryId);
+    this.outerQueryDeadlineMs = outerQueryDeadlineMs;
   }
 
   @Override
@@ -97,12 +94,10 @@ public class IoTDBLocalImpl implements IoTDBLocal {
   }
 
   private long computeRemainingTimeoutMs() {
-    IQueryExecution execution = 
COORDINATOR.getQueryExecution(outerLocalQueryId);
-    if (execution == null) {
+    if (outerQueryDeadlineMs <= 0) {
       return 0;
     }
-    return execution.getTimeout()
-        - (System.currentTimeMillis() - execution.getStartExecutionTime());
+    return outerQueryDeadlineMs - System.currentTimeMillis();
   }
 
   @Override

Reply via email to