This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a6a84b8ecc3 [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819)
a6a84b8ecc3 is described below
commit a6a84b8ecc324834f79f3a61e5e587366890a06e
Author: gnehil <[email protected]>
AuthorDate: Wed Jun 26 20:12:40 2024 +0800
[improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819)
Cherry-picked from https://github.com/apache/doris/pull/35923
---
.../java/org/apache/doris/catalog/FunctionSet.java | 1 +
.../org/apache/doris/planner/FileLoadScanNode.java | 4 ++-
.../data/load_p0/http_stream/test_http_stream.out | 12 +++++++
.../stream_load/test_stream_load_hll_type.csv | 10 ++++++
.../load_p0/stream_load/test_stream_load_new.out | 12 +++++++
.../load_p0/http_stream/test_http_stream.groovy | 41 +++++++++++++++++++++
.../stream_load/test_stream_load_new.groovy | 42 ++++++++++++++++++++++
7 files changed, 121 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index b0d4c654531..2db943993dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -178,6 +178,7 @@ public class FunctionSet<T> {
public static final String HLL_UNION_AGG = "hll_union_agg";
public static final String HLL_RAW_AGG = "hll_raw_agg";
public static final String HLL_CARDINALITY = "hll_cardinality";
+ public static final String HLL_FROM_BASE64 = "hll_from_base64";
public static final String TO_BITMAP = "to_bitmap";
public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index ca0324a51d0..0d674a70517 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -280,9 +280,11 @@ public class FileLoadScanNode extends FileScanNode {
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
if
(!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) &&
!fn.getFnName()
- .getFunction().equalsIgnoreCase("hll_empty")) {
+ .getFunction().equalsIgnoreCase("hll_empty")
+ &&
!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
throw new AnalysisException("HLL column must use " +
FunctionSet.HLL_HASH + " function, like "
+ destSlotDesc.getColumn().getName() + "=" +
FunctionSet.HLL_HASH + "(xxx) or "
+ + destSlotDesc.getColumn().getName() + "=" +
FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
+ destSlotDesc.getColumn().getName() + "=hll_empty()");
}
expr.setType(org.apache.doris.catalog.Type.HLL);
diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out
b/regression-test/data/load_p0/http_stream/test_http_stream.out
index 7ce24eea095..2475ed24961 100644
--- a/regression-test/data/load_p0/http_stream/test_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_http_stream.out
@@ -620,3 +620,15 @@
1 test
2 test
+-- !sql19 --
+buag 1 1
+huang 1 1
+jfin 1 1
+koga 1 1
+kon 1 1
+lofn 1 1
+lojn 1 1
+nfubg 1 1
+nhga 1 1
+nijg 1 1
+
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
new file mode 100644
index 00000000000..0b1d798782c
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
@@ -0,0 +1,10 @@
+1001,koga,AQEMYSmSmfh+mA==
+1002,nijg,AQGs1RXTaA+hkQ==
+1003,lojn,AQFyJr4rwn+S0A==
+1004,lofn,AQFvE0bU6Pc9uw==
+1005,jfin,AQEmxbO3VGItCA==
+1006,kon,AQEm5d0Gw4uvZw==
+1007,nhga,AQHOpocenFnBwQ==
+1008,nfubg,AQFzYsFz+NIgUg==
+1009,huang,AQH2slI7qAUmYA==
+1010,buag,AQGBXZ3xnU79YA==
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new.out
b/regression-test/data/load_p0/stream_load/test_stream_load_new.out
index 52440d98436..f251042a9df 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load_new.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_new.out
@@ -124,3 +124,15 @@
10009 jj
10010 kk
+-- !sql13 --
+buag 1 1
+huang 1 1
+jfin 1 1
+koga 1 1
+kon 1 1
+lofn 1 1
+lojn 1 1
+nfubg 1 1
+nhga 1 1
+nijg 1 1
+
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 781732988e5..5411224c200 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -854,5 +854,46 @@ suite("test_http_stream", "p0") {
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName18}"
}
+
+ // test load hll type
+ def tableName19 = "test_http_stream_hll_type"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName19} (
+ type_id int,
+ type_name varchar(10),
+ pv_hash hll hll_union not null,
+ pv_base64 hll hll_union not null
+ )
+ AGGREGATE KEY(type_id,type_name)
+ DISTRIBUTED BY HASH(type_id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName19} select
c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv",
"column_separator"=",")
+ """
+ time 10000
+ file '../stream_load/test_stream_load_hll_type.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("http_stream result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ qt_sql19 "select type_name, hll_union_agg(pv_hash),
hll_union_agg(pv_base64) from ${tableName19} group by type_name order by
type_name"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName19}"
+ }
+
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
index 7df57ebbd16..48c3e5f9654 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
@@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") {
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName12}"
}
+
+ // 13. test stream load hll type
+ def tableName13 = "test_stream_load_hll_type"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName13} (
+ type_id int,
+ type_name varchar(10),
+ pv_hash hll hll_union not null,
+ pv_base64 hll hll_union not null
+ )
+ AGGREGATE KEY(type_id,type_name)
+ DISTRIBUTED BY HASH(type_id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'columns',
'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)'
+ table "${tableName13}"
+ time 10000
+ file 'test_stream_load_hll_type.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(10, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql """ sync; """
+ qt_sql13 "select type_name, hll_union_agg(pv_hash),
hll_union_agg(pv_base64) from ${tableName13} group by type_name order by
type_name"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName13}"
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org