This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5c1eef5f060 [feature](tvf) support max_filter_ratio (#35431) (#36911)
5c1eef5f060 is described below
commit 5c1eef5f060409371aa1befed92f0c8af54e27bf
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jun 27 20:58:53 2024 +0800
[feature](tvf) support max_filter_ratio (#35431) (#36911)
bp #35431
Co-authored-by: 苏小刚 <[email protected]>
---
.../commands/insert/AbstractInsertExecutor.java | 10 ++++-
.../apache/doris/planner/GroupCommitPlanner.java | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 12 ++++++
.../java/org/apache/doris/qe/StmtExecutor.java | 12 +++++-
.../doris/statistics/util/StatisticsUtil.java | 1 +
.../data/external_table_p0/tvf/test_hdfs_tvf.out | 1 -
.../external_table_p0/tvf/test_hdfs_tvf.groovy | 43 ++++++++++++++++++++--
7 files changed, 73 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 64bf625fef5..cb9073c77ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -162,13 +162,19 @@ public abstract class AbstractInsertExecutor {
}
}
- private void checkStrictMode() throws Exception {
+ private void checkStrictModeAndFilterRatio() throws Exception {
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ErrorReport.reportDdlException("Insert has filtered data in
strict mode",
ErrorCode.ERR_FAILED_WHEN_INSERT);
}
+ } else {
+ if (filteredRows >
ctx.getSessionVariable().getInsertMaxFilterRatio() * (filteredRows +
loadedRows)) {
+ ErrorReport.reportDdlException("Insert has too many filtered
data %d/%d insert_max_filter_ratio is %f",
+ ErrorCode.ERR_FAILED_WHEN_INSERT, filteredRows,
filteredRows + loadedRows,
+ ctx.getSessionVariable().getInsertMaxFilterRatio());
+ }
}
}
@@ -179,7 +185,7 @@ public abstract class AbstractInsertExecutor {
beforeExec();
try {
execImpl(executor, jobId);
- checkStrictMode();
+ checkStrictModeAndFilterRatio();
onComplete();
} catch (Throwable t) {
onFail(t);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 9ff06b29076..253d12ad864 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -98,7 +98,8 @@ public class GroupCommitPlanner {
}
streamLoadPutRequest
.setDb(db.getFullName())
-
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict
? 0 : 1)
+
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict
? 0
+ :
ConnectContext.get().getSessionVariable().insertMaxFilterRatio)
.setTbl(table.getName())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 534b4f3386a..7c2257f6719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String PROFILE_LEVEL = "profile_level";
public static final String MAX_INSTANCE_NUM = "max_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
+ public static final String INSERT_MAX_FILTER_RATIO =
"insert_max_filter_ratio";
public static final String ENABLE_SPILLING = "enable_spilling";
public static final String ENABLE_SHORT_CIRCUIT_QUERY =
"enable_short_circuit_point_query";
public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE =
"enable_exchange_node_parallel_merge";
@@ -848,6 +849,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
public boolean enableInsertStrict = true;
+ @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO, needForward = true)
+ public double insertMaxFilterRatio = 1.0;
+
@VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
public boolean enableOdbcTransaction = false;
@@ -2463,6 +2467,14 @@ public class SessionVariable implements Serializable,
Writable {
this.enableInsertStrict = enableInsertStrict;
}
+ public double getInsertMaxFilterRatio() {
+ return insertMaxFilterRatio;
+ }
+
+ public void setInsertMaxFilterRatio(double maxFilterRatio) {
+ this.insertMaxFilterRatio = maxFilterRatio;
+ }
+
public boolean isEnableSqlCache() {
return enableSqlCache;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6edceff76d3..1e63a750748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1930,7 +1930,8 @@ public class StmtExecutor {
context.getTxnEntry()
.setTxnConf(new
TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
-
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 :
1.0));
+
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
+ :
context.getSessionVariable().getInsertMaxFilterRatio()));
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("',
'status':'")
.append(TransactionStatus.PREPARE.name());
@@ -2250,6 +2251,15 @@ public class StmtExecutor {
"Insert has filtered data in strict mode,
tracking_url=" + coord.getTrackingUrl());
return;
}
+ } else {
+ if (filteredRows >
context.getSessionVariable().getInsertMaxFilterRatio()
+ * (filteredRows + loadedRows)) {
+
context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+ String.format("Insert has too many filtered
data %d/%d insert_max_filter_ratio is %f",
+ filteredRows, filteredRows +
loadedRows,
+
context.getSessionVariable().getInsertMaxFilterRatio()));
+ return;
+ }
}
if (tblType != TableType.OLAP && tblType !=
TableType.MATERIALIZED_VIEW) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 35037bb7e27..2be007b1419 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -187,6 +187,7 @@ public class StatisticsUtil {
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
sessionVariable.cpuResourceLimit =
Config.cpu_resource_limit_per_analyze_task;
sessionVariable.setEnableInsertStrict(true);
+ sessionVariable.setInsertMaxFilterRatio(1.0);
sessionVariable.enablePageCache = false;
sessionVariable.enableProfile = Config.enable_profile_when_analyze;
sessionVariable.parallelExecInstanceNum =
Config.statistics_sql_parallel_exec_instance_num;
diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
index 98a1324abd7..91e2ec2d330 100644
--- a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
+++ b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
@@ -346,7 +346,6 @@
2 shanghai 2345672
3 hangzhou 2345673
4 shenzhen 2345674
-5 guangzhou 2345675
-- !desc --
s_suppkey INT Yes false \N NONE
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index aa1fc8712bd..abdfd871a1f 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -206,13 +206,13 @@
suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
"strip_outer_array" = "false",
"read_json_by_line" = "true") order by id; """
- // test insert into select
+ // test insert into select in strict mode or
insert_max_filter_ratio is set
def testTable = "test_hdfs_tvf"
sql "DROP TABLE IF EXISTS ${testTable}"
def result1 = sql """ CREATE TABLE IF NOT EXISTS ${testTable}
(
id int,
- city varchar(50),
+ city varchar(8),
code int
)
COMMENT "test hdfs tvf table"
@@ -225,6 +225,9 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
uri = "${defaultFS}" +
"/user/doris/preinstalled_data/json_format_test/nest_json.json"
format = "json"
+
+ sql "set enable_insert_strict=false;"
+ sql "set insert_max_filter_ratio=0.2;"
def result2 = sql """ insert into ${testTable}(id,city,code)
select cast (id as INT) as id, city, cast (code as INT) as
code
from HDFS(
@@ -234,9 +237,41 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker")
{
"strip_outer_array" = "false",
"read_json_by_line" = "true",
"json_root" = "\$.item") """
-
sql "sync"
- assertTrue(result2[0][0] == 5, "Insert should update 12 rows")
+ assertTrue(result2[0][0] == 4, "Insert should update 4 rows")
+
+ try{
+ sql "set insert_max_filter_ratio=0.1;"
+ def result3 = sql """ insert into ${testTable}(id,city,code)
+ select cast (id as INT) as id, city, cast (code as
INT) as code
+ from HDFS(
+ "uri" = "${uri}",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}",
+ "strip_outer_array" = "false",
+ "read_json_by_line" = "true",
+ "json_root" = "\$.item") """
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has too many
filtered data 1/5 insert_max_filter_ratio is 0.100000'))
+ }
+
+ try{
+ sql " set enable_insert_strict=true;"
+ def result4 = sql """ insert into ${testTable}(id,city,code)
+ select cast (id as INT) as id, city, cast (code as
INT) as code
+ from HDFS(
+ "uri" = "${uri}",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}",
+ "strip_outer_array" = "false",
+ "read_json_by_line" = "true",
+ "json_root" = "\$.item") """
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has filtered data
in strict mode'))
+ }
+
qt_insert """ select * from test_hdfs_tvf order by id; """
// test desc function
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]