This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5c1eef5f060 [feature](tvf) support max_filter_ratio  (#35431) (#36911)
5c1eef5f060 is described below

commit 5c1eef5f060409371aa1befed92f0c8af54e27bf
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jun 27 20:58:53 2024 +0800

    [feature](tvf) support max_filter_ratio  (#35431) (#36911)
    
    bp #35431
    
    Co-authored-by: 苏小刚 <[email protected]>
---
 .../commands/insert/AbstractInsertExecutor.java    | 10 ++++-
 .../apache/doris/planner/GroupCommitPlanner.java   |  3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 12 ++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 12 +++++-
 .../doris/statistics/util/StatisticsUtil.java      |  1 +
 .../data/external_table_p0/tvf/test_hdfs_tvf.out   |  1 -
 .../external_table_p0/tvf/test_hdfs_tvf.groovy     | 43 ++++++++++++++++++++--
 7 files changed, 73 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 64bf625fef5..cb9073c77ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -162,13 +162,19 @@ public abstract class AbstractInsertExecutor {
         }
     }
 
-    private void checkStrictMode() throws Exception {
+    private void checkStrictModeAndFilterRatio() throws Exception {
         // if in strict mode, insert will fail if there are filtered rows
         if (ctx.getSessionVariable().getEnableInsertStrict()) {
             if (filteredRows > 0) {
                 ErrorReport.reportDdlException("Insert has filtered data in 
strict mode",
                         ErrorCode.ERR_FAILED_WHEN_INSERT);
             }
+        } else {
+            if (filteredRows > 
ctx.getSessionVariable().getInsertMaxFilterRatio() * (filteredRows + 
loadedRows)) {
+                ErrorReport.reportDdlException("Insert has too many filtered 
data %d/%d insert_max_filter_ratio is %f",
+                        ErrorCode.ERR_FAILED_WHEN_INSERT, filteredRows, 
filteredRows + loadedRows,
+                        ctx.getSessionVariable().getInsertMaxFilterRatio());
+            }
         }
     }
 
@@ -179,7 +185,7 @@ public abstract class AbstractInsertExecutor {
         beforeExec();
         try {
             execImpl(executor, jobId);
-            checkStrictMode();
+            checkStrictModeAndFilterRatio();
             onComplete();
         } catch (Throwable t) {
             onFail(t);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 9ff06b29076..253d12ad864 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -98,7 +98,8 @@ public class GroupCommitPlanner {
         }
         streamLoadPutRequest
                 .setDb(db.getFullName())
-                
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict 
? 0 : 1)
+                
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict 
? 0
+                        : 
ConnectContext.get().getSessionVariable().insertMaxFilterRatio)
                 .setTbl(table.getName())
                 
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 534b4f3386a..7c2257f6719 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String PROFILE_LEVEL = "profile_level";
     public static final String MAX_INSTANCE_NUM = "max_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
+    public static final String INSERT_MAX_FILTER_RATIO = 
"insert_max_filter_ratio";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = 
"enable_short_circuit_point_query";
     public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = 
"enable_exchange_node_parallel_merge";
@@ -848,6 +849,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
     public boolean enableInsertStrict = true;
 
+    @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO, needForward = true)
+    public double insertMaxFilterRatio = 1.0;
+
     @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
     public boolean enableOdbcTransaction = false;
 
@@ -2463,6 +2467,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableInsertStrict = enableInsertStrict;
     }
 
+    public double getInsertMaxFilterRatio() {
+        return insertMaxFilterRatio;
+    }
+
+    public void setInsertMaxFilterRatio(double maxFilterRatio) {
+        this.insertMaxFilterRatio = maxFilterRatio;
+    }
+
     public boolean isEnableSqlCache() {
         return enableSqlCache;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6edceff76d3..1e63a750748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1930,7 +1930,8 @@ public class StmtExecutor {
             context.getTxnEntry()
                     .setTxnConf(new 
TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
                             
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
-                            
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : 
1.0));
+                            
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
+                                    : 
context.getSessionVariable().getInsertMaxFilterRatio()));
             StringBuilder sb = new StringBuilder();
             
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
                     .append(TransactionStatus.PREPARE.name());
@@ -2250,6 +2251,15 @@ public class StmtExecutor {
                                 "Insert has filtered data in strict mode, 
tracking_url=" + coord.getTrackingUrl());
                         return;
                     }
+                } else {
+                    if (filteredRows > 
context.getSessionVariable().getInsertMaxFilterRatio()
+                            * (filteredRows + loadedRows)) {
+                        
context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+                                String.format("Insert has too many filtered 
data %d/%d insert_max_filter_ratio is %f",
+                                        filteredRows, filteredRows + 
loadedRows,
+                                        
context.getSessionVariable().getInsertMaxFilterRatio()));
+                        return;
+                    }
                 }
 
                 if (tblType != TableType.OLAP && tblType != 
TableType.MATERIALIZED_VIEW) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 35037bb7e27..2be007b1419 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -187,6 +187,7 @@ public class StatisticsUtil {
         
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
         sessionVariable.cpuResourceLimit = 
Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
+        sessionVariable.setInsertMaxFilterRatio(1.0);
         sessionVariable.enablePageCache = false;
         sessionVariable.enableProfile = Config.enable_profile_when_analyze;
         sessionVariable.parallelExecInstanceNum = 
Config.statistics_sql_parallel_exec_instance_num;
diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out 
b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
index 98a1324abd7..91e2ec2d330 100644
--- a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
+++ b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
@@ -346,7 +346,6 @@
 2      shanghai        2345672
 3      hangzhou        2345673
 4      shenzhen        2345674
-5      guangzhou       2345675
 
 -- !desc --
 s_suppkey      INT     Yes     false   \N      NONE
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy 
b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index aa1fc8712bd..abdfd871a1f 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -206,13 +206,13 @@ 
suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
                         "strip_outer_array" = "false",
                         "read_json_by_line" = "true") order by id; """
 
-            // test insert into select
+            // test insert into select in strict mode or when 
insert_max_filter_ratio is set
             def testTable = "test_hdfs_tvf"
             sql "DROP TABLE IF EXISTS ${testTable}"
             def result1 = sql """ CREATE TABLE IF NOT EXISTS ${testTable}
                 (
                     id int,
-                    city varchar(50),
+                    city varchar(8),
                     code int
                 )
                 COMMENT "test hdfs tvf table"
@@ -225,6 +225,9 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
 
             uri = "${defaultFS}" + 
"/user/doris/preinstalled_data/json_format_test/nest_json.json"
             format = "json"
+
+            sql "set enable_insert_strict=false;"
+            sql "set insert_max_filter_ratio=0.2;"
             def result2 = sql """ insert into ${testTable}(id,city,code)
                     select cast (id as INT) as id, city, cast (code as INT) as 
code
                     from HDFS(
@@ -234,9 +237,41 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") 
{
                         "strip_outer_array" = "false",
                         "read_json_by_line" = "true",
                         "json_root" = "\$.item") """
-            
             sql "sync"
-            assertTrue(result2[0][0] == 5, "Insert should update 12 rows")
+            assertTrue(result2[0][0] == 4, "Insert should update 4 rows")
+
+            try{
+                sql "set insert_max_filter_ratio=0.1;"
+                def result3 = sql """ insert into ${testTable}(id,city,code)
+                        select cast (id as INT) as id, city, cast (code as 
INT) as code
+                        from HDFS(
+                            "uri" = "${uri}",
+                            "hadoop.username" = "${hdfsUserName}",
+                            "format" = "${format}",
+                            "strip_outer_array" = "false",
+                            "read_json_by_line" = "true",
+                            "json_root" = "\$.item") """
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains('Insert has too many 
filtered data 1/5 insert_max_filter_ratio is 0.100000.'))
+            }
+
+            try{
+                sql " set enable_insert_strict=true;"
+                def result4 = sql """ insert into ${testTable}(id,city,code)
+                        select cast (id as INT) as id, city, cast (code as 
INT) as code
+                        from HDFS(
+                            "uri" = "${uri}",
+                            "hadoop.username" = "${hdfsUserName}",
+                            "format" = "${format}",
+                            "strip_outer_array" = "false",
+                            "read_json_by_line" = "true",
+                            "json_root" = "\$.item") """
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains('Insert has filtered data 
in strict mode.'))
+            }
+
             qt_insert """ select * from test_hdfs_tvf order by id; """
 
             // test desc function


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to