This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 452318a9fc9 [Enhancement](streamload) stream tvf support user
specified label (#24219)
452318a9fc9 is described below
commit 452318a9fc98be83ce47f2d3d86ab876753c1fe7
Author: zzzzzzzs <[email protected]>
AuthorDate: Wed Sep 27 12:09:35 2023 +0800
[Enhancement](streamload) stream tvf support user specified label (#24219)
The http_stream TVF now supports a user-specified label, given with `WITH LABEL <label>` in the insert statement.
example:
curl -v --location-trusted -u root: -H "sql: insert into test.t1 WITH LABEL
label1 select c1,c2 from http_stream(\"format\" = \"CSV\", \"column_separator\"
= \",\")" -T example.csv http://127.0.0.1:8030/api/_http_stream
The response returned is:
{
"TxnId": 2064,
"Label": "label1",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 27,
"LoadTimeMs": 152,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 83,
"ReadDataTimeMs": 92,
"WriteDataTimeMs": 41,
"CommitAndPublishTimeMs": 24
}
---
be/src/http/action/http_stream.cpp | 5 +-
.../import/import-way/stream-load-manual.md | 6 +
.../apache/doris/service/FrontendServiceImpl.java | 1 +
.../data/load_p0/http_stream/test_http_stream.out | 13 ++
.../load_p0/http_stream/test_http_stream.groovy | 252 +++------------------
5 files changed, 53 insertions(+), 224 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index 62153c8f8ff..1b59bf0b6bb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -160,10 +160,6 @@ int HttpStreamAction::on_header(HttpRequest* req) {
ctx->load_type = TLoadType::MANUL_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
- ctx->label = req->header(HTTP_LABEL_KEY);
- if (ctx->label.empty()) {
- ctx->label = generate_uuid_string();
- }
ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
LOG(INFO) << "new income streaming load request." << ctx->brief()
@@ -315,6 +311,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->db = ctx->put_result.params.db_name;
ctx->table = ctx->put_result.params.table_name;
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
+ ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (ctx->group_commit) {
diff --git
a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index 1bb2e6b85d8..946c057198f 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -303,6 +303,12 @@ curl --location-trusted -u user:passwd [-H "sql:
${load_sql}"...] -T data.file -
curl --location-trusted -u root: -T test.csv -H "sql:insert into
demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from
http_stream("format" = "CSV", "column_separator" = "," ) where age >= 30"
http://127.0.0.1:28030/api/_http_stream
```
+#### 相关参数
+
+1. label: 用户可以通过指定Label的方式来导入数据
+```
+curl -v --location-trusted -u root: -H "sql: insert into test.t1(c1, c2) WITH
LABEL label1 select c1,c2 from http_stream(\"format\" = \"CSV\",
\"column_separator\" = \",\")" -T example.csv
http://127.0.0.1:8030/api/_http_stream
+```
### 返回结果
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b06cfc91d14..d34913bba5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2079,6 +2079,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.getParams().setTableName(parsedStmt.getTbl());
// The txn_id here is obtained from the NativeInsertStmt
result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
+ result.getParams().setImportLabel(parsedStmt.getLabel());
if (parsedStmt.isGroupCommitTvf) {
result.getParams().params.setGroupCommit(true);
}
diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out
b/regression-test/data/load_p0/http_stream/test_http_stream.out
index 4471c47db0a..5b6d8411bf0 100644
--- a/regression-test/data/load_p0/http_stream/test_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_http_stream.out
@@ -569,3 +569,16 @@
龚强 948
龚静 641
+-- !sql14 --
+10000 aa
+10001 bb
+10002 cc
+10003 dd
+10004 ee
+10005 ff
+10006 gg
+10007 hh
+10008 ii
+10009 jj
+10010 kk
+
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 5d991c40c0d..bbfc2c30f00 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -600,231 +600,43 @@ suite("test_http_stream", "p0") {
try_sql "DROP TABLE IF EXISTS ${tableName13}"
}
- // 14. test parquet orc case
- def tableName14 = "test_parquet_orc_case"
+ // 14. test label
+ def tableName14 = "test_http_stream_label"
+ def label = UUID.randomUUID().toString().replaceAll("-", "")
+
try {
- sql """ DROP TABLE IF EXISTS ${tableName14} """
sql """
- CREATE TABLE IF NOT EXISTS ${tableName14} (
- `WatchId` char(128),
- `JavaEnable` smallint,
- `Title` string,
- `GoodEvent` smallint,
- `EventTime` datetime,
- `EventDate` date,
- `CounterId` bigint,
- `ClientIp` bigint,
- `ClientIp6` char(50),
- `RegionId` bigint,
- `UserId` string,
- `CounterClass` tinyint,
- `Os` smallint,
- `UserAgent` smallint,
- `Url` string,
- `Referer` string,
- `Urldomain` string,
- `RefererDomain` string,
- `Refresh` smallint,
- `IsRobot` smallint,
- `RefererCategories` string,
- `UrlCategories` string,
- `UrlRegions` string,
- `RefererRegions` string,
- `ResolutionWidth` int,
- `ResolutionHeight` int,
- `ResolutionDepth` smallint,
- `FlashMajor` smallint,
- `FlashMinor` smallint,
- `FlashMinor2` string,
- `NetMajor` smallint,
- `NetMinor` smallint,
- `UserAgentMajor` int,
- `UserAgentMinor` char(4),
- `CookieEnable` smallint,
- `JavascriptEnable` smallint,
- `IsMobile` smallint,
- `MobilePhone` smallint,
- `MobilePhoneModel` string,
- `Params` string,
- `IpNetworkId` bigint,
- `TraficSourceId` tinyint,
- `SearchEngineId` int,
- `SearchPhrase` string,
- `AdvEngineId` smallint,
- `IsArtifical` smallint,
- `WindowClientWidth` int,
- `WindowClientHeight` int,
- `ClientTimeZone` smallint,
- `ClientEventTime` datetime,
- `SilverLightVersion1` smallint,
- `SilverlightVersion2` smallint,
- `SilverlightVersion3` bigint,
- `SilverlightVersion4` int,
- `PageCharset` string,
- `CodeVersion` bigint,
- `IsLink` smallint,
- `IsDownload` smallint,
- `IsNotBounce` smallint,
- `FUniqId` string,
- `Hid` bigint,
- `IsOldCounter` smallint,
- `IsEvent` smallint,
- `IsParameter` smallint,
- `DontCountHits` smallint,
- `WithHash` smallint,
- `HitColor` char(2),
- `UtcEventTime` datetime,
- `Age` smallint,
- `Sex` smallint,
- `Income` smallint,
- `Interests` int,
- `Robotness` smallint,
- `GeneralInterests` string,
- `RemoteIp` bigint,
- `RemoteIp6` char(50),
- `WindowName` int,
- `OpenerName` int,
- `historylength` smallint,
- `BrowserLanguage` char(4),
- `BrowserCountry` char(4),
- `SocialNetwork` string,
- `SocialAction` string,
- `HttpError` int,
- `SendTiming` int,
- `DnsTiming` int,
- `ConnectTiming` int,
- `ResponseStartTiming` int,
- `ResponseEndTiming` int,
- `FetchTiming` int,
- `RedirectTiming` int,
- `DomInteractiveTiming` int,
- `DomContentLoadedTiming` int,
- `DomCompleteTiming` int,
- `LoadEventStartTiming` int,
- `LoadEventEndTiming` int,
- `NsToDomContentLoadedTiming` int,
- `FirstPaintTiming` int,
- `RedirectCount` tinyint,
- `SocialSourceNetworkId` smallint,
- `SocialSourcePage` string,
- `ParamPrice` bigint,
- `ParamOrderId` string,
- `ParamCurrency` char(6),
- `ParamCurrencyId` int,
- `GoalsReached` string,
- `OpenStatServiceName` string,
- `OpenStatCampaignId` string,
- `OpenStatAdId` string,
- `OpenStatSourceId` string,
- `UtmSource` string,
- `UtmMedium` string,
- `UtmCampaign` string,
- `UtmContent` string,
- `UtmTerm` string,
- `FromTag` string,
- `HasGclId` smallint,
- `RefererHash` string,
- `UrlHash` string,
- `ClId` bigint,
- `YclId` string,
- `ShareService` string,
- `ShareUrl` string,
- `ShareTitle` string,
- `ParsedParamsKey1` string,
- `ParsedParamsKey2` string,
- `ParsedParamsKey3` string,
- `ParsedParamsKey4` string,
- `ParsedParamsKey5` string,
- `ParsedParamsValueDouble` double,
- `IsLandId` char(40),
- `RequestNum` bigint,
- `RequestTry` smallint
- ) ENGINE=OLAP
- DUPLICATE KEY(`WatchId`, `JavaEnable`)
- DISTRIBUTED BY HASH(`WatchId`, `JavaEnable`) BUCKETS 3
- PROPERTIES ("replication_num" = "1");
+ CREATE TABLE IF NOT EXISTS ${tableName14} (
+ id int,
+ name CHAR(10)
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
"""
- // streamLoad {
- // set 'version', '1'
- // set 'sql', """
- // insert into ${db}.${tableName14} select * from
http_stream("format"="parquet")
- // """
- // time 10000
- // set 'format', 'parquet'
- // file 'test_http_stream_parquet_case.parquet'
- // check { result, exception, startTime, endTime ->
- // if (exception != null) {
- // throw exception
- // }
- // log.info("http_stream result: ${result}".toString())
- // def json = parseJson(result)
- // assertEquals("success", json.Status.toLowerCase())
- // }
- // }
- // qt_sql13 "select * from ${tableName14} order by WatchId"
- sql """truncate table ${tableName14}"""
-
- // streamLoad {
- // set 'version', '1'
- // set 'sql', """
- // insert into ${db}.${tableName14} select * from
http_stream("format"="parquet")
- // """
- // time 10000
- // set 'format', 'parquet'
- // file 'test_http_stream_parquet_case.parquet'
- // check { result, exception, startTime, endTime ->
- // if (exception != null) {
- // throw exception
- // }
- // log.info("http_stream result: ${result}".toString())
- // def json = parseJson(result)
- // assertEquals("success", json.Status.toLowerCase())
- // }
- // }
- // qt_sql13 "select * from ${tableName14} order by WatchId"
- sql """truncate table ${tableName14}"""
-
- // streamLoad {
- // set 'version', '1'
- // set 'sql', """
- // insert into ${db}.${tableName14} select * from
http_stream("format"="parquet")
- // """
- // time 10000
- // set 'format', 'parquet'
- // file 'test_http_stream_parquet_case.parquet'
- // check { result, exception, startTime, endTime ->
- // if (exception != null) {
- // throw exception
- // }
- // log.info("http_stream result: ${result}".toString())
- // def json = parseJson(result)
- // assertEquals("success", json.Status.toLowerCase())
- // }
- // }
- // qt_sql13 "select * from ${tableName14} order by WatchId"
- sql """truncate table ${tableName14}"""
-
- // streamLoad {
- // set 'version', '1'
- // set 'sql', """
- // insert into ${db}.${tableName14} select * from
http_stream("format"="orc")
- // """
- // time 10000
- // set 'format', 'orc'
- // file 'test_http_stream_orc_case.orc'
- // check { result, exception, startTime, endTime ->
- // if (exception != null) {
- // throw exception
- // }
- // log.info("http_stream result: ${result}".toString())
- // def json = parseJson(result)
- // assertEquals("success", json.Status.toLowerCase())
- // }
- // }
- // qt_sql13 "select * from ${tableName14} order by WatchId"
- sql """truncate table ${tableName14}"""
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${tableName14} WITH LABEL ${label}
select c1, c2 from http_stream("format"="csv", "column_separator"="--")
+ """
+ time 10000
+ file 'test_http_stream_column_separator.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("http_stream result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals(label, json.Label.toLowerCase())
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ qt_sql14 "select id, name from ${tableName14} order by id"
} finally {
try_sql "DROP TABLE IF EXISTS ${tableName14}"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]