This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1971 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 65e6715bbfedfd4ab492c50181d65ae6f471695a Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Dec 2 12:10:21 2021 +0800 udf_min_fragment_number_to_trigger_parallel_execution --- .../src/assembly/resources/conf/iotdb-engine.properties | 5 +++++ .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 +++++++++++++++++ .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 13 +++++++++++++ .../iotdb/db/query/udf/core/layer/LayerBuilder.java | 8 ++++++-- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index aad2f9d..c736bd1 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -828,6 +828,11 @@ timestamp_precision=ms # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # udf_root_dir=ext/udf +# UDTFPlan can be split into several fragment plans, when the number of the fragment plans is over +# udf_min_fragment_number_to_trigger_parallel_execution, the executor would trigger a parallel +# execution. The property should be an integer and larger than 1. +# udf_min_fragment_number_to_trigger_parallel_execution=2 + #################### ### Trigger Configuration #################### diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 0ef9fbe..6038fb1 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -740,6 +740,13 @@ public class IoTDBConfig { private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB); + /** + * UDTFPlan can be split into several fragment plans, when the number of the fragment plans is + * over udfMinFragmentNumberToTriggerParallelExecution, the executor would trigger a parallel + * execution. + */ + private int udfMinFragmentNumberToTriggerParallelExecution = 2; + // time in nanosecond precision when starting up private long startUpNanosecond = System.nanoTime(); @@ -815,6 +822,16 @@ public class IoTDBConfig { this.udfInitialByteArrayLengthForMemoryControl = udfInitialByteArrayLengthForMemoryControl; } + public int getUdfMinFragmentNumberToTriggerParallelExecution() { + return udfMinFragmentNumberToTriggerParallelExecution; + } + + public void setUdfMinFragmentNumberToTriggerParallelExecution( + int udfMinFragmentNumberToTriggerParallelExecution) { + this.udfMinFragmentNumberToTriggerParallelExecution = + udfMinFragmentNumberToTriggerParallelExecution; + } + public int getConcurrentWritingTimePartition() { return concurrentWritingTimePartition; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 84f8051..a42572f 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1200,6 +1200,13 @@ public class IoTDBDescriptor { properties.getProperty( "select_into_insert_tablet_plan_row_limit", String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit())))); + + // udf + conf.setUdfMinFragmentNumberToTriggerParallelExecution( + Integer.parseInt( + properties.getProperty( + "udf_min_fragment_number_to_trigger_parallel_execution", + String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution())))); } catch (Exception e) { throw new QueryProcessException(String.format("Fail to reload configuration because %s", e)); } @@ -1328,6 +1335,12 @@ public class IoTDBDescriptor { + readerTransformerCollectorMemoryProportion); } } + + conf.setUdfMinFragmentNumberToTriggerParallelExecution( + Integer.parseInt( + properties.getProperty( + "udf_min_fragment_number_to_trigger_parallel_execution", + String.valueOf(conf.getUdfMinFragmentNumberToTriggerParallelExecution())))); } private void loadTriggerProps(Properties properties) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java index a61d872..05f1b8f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.query.udf.core.layer; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet; @@ -38,6 +40,8 @@ import java.util.Map; public class LayerBuilder { + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final long queryId; private final UDTFPlan udtfPlan; private final RawQueryInputLayer rawTimeSeriesInputLayer; @@ -137,9 +141,9 @@ public class LayerBuilder { return resultColumnPointReaders; } - /** TODO: make it configurable */ public boolean canBeSplitIntoFragments() { - return 2 <= fragmentDataSetIndexToLayerPointReaders.size(); + return Math.min(2, CONFIG.getUdfMinFragmentNumberToTriggerParallelExecution()) + <= fragmentDataSetIndexToLayerPointReaders.size(); } public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)
