This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch IoTDBLocal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5e0539e3c7d61c06c9d262e2b1446d4d597cd00 Author: Weihao Li <[email protected]> AuthorDate: Fri Jun 26 02:55:38 2026 +0800 fix IT and add more Signed-off-by: Weihao Li <[email protected]> --- .../iotdblocal/IoTDBLocalLogAggregateFunction.java | 133 +++++++++++++++++ .../iotdblocal/IoTDBLocalLogFunction.java | 46 +++--- .../relational/iotdblocal/IoTDBLocalLogHelper.java | 93 ++++++++++++ .../iotdblocal/IoTDBLocalLogTableFunction.java | 134 +++++++++++++++++ .../relational/it/db/it/udf/IoTDBLocalIT.java | 62 +++----- .../relational/it/db/it/udf/IoTDBLocalLogIT.java | 159 +++++++++++++++++++++ .../relational/it/db/it/udf/SQLFunctionUtils.java | 1 - .../relational/ColumnTransformerBuilder.java | 10 +- .../calc/plan/planner/TableOperatorGenerator.java | 46 +++--- .../udf/UserDefineScalarFunctionTransformer.java | 26 +--- .../fragment/FragmentInstanceContext.java | 16 +-- .../fragment/FragmentInstanceManager.java | 2 +- .../planner/DataNodeTableOperatorGenerator.java | 4 +- .../plan/planner/LocalExecutionPlanContext.java | 4 +- .../SimpleFragmentParallelPlanner.java | 3 +- .../plan/planner/plan/FragmentInstance.java | 18 ++- .../distribute/TableModelQueryFragmentPlanner.java | 3 +- .../iotdb/db/queryengine/udf/IoTDBLocalImpl.java | 17 +-- 18 files changed, 631 insertions(+), 146 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java new file mode 100644 index 00000000000..7b7630bf5b0 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogAggregateFunction.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational.iotdblocal; + +import org.apache.iotdb.udf.api.IoTDBLocal; +import org.apache.iotdb.udf.api.State; +import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis; +import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; +import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; +import org.apache.iotdb.udf.api.relational.AggregateFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.type.Type; +import org.apache.iotdb.udf.api.utils.ResultValue; + +import java.nio.ByteBuffer; + +/** Exercises IoTDBLocal log APIs at each aggregate-function lifecycle hook. */ +public class IoTDBLocalLogAggregateFunction implements AggregateFunction { + + private static class CountState implements State { + long count; + boolean addInputLogged; + boolean combineStateLogged; + boolean outputFinalLogged; + boolean destroyLogged; + + @Override + public void reset() { + count = 0; + } + + @Override + public byte[] serialize() { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(count); + return buffer.array(); + } + + @Override + public void deserialize(byte[] bytes) { + count = ByteBuffer.wrap(bytes).getLong(); + } + } + + @Override + public AggregateFunctionAnalysis analyze(FunctionArguments arguments) + throws UDFArgumentNotValidException { + if (arguments.getArgumentsSize() != 1) { + throw new UDFArgumentNotValidException("IoTDBLocalLogAggregateFunction accepts one column"); + } + return new AggregateFunctionAnalysis.Builder().outputDataType(Type.INT64).build(); + } + + @Override + public void beforeStart(FunctionArguments arguments, IoTDBLocal local) { + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.UDAF_BEFORE_START); + } + + @Override + public State createState() { + return new CountState(); + } + + @Override + public void addInput(State state, Record input) { + CountState countState = (CountState) state; + countState.count++; + } + + @Override + public void addInput(State state, Record input, IoTDBLocal local) { + CountState countState = (CountState) state; + if (!countState.addInputLogged) { + countState.addInputLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.UDAF_ADD_INPUT); + } + addInput(state, input); + } + + @Override + public void combineState(State state, State rhs) { + CountState left = (CountState) state; + CountState right = (CountState) rhs; + left.count += right.count; + } + + @Override + public void combineState(State state, State rhs, IoTDBLocal local) { + CountState countState = (CountState) state; + if (!countState.combineStateLogged) { + countState.combineStateLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.UDAF_COMBINE_STATE); + } + combineState(state, rhs); + } + + @Override + public void outputFinal(State state, ResultValue resultValue) { + resultValue.setLong(((CountState) state).count); + } + + @Override + public void outputFinal(State state, ResultValue resultValue, IoTDBLocal local) { + CountState countState = (CountState) state; + if (!countState.outputFinalLogged) { + countState.outputFinalLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.UDAF_OUTPUT_FINAL); + } + outputFinal(state, resultValue); + } + + @Override + public void beforeDestroy(IoTDBLocal local) { + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.UDAF_BEFORE_DESTROY); + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java index b22c5a1841f..720828dcf1b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogFunction.java @@ -27,18 +27,14 @@ import org.apache.iotdb.udf.api.relational.ScalarFunction; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.type.Type; -/** Exercises all IoTDBLocal log APIs with distinctive markers for integration tests. */ +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; + +/** Exercises IoTDBLocal log APIs at each scalar-function lifecycle hook. */ public class IoTDBLocalLogFunction implements ScalarFunction { - public static final String INFO_PLAIN = "IOTDB_LOCAL_IT_INFO_PLAIN"; - public static final String INFO_FORMAT = "IOTDB_LOCAL_IT_INFO_FORMAT loaded 3 rows"; - public static final String INFO_CAUSE = "IOTDB_LOCAL_IT_INFO_CAUSE"; - public static final String WARN_PLAIN = "IOTDB_LOCAL_IT_WARN_PLAIN"; - public static final String WARN_FORMAT = "IOTDB_LOCAL_IT_WARN_FORMAT warn ab"; - public static final String WARN_CAUSE = "IOTDB_LOCAL_IT_WARN_CAUSE"; - public static final String ERROR_PLAIN = "IOTDB_LOCAL_IT_ERROR_PLAIN"; - public static final String ERROR_FORMAT = "IOTDB_LOCAL_IT_ERROR_FORMAT error code=500"; - public static final String ERROR_CAUSE = "IOTDB_LOCAL_IT_ERROR_CAUSE"; + private boolean evaluateLogged; + private boolean destroyLogged; @Override public ScalarFunctionAnalysis analyze(FunctionArguments arguments) @@ -48,20 +44,28 @@ public class IoTDBLocalLogFunction implements ScalarFunction { @Override public void beforeStart(FunctionArguments arguments, IoTDBLocal local) { - RuntimeException cause = new RuntimeException("iotdb-local-it-log-cause"); - local.info(INFO_PLAIN); - local.info("IOTDB_LOCAL_IT_INFO_FORMAT loaded {} rows", 3); - local.info(INFO_CAUSE, cause); - local.warn(WARN_PLAIN); - local.warn("IOTDB_LOCAL_IT_WARN_FORMAT warn {} {}", "a", "b"); - local.warn(WARN_CAUSE, cause); - local.error(ERROR_PLAIN); - local.error("IOTDB_LOCAL_IT_ERROR_FORMAT error code={}", 500); - local.error(ERROR_CAUSE, cause); + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.SCALAR_BEFORE_START); } @Override public Object evaluate(Record input) { - return "ok"; + return new Binary("ok", TSFileConfig.STRING_CHARSET); + } + + @Override + public Object evaluate(Record input, IoTDBLocal local) { + if (!evaluateLogged) { + evaluateLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.SCALAR_EVALUATE); + } + return evaluate(input); + } + + @Override + public void beforeDestroy(IoTDBLocal local) { + if (!destroyLogged) { + destroyLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.SCALAR_BEFORE_DESTROY); + } } } diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java new file mode 100644 index 00000000000..970b25c3e84 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogHelper.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational.iotdblocal; + +import org.apache.iotdb.udf.api.IoTDBLocal; + +/** Shared IoTDBLocal log markers for integration tests. */ +public final class IoTDBLocalLogHelper { + + public static final String CAUSE_MESSAGE = "iotdb-local-it-log-cause"; + + public static final String SCALAR_BEFORE_START = "IOTDB_LOCAL_SCALAR_BEFORE_START"; + public static final String SCALAR_EVALUATE = "IOTDB_LOCAL_SCALAR_EVALUATE"; + public static final String SCALAR_BEFORE_DESTROY = "IOTDB_LOCAL_SCALAR_BEFORE_DESTROY"; + + public static final String UDAF_BEFORE_START = "IOTDB_LOCAL_UDAF_BEFORE_START"; + public static final String UDAF_ADD_INPUT = "IOTDB_LOCAL_UDAF_ADD_INPUT"; + public static final String UDAF_COMBINE_STATE = "IOTDB_LOCAL_UDAF_COMBINE_STATE"; + public static final String UDAF_OUTPUT_FINAL = "IOTDB_LOCAL_UDAF_OUTPUT_FINAL"; + public static final String UDAF_BEFORE_DESTROY = "IOTDB_LOCAL_UDAF_BEFORE_DESTROY"; + + public static final String TVF_BEFORE_START = "IOTDB_LOCAL_TVF_BEFORE_START"; + public static final String TVF_PROCESS = "IOTDB_LOCAL_TVF_PROCESS"; + public static final String TVF_BEFORE_DESTROY = "IOTDB_LOCAL_TVF_BEFORE_DESTROY"; + + private IoTDBLocalLogHelper() {} + + public static void logAllApis(IoTDBLocal local, String phase) { + RuntimeException cause = new RuntimeException(CAUSE_MESSAGE); + local.info(phase + "_INFO_PLAIN"); + local.info(phase + "_INFO_FORMAT loaded {} rows", 3); + local.info(phase + "_INFO_CAUSE", cause); + local.warn(phase + "_WARN_PLAIN"); + local.warn(phase + "_WARN_FORMAT warn {} {}", "a", "b"); + local.warn(phase + "_WARN_CAUSE", cause); + local.error(phase + "_ERROR_PLAIN"); + local.error(phase + "_ERROR_FORMAT error code={}", 500); + local.error(phase + "_ERROR_CAUSE", cause); + } + + public static String infoPlain(String phase) { + return phase + "_INFO_PLAIN"; + } + + public static String infoFormat(String phase) { + return phase + "_INFO_FORMAT loaded 3 rows"; + } + + public static String infoCause(String phase) { + return phase + "_INFO_CAUSE"; + } + + public static String warnPlain(String phase) { + return phase + "_WARN_PLAIN"; + } + + public static String warnFormat(String phase) { + return phase + "_WARN_FORMAT warn a b"; + } + + public static String warnCause(String phase) { + return phase + "_WARN_CAUSE"; + } + + public static String errorPlain(String phase) { + return phase + "_ERROR_PLAIN"; + } + + public static String errorFormat(String phase) { + return phase + "_ERROR_FORMAT error code=500"; + } + + public static String errorCause(String phase) { + return phase + "_ERROR_CAUSE"; + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java new file mode 100644 index 00000000000..39dfa057751 --- /dev/null +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/iotdblocal/IoTDBLocalLogTableFunction.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.example.relational.iotdblocal; + +import org.apache.iotdb.udf.api.IoTDBLocal; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.utils.Binary; + +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Exercises IoTDBLocal log APIs at each table-function lifecycle hook. */ +public class IoTDBLocalLogTableFunction implements TableFunction { + + private static final String INPUT_PARAMETER_NAME = "input"; + + @Override + public List<ParameterSpecification> getArgumentsSpecifications() { + return Collections.singletonList( + ScalarParameterSpecification.builder() + .name(INPUT_PARAMETER_NAME) + .type(Type.STRING) + .build()); + } + + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { + DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + INPUT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()) + .build(); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); + } + + @Override + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionLeafProcessor getSplitProcessor() { + return new LogSplitProcessor( + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME)); + } + }; + } + + private static class LogSplitProcessor implements TableFunctionLeafProcessor { + private final String input; + private boolean finish; + private boolean processLogged; + private boolean destroyLogged; + + LogSplitProcessor(String input) { + this.input = input; + } + + @Override + public void beforeStart(IoTDBLocal local) { + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.TVF_BEFORE_START); + } + + @Override + public void process(List<ColumnBuilder> columnBuilders) { + for (String value : input.split(",")) { + columnBuilders.get(0).writeBinary(new Binary(value, Charset.defaultCharset())); + } + finish = true; + } + + @Override + public void process(List<ColumnBuilder> columnBuilders, IoTDBLocal local) { + if (!processLogged) { + processLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.TVF_PROCESS); + } + process(columnBuilders); + } + + @Override + public boolean isFinish() { + return finish; + } + + @Override + public void beforeDestroy(IoTDBLocal local) { + if (!destroyLogged) { + destroyLogged = true; + IoTDBLocalLogHelper.logAllApis(local, IoTDBLocalLogHelper.TVF_BEFORE_DESTROY); + } + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java index c50d360ba66..0cebd2afe7a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalIT.java @@ -19,7 +19,6 @@ package org.apache.iotdb.relational.it.db.it.udf; -import org.apache.iotdb.db.query.udf.example.relational.iotdblocal.IoTDBLocalLogFunction; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; @@ -27,13 +26,11 @@ import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.sql.Connection; import java.sql.Statement; @@ -43,7 +40,7 @@ import static org.junit.Assert.fail; /** * Integration tests for {@link org.apache.iotdb.udf.api.IoTDBLocal} in table-model UDF, covering - * compatibility, embedded query, logging, permission inheritance and auto resource cleanup. + * compatibility, embedded query, permission inheritance and auto resource cleanup. */ @RunWith(IoTDBTestRunner.class) @Category({TableLocalStandaloneIT.class, TableClusterIT.class}) @@ -118,8 +115,8 @@ public class IoTDBLocalIT { new String[] { "1970-01-01T00:00:00.001Z,false,false,", "1970-01-01T00:00:00.002Z,true,true,", - "1970-01-01T00:00:00.003Z,true,false,", - "1970-01-01T00:00:00.005Z,true,true," + "1970-01-01T00:00:00.003Z,false,false,", + "1970-01-01T00:00:00.005Z,true,false," }, DATABASE_NAME); tableResultSetEqualTest( @@ -128,8 +125,8 @@ public class IoTDBLocalIT { new String[] { "1970-01-01T00:00:00.001Z,2,", "1970-01-01T00:00:00.002Z,2,", - "1970-01-01T00:00:00.003Z,3,", - "5,4," + "1970-01-01T00:00:00.003Z,6,", + "1970-01-01T00:00:00.005Z,4," }, DATABASE_NAME); } @@ -140,7 +137,7 @@ public class IoTDBLocalIT { tableResultSetEqualTest( "SELECT device_id, my_avg(s1) AS avg_s1 FROM vehicle GROUP BY device_id", new String[] {"device_id", "avg_s1"}, - new String[] {"d0,2.0,"}, + new String[] {"d0,2.6666666666666665,"}, DATABASE_NAME); } @@ -149,12 +146,12 @@ public class IoTDBLocalIT { SQLFunctionUtils.createUDF("my_split", PKG + ".MySplit"); SQLFunctionUtils.createUDF("my_repeat", PKG + ".MyRepeatWithoutIndex"); tableResultSetEqualTest( - "SELECT * FROM TABLE(my_split('a,b,c'))", + "select * from my_split('a,b,c')", new String[] {"output"}, new String[] {"a,", "b,", "c,"}, DATABASE_NAME); tableResultSetEqualTest( - "SELECT * FROM my_repeat(vehicle, 2) ORDER BY time", + "select * from my_repeat((select time, device_id, s1 from vehicle), 2) order by time", new String[] {"time", "device_id", "s1"}, new String[] { "1970-01-01T00:00:00.001Z,d0,1,", @@ -178,7 +175,9 @@ public class IoTDBLocalIT { "SELECT time, device_id, device_name(device_id) AS name, temperature FROM readings ORDER BY time", new String[] {"time", "device_id", "name", "temperature"}, new String[] { - "1000,d1,一号车间温度传感器,25.5,", "1001,d2,二号车间温度传感器,32.0,", "1002,d3,未知设备,20.0,", + "1970-01-01T00:00:01.000Z,d1,一号车间温度传感器,25.5,", + "1970-01-01T00:00:01.001Z,d2,二号车间温度传感器,32.0,", + "1970-01-01T00:00:01.002Z,d3,未知设备,20.0,", }, DATABASE_NAME); } @@ -190,9 +189,9 @@ public class IoTDBLocalIT { "SELECT time, device_id, temperature, device_summary(device_id) AS summary FROM readings ORDER BY time", new String[] {"time", "device_id", "temperature", "summary"}, new String[] { - "1000,d1,25.5,一号车间温度传感器(上限:30.0),", - "1001,d2,32.0,二号车间温度传感器(上限:35.0),", - "1002,d3,20.0,未知设备(上限:未知),", + "1970-01-01T00:00:01.000Z,d1,25.5,一号车间温度传感器(上限:30.0),", + "1970-01-01T00:00:01.001Z,d2,32.0,二号车间温度传感器(上限:35.0),", + "1970-01-01T00:00:01.002Z,d3,20.0,未知设备(上限:未知),", }, DATABASE_NAME); } @@ -213,35 +212,6 @@ public class IoTDBLocalIT { DATABASE_NAME); } - // ── IoTDBLocal logging ────────────────────────────────────────────────────── - - @Test - public void testIoTDBLocalLogApis() throws IOException { - SQLFunctionUtils.createUDF("iotdb_local_log", IOTDB_LOCAL_PKG + ".IoTDBLocalLogFunction"); - EnvFactory.getEnv().getDataNodeWrapper(0).clearLogContent(); - tableResultSetEqualTest( - "SELECT iotdb_local_log(device_id) AS log_ok FROM readings WHERE device_id = 'd1'", - new String[] {"log_ok"}, - new String[] {"ok,"}, - DATABASE_NAME); - assertLogContains(IoTDBLocalLogFunction.INFO_PLAIN); - assertLogContains(IoTDBLocalLogFunction.INFO_FORMAT); - assertLogContains(IoTDBLocalLogFunction.INFO_CAUSE); - assertLogContains(IoTDBLocalLogFunction.WARN_PLAIN); - assertLogContains(IoTDBLocalLogFunction.WARN_FORMAT); - assertLogContains(IoTDBLocalLogFunction.WARN_CAUSE); - assertLogContains(IoTDBLocalLogFunction.ERROR_PLAIN); - assertLogContains(IoTDBLocalLogFunction.ERROR_FORMAT); - assertLogContains(IoTDBLocalLogFunction.ERROR_CAUSE); - assertLogContains("iotdb-local-it-log-cause"); - } - - private static void assertLogContains(String content) throws IOException { - Assert.assertTrue( - "Expected log to contain: " + content, - EnvFactory.getEnv().getDataNodeWrapper(0).logContains(content)); - } - // ── permission inheritance ────────────────────────────────────────────────── @Test @@ -252,9 +222,9 @@ public class IoTDBLocalIT { "SELECT device_name(device_id) AS name FROM readings WHERE device_id = 'd1'", new String[] {"name"}, new String[] {"一号车间温度传感器,"}, - DATABASE_NAME, LIMITED_USER, - LIMITED_PASSWORD); + LIMITED_PASSWORD, + DATABASE_NAME); dropLimitedUser(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java new file mode 100644 index 00000000000..c045357296a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/IoTDBLocalLogIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it.udf; + +import org.apache.iotdb.db.query.udf.example.relational.iotdblocal.IoTDBLocalLogHelper; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.fail; + +/** + * Verifies {@link org.apache.iotdb.udf.api.IoTDBLocal} logging APIs on DataNode log output. Runs + * only in 1C1D ({@link TableLocalStandaloneIT}) because log inspection uses {@code + * DataNodeWrapper(0)}. + */ +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class}) +public class IoTDBLocalLogIT { + + private static final String DATABASE_NAME = "iotdb_local_log_it"; + private static final String IOTDB_LOCAL_PKG = + "org.apache.iotdb.db.query.udf.example.relational.iotdblocal"; + + private static DataNodeWrapper dataNodeWrapper; + + private static final String[] SETUP_SQLS = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE readings (device_id STRING TAG, temperature DOUBLE FIELD)", + "CREATE TABLE vehicle (device_id STRING TAG, s1 INT32 FIELD)", + "INSERT INTO readings(time, device_id, temperature) VALUES (1000, 'd1', 25.5)", + "INSERT INTO vehicle(time, device_id, s1) VALUES (1, 'd0', 1), (2, 'd0', 2), (3, 'd0', 3)", + "FLUSH", + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); + executeAsRoot(SETUP_SQLS); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @After + public void dropFunctions() { + SQLFunctionUtils.dropAllUDF(); + } + + @Test + public void testScalarFunctionLogApis() throws IOException { + SQLFunctionUtils.createUDF( + "iotdb_local_log_scalar", IOTDB_LOCAL_PKG + ".IoTDBLocalLogFunction"); + dataNodeWrapper.clearLogContent(); + tableResultSetEqualTest( + "SELECT iotdb_local_log_scalar(device_id) AS log_ok FROM readings WHERE device_id = 'd1'", + new String[] {"log_ok"}, + new String[] {"ok,"}, + DATABASE_NAME); + assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_BEFORE_START); + assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_EVALUATE); + assertPhaseLogs(IoTDBLocalLogHelper.SCALAR_BEFORE_DESTROY); + } + + @Test + public void testAggregateFunctionLogApis() throws IOException { + SQLFunctionUtils.createUDF( + "iotdb_local_log_udaf", IOTDB_LOCAL_PKG + ".IoTDBLocalLogAggregateFunction"); + dataNodeWrapper.clearLogContent(); + tableResultSetEqualTest( + "SELECT iotdb_local_log_udaf(s1) AS row_count FROM vehicle", + new String[] {"row_count"}, + new String[] {"3,"}, + DATABASE_NAME); + assertPhaseLogs(IoTDBLocalLogHelper.UDAF_BEFORE_START); + assertPhaseLogs(IoTDBLocalLogHelper.UDAF_ADD_INPUT); + assertPhaseLogs(IoTDBLocalLogHelper.UDAF_OUTPUT_FINAL); + assertPhaseLogs(IoTDBLocalLogHelper.UDAF_BEFORE_DESTROY); + } + + @Test + public void testTableFunctionLogApis() throws IOException { + SQLFunctionUtils.createUDF( + "iotdb_local_log_split", IOTDB_LOCAL_PKG + ".IoTDBLocalLogTableFunction"); + dataNodeWrapper.clearLogContent(); + tableResultSetEqualTest( + "SELECT * FROM iotdb_local_log_split('a,b')", + new String[] {"output"}, + new String[] {"a,", "b,"}, + DATABASE_NAME); + assertPhaseLogs(IoTDBLocalLogHelper.TVF_BEFORE_START); + assertPhaseLogs(IoTDBLocalLogHelper.TVF_PROCESS); + assertPhaseLogs(IoTDBLocalLogHelper.TVF_BEFORE_DESTROY); + } + + private static void assertPhaseLogs(String phase) throws IOException { + assertLogContains(IoTDBLocalLogHelper.infoPlain(phase)); + assertLogContains(IoTDBLocalLogHelper.infoFormat(phase)); + assertLogContains(IoTDBLocalLogHelper.infoCause(phase)); + assertLogContains(IoTDBLocalLogHelper.warnPlain(phase)); + assertLogContains(IoTDBLocalLogHelper.warnFormat(phase)); + assertLogContains(IoTDBLocalLogHelper.warnCause(phase)); + assertLogContains(IoTDBLocalLogHelper.errorPlain(phase)); + assertLogContains(IoTDBLocalLogHelper.errorFormat(phase)); + assertLogContains(IoTDBLocalLogHelper.errorCause(phase)); + assertLogContains(IoTDBLocalLogHelper.CAUSE_MESSAGE); + } + + private static void assertLogContains(String content) throws IOException { + Assert.assertTrue("Expected log to contain: " + content, dataNodeWrapper.logContains(content)); + } + + private static void executeAsRoot(String... sqls) { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.execute(sql); + } + } catch (Exception e) { + fail("executeAsRoot failed: " + e.getMessage()); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java index 34f57b2940b..6174f06bb5b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/udf/SQLFunctionUtils.java @@ -41,7 +41,6 @@ public class SQLFunctionUtils { public static void createUDF(String udfName, String classPath) { try (Connection connection = EnvFactory.getEnv().getTableConnection(); Statement statement = connection.createStatement()) { - // create statement.execute(String.format("create function %s as '%s'", udfName, classPath)); // check try (ResultSet resultSet = statement.executeQuery("show functions")) { diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java index 20d35fe0ab3..7d3c1ab9bc5 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/relational/ColumnTransformerBuilder.java @@ -1958,7 +1958,7 @@ public class ColumnTransformerBuilder private final String outerGlobalQueryId; - private final long outerLocalQueryId; + private final long outerQueryDeadlineMs; @Nullable private final IoTDBLocalFactory ioTDBLocalFactory; @@ -2006,7 +2006,7 @@ public class ColumnTransformerBuilder @Nullable MemoryReservationManager memoryReservationManager, String fragmentInstanceId, String outerGlobalQueryId, - long outerLocalQueryId, + long outerQueryDeadlineMs, @Nullable IoTDBLocalFactory ioTDBLocalFactory) { this.sessionInfo = sessionInfo; this.leafList = leafList; @@ -2021,7 +2021,7 @@ public class ColumnTransformerBuilder this.memoryReservationManager = Optional.ofNullable(memoryReservationManager); this.fragmentInstanceId = fragmentInstanceId; this.outerGlobalQueryId = outerGlobalQueryId; - this.outerLocalQueryId = outerLocalQueryId; + this.outerQueryDeadlineMs = outerQueryDeadlineMs; this.ioTDBLocalFactory = ioTDBLocalFactory; } @@ -2037,8 +2037,8 @@ public class ColumnTransformerBuilder return outerGlobalQueryId; } - public long getOuterLocalQueryId() { - return outerLocalQueryId; + public long getOuterQueryDeadlineMs() { + return outerQueryDeadlineMs; } @Nullable diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java index f12dadbb87a..86786404bc4 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java @@ -318,7 +318,7 @@ public abstract class TableOperatorGenerator< String fragmentInstanceId = getFragmentInstanceId(context); String outerGlobalQueryId = getQueryId(context); - long outerLocalQueryId = getLocalQueryId(context); + long outerQueryDeadlineMs = getOuterQueryDeadlineMs(context); IoTDBLocalFactory ioTDBLocalFactory = getIoTDBLocalFactory(context); final List<TSDataType> filterOutputDataTypes = new ArrayList<>(inputDataTypes); @@ -351,7 +351,7 @@ public abstract class TableOperatorGenerator< context.getMemoryReservationManager(), fragmentInstanceId, outerGlobalQueryId, - outerLocalQueryId, + outerQueryDeadlineMs, ioTDBLocalFactory); return visitor.process(p, filterColumnTransformerContext); @@ -383,7 +383,7 @@ public abstract class TableOperatorGenerator< context.getMemoryReservationManager(), fragmentInstanceId, outerGlobalQueryId, - outerLocalQueryId, + outerQueryDeadlineMs, ioTDBLocalFactory); for (Expression expression : projectExpressions) { @@ -416,7 +416,7 @@ public abstract class TableOperatorGenerator< return null; } - protected long getLocalQueryId(C context) { + protected long getOuterQueryDeadlineMs(C context) { return -1L; } @@ -2520,18 +2520,12 @@ public abstract class TableOperatorGenerator< protected abstract SessionInfo getSessionInfo(C context); protected IoTDBLocal createIoTDBLocal(C context) { - IoTDBLocalFactory factory = getIoTDBLocalFactory(context); - String fragmentInstanceId = getFragmentInstanceId(context); - String outerGlobalQueryId = getQueryId(context); - long outerLocalQueryId = getLocalQueryId(context); - checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF execution"); - checkArgument( - fragmentInstanceId != null, "fragmentInstanceId must not be null for UDF execution"); - checkArgument(outerGlobalQueryId != null, "queryId must not be null for UDF execution"); - checkArgument( - outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for UDF execution"); - return factory.create( - getSessionInfo(context), fragmentInstanceId, outerLocalQueryId, outerGlobalQueryId); + return IoTDBLocalFactory.createIoTDBLocal( + getIoTDBLocalFactory(context), + getSessionInfo(context), + getFragmentInstanceId(context), + getQueryId(context), + getOuterQueryDeadlineMs(context)); } /** Factory for creating {@link IoTDBLocal} inside UDF column transformers. */ @@ -2541,7 +2535,23 @@ public abstract class TableOperatorGenerator< IoTDBLocal create( SessionInfo sessionInfo, String fragmentInstanceId, - long outerLocalQueryId, - String outerGlobalQueryId); + String outerGlobalQueryId, + long outerQueryDeadlineMs); + + static IoTDBLocal createIoTDBLocal( + IoTDBLocalFactory factory, + SessionInfo sessionInfo, + String fragmentInstanceId, + String outerGlobalQueryId, + long outerQueryDeadlineMs) { + checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF execution"); + checkArgument( + fragmentInstanceId != null, "fragmentInstanceId must not be null for UDF execution"); + checkArgument(outerGlobalQueryId != null, "queryId must not be null for UDF execution"); + checkArgument( + outerQueryDeadlineMs > 0, "outerQueryDeadlineMs must be positive for UDF execution"); + return factory.create( + sessionInfo, fragmentInstanceId, outerGlobalQueryId, outerQueryDeadlineMs); + } } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java index 6955dd56663..0a50cc19492 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -37,8 +37,6 @@ import org.apache.tsfile.read.common.type.Type; import java.util.List; import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; - public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer { private final ScalarFunction scalarFunction; @@ -56,27 +54,17 @@ public class UserDefineScalarFunctionTransformer extends MultiColumnTransformer super(returnType, childrenTransformers); this.scalarFunction = scalarFunction; this.parameters = parameters; - this.ioTDBLocal = createIoTDBLocal(context); + this.ioTDBLocal = + IoTDBLocalFactory.createIoTDBLocal( + context.getIoTDBLocalFactory(), + context.getSessionInfo(), + context.getFragmentInstanceId(), + context.getOuterGlobalQueryId(), + context.getOuterQueryDeadlineMs()); this.inputTypes = childrenTransformers.stream().map(ColumnTransformer::getType).collect(Collectors.toList()); } - private static IoTDBLocal createIoTDBLocal(ColumnTransformerBuilder.Context context) { - IoTDBLocalFactory factory = context.getIoTDBLocalFactory(); - String fragmentInstanceId = context.getFragmentInstanceId(); - String outerGlobalQueryId = context.getOuterGlobalQueryId(); - long outerLocalQueryId = context.getOuterLocalQueryId(); - checkArgument(factory != null, "IoTDBLocalFactory must not be null for UDF execution"); - checkArgument( - fragmentInstanceId != null, "fragmentInstanceId must not be null for UDF execution"); - checkArgument( - outerGlobalQueryId != null, "outerGlobalQueryId must not be null for UDF execution"); - checkArgument( - outerLocalQueryId >= 0, "outerLocalQueryId must not be negative for UDF execution"); - return factory.create( - context.getSessionInfo(), fragmentInstanceId, outerLocalQueryId, outerGlobalQueryId); - } - private void initIfNeeded() { if (init) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a41660ef556..595e9da7065 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -152,8 +152,8 @@ public class FragmentInstanceContext extends QueryContext { // session info private SessionInfo sessionInfo; - // Coordinator-local query id of the outer query, used by IoTDBLocal UDF - private long localQueryId = -1L; + // Outer query deadline (startTime + timeout) for IoTDBLocal UDF + private long outerQueryDeadlineMs = -1L; private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap; private DataNodeQueryContext dataNodeQueryContext; @@ -209,7 +209,7 @@ public class FragmentInstanceContext extends QueryContext { IDataRegionForQuery dataRegion, TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap, - long localQueryId, + long outerQueryDeadlineMs, boolean debug, boolean isVerbose) { FragmentInstanceContext instanceContext = @@ -220,7 +220,7 @@ public class FragmentInstanceContext extends QueryContext { dataRegion, globalTimePredicate, dataNodeQueryContextMap, - localQueryId, + outerQueryDeadlineMs, debug, isVerbose); instanceContext.initialize(); @@ -276,7 +276,7 @@ public class FragmentInstanceContext extends QueryContext { IDataRegionForQuery dataRegion, TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap, - long localQueryId, + long outerQueryDeadlineMs, boolean debug, boolean verbose) { super(debug, verbose); @@ -284,7 +284,7 @@ public class FragmentInstanceContext extends QueryContext { this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; - this.localQueryId = localQueryId; + this.outerQueryDeadlineMs = outerQueryDeadlineMs; this.dataRegion = dataRegion; this.globalTimeFilter = globalTimePredicate == null @@ -574,8 +574,8 @@ public class FragmentInstanceContext extends QueryContext { return sessionInfo; } - public long getLocalQueryId() { - return localQueryId; + public long getOuterQueryDeadlineMs() { + return outerQueryDeadlineMs; } public Optional<Throwable> getFailureCause() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 75e447de586..18042372b80 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -167,7 +167,7 @@ public class FragmentInstanceManager { dataRegion, instance.getGlobalTimePredicate(), dataNodeQueryContextMap, - instance.getLocalQueryId(), + instance.getOuterQueryDeadlineMs(), instance.isDebug(), instance.isVerbose()); }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index aa748cde854..7d628e78d79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -2116,8 +2116,8 @@ public class DataNodeTableOperatorGenerator } @Override - protected long getLocalQueryId(LocalExecutionPlanContext context) { - return context.getLocalQueryId(); + protected long getOuterQueryDeadlineMs(LocalExecutionPlanContext context) { + return context.getOuterQueryDeadlineMs(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index c9172bb44ca..4d78236f38d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -198,8 +198,8 @@ public class LocalExecutionPlanContext implements ITableOperatorGeneratorContext return driverContext.getFragmentInstanceContext().getId(); } - public long getLocalQueryId() { - return driverContext.getFragmentInstanceContext().getLocalQueryId(); + public long getOuterQueryDeadlineMs() { + return driverContext.getFragmentInstanceContext().getOuterQueryDeadlineMs(); } public List<PipelineDriverFactory> getPipelineDriverFactories() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index dec7c1917b6..a39aaff6fb4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -148,7 +148,8 @@ public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlann queryContext.isDebug(), fragment.isRoot(), queryContext.isVerbose()); - fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId()); + fragmentInstance.setOuterQueryDeadlineMs( + queryContext.getStartTime() + queryContext.getTimeOut()); selectExecutorAndHost( fragment, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 4cc98ee7d06..ff03aa006d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -83,8 +83,8 @@ public class FragmentInstance implements IConsensusRequest { private final boolean debug; private final boolean verbose; - // Coordinator-local query id, used to look up IQueryExecution on DataNode - private long localQueryId = -1L; + // Outer query deadline (startTime + timeout) for IoTDBLocal on remote DataNodes. + private long outerQueryDeadlineMs = -1L; // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. @@ -272,9 +272,7 @@ public class FragmentInstance implements IConsensusRequest { hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer); fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer)); - if (buffer.hasRemaining()) { - fragmentInstance.setLocalQueryId(ReadWriteIOUtils.readLong(buffer)); - } + fragmentInstance.setOuterQueryDeadlineMs(ReadWriteIOUtils.readLong(buffer)); return fragmentInstance; } @@ -302,7 +300,7 @@ public class FragmentInstance implements IConsensusRequest { } ReadWriteIOUtils.write(isExplainAnalyze, outputStream); ReadWriteIOUtils.write(isHighestPriority, outputStream); - ReadWriteIOUtils.write(localQueryId, outputStream); + ReadWriteIOUtils.write(outerQueryDeadlineMs, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { LOGGER.error( @@ -349,12 +347,12 @@ public class FragmentInstance implements IConsensusRequest { return sessionInfo; } - public long getLocalQueryId() { - return localQueryId; + public long getOuterQueryDeadlineMs() { + return outerQueryDeadlineMs; } - public void setLocalQueryId(long localQueryId) { - this.localQueryId = localQueryId; + public void setOuterQueryDeadlineMs(long outerQueryDeadlineMs) { + this.outerQueryDeadlineMs = outerQueryDeadlineMs; } public boolean isExplainAnalyze() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index e9695ec73f6..586bb6d61dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -186,7 +186,8 @@ public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlan queryContext.isDebug(), fragment.isRoot(), queryContext.isVerbose()); - fragmentInstance.setLocalQueryId(queryContext.getLocalQueryId()); + fragmentInstance.setOuterQueryDeadlineMs( + queryContext.getStartTime() + queryContext.getTimeOut()); selectExecutorAndHost( fragment, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java index 940288a83ac..630360f4858 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/udf/IoTDBLocalImpl.java @@ -24,8 +24,6 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.plan.Coordinator; -import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.udf.api.IoTDBLocal; import org.apache.iotdb.udf.api.UDFResultSet; import org.apache.iotdb.udf.api.exception.UDFException; @@ -42,23 +40,22 @@ public class IoTDBLocalImpl implements IoTDBLocal { public static final IoTDBLocalFactory FACTORY = IoTDBLocalImpl::new; private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLocalImpl.class); - private static final Coordinator COORDINATOR = Coordinator.getInstance(); private final SessionInfo sessionInfo; private final String fragmentInstanceId; - private final long outerLocalQueryId; private final QueryId outerGlobalQueryId; + private final long outerQueryDeadlineMs; private final List<UDFResultSetImpl> openResultSets = new ArrayList<>(); public IoTDBLocalImpl( SessionInfo sessionInfo, String fragmentInstanceId, - long outerLocalQueryId, - String outerGlobalQueryId) { + String outerGlobalQueryId, + long outerQueryDeadlineMs) { this.sessionInfo = sessionInfo; this.fragmentInstanceId = fragmentInstanceId; - this.outerLocalQueryId = outerLocalQueryId; this.outerGlobalQueryId = QueryId.valueOf(outerGlobalQueryId); + this.outerQueryDeadlineMs = outerQueryDeadlineMs; } @Override @@ -97,12 +94,10 @@ public class IoTDBLocalImpl implements IoTDBLocal { } private long computeRemainingTimeoutMs() { - IQueryExecution execution = COORDINATOR.getQueryExecution(outerLocalQueryId); - if (execution == null) { + if (outerQueryDeadlineMs <= 0) { return 0; } - return execution.getTimeout() - - (System.currentTimeMillis() - execution.getStartExecutionTime()); + return outerQueryDeadlineMs - System.currentTimeMillis(); } @Override
