This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 452318a9fc9 [Enhancement](streamload) stream tvf support user specified label (#24219)
452318a9fc9 is described below

commit 452318a9fc98be83ce47f2d3d86ab876753c1fe7
Author: zzzzzzzs <[email protected]>
AuthorDate: Wed Sep 27 12:09:35 2023 +0800

    [Enhancement](streamload) stream tvf support user specified label (#24219)
    
    The stream TVF (http_stream) now supports a user-specified label via WITH LABEL.
    Example:
    
    curl -v --location-trusted -u root: -H "sql: insert into test.t1 WITH LABEL label1 select c1,c2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/_http_stream
    Response:
    
    {
        "TxnId": 2064,
        "Label": "label1",
        "Comment": "",
        "TwoPhaseCommit": "false",
        "Status": "Success",
        "Message": "OK",
        "NumberTotalRows": 2,
        "NumberLoadedRows": 2,
        "NumberFilteredRows": 0,
        "NumberUnselectedRows": 0,
        "LoadBytes": 27,
        "LoadTimeMs": 152,
        "BeginTxnTimeMs": 0,
        "StreamLoadPutTimeMs": 83,
        "ReadDataTimeMs": 92,
        "WriteDataTimeMs": 41,
        "CommitAndPublishTimeMs": 24
    }
---
 be/src/http/action/http_stream.cpp                 |   5 +-
 .../import/import-way/stream-load-manual.md        |   6 +
 .../apache/doris/service/FrontendServiceImpl.java  |   1 +
 .../data/load_p0/http_stream/test_http_stream.out  |  13 ++
 .../load_p0/http_stream/test_http_stream.groovy    | 252 +++------------------
 5 files changed, 53 insertions(+), 224 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 62153c8f8ff..1b59bf0b6bb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -160,10 +160,6 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     ctx->load_type = TLoadType::MANUL_LOAD;
     ctx->load_src_type = TLoadSourceType::RAW;
 
-    ctx->label = req->header(HTTP_LABEL_KEY);
-    if (ctx->label.empty()) {
-        ctx->label = generate_uuid_string();
-    }
     ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
@@ -315,6 +311,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     ctx->db = ctx->put_result.params.db_name;
     ctx->table = ctx->put_result.params.table_name;
     ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
+    ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
 
     if (ctx->group_commit) {
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index 1bb2e6b85d8..946c057198f 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -303,6 +303,12 @@ curl --location-trusted -u user:passwd [-H "sql: ${load_sql}"...] -T data.file -
 curl  --location-trusted -u root: -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream("format" = "CSV", "column_separator" = "," ) where age >= 30"  http://127.0.0.1:28030/api/_http_stream
 ```
 
+#### 相关参数
+
+1. label: 用户可以通过指定Label的方式来导入数据
+```
+curl -v --location-trusted -u root: -H "sql: insert into test.t1(c1, c2) WITH LABEL label1 select c1,c2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\")" -T example.csv http://127.0.0.1:8030/api/_http_stream
+```
 
 ### 返回结果
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b06cfc91d14..d34913bba5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2079,6 +2079,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             result.getParams().setTableName(parsedStmt.getTbl());
             // The txn_id here is obtained from the NativeInsertStmt
             result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
+            result.getParams().setImportLabel(parsedStmt.getLabel());
             if (parsedStmt.isGroupCommitTvf) {
                 result.getParams().params.setGroupCommit(true);
             }
diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out b/regression-test/data/load_p0/http_stream/test_http_stream.out
index 4471c47db0a..5b6d8411bf0 100644
--- a/regression-test/data/load_p0/http_stream/test_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_http_stream.out
@@ -569,3 +569,16 @@
 龚强     948
 龚静     641
 
+-- !sql14 --
+10000  aa
+10001  bb
+10002  cc
+10003  dd
+10004  ee
+10005  ff
+10006  gg
+10007  hh
+10008  ii
+10009  jj
+10010  kk
+
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 5d991c40c0d..bbfc2c30f00 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -600,231 +600,43 @@ suite("test_http_stream", "p0") {
         try_sql "DROP TABLE IF EXISTS ${tableName13}"
     }
 
-    // 14. test parquet orc case
-    def tableName14 = "test_parquet_orc_case"
+    // 14. test label 
+    def tableName14 = "test_http_stream_label"
+    def label = UUID.randomUUID().toString().replaceAll("-", "")
+
     try {
-        sql """ DROP TABLE IF EXISTS ${tableName14} """
         sql """
-            CREATE TABLE IF NOT EXISTS ${tableName14} (
-                `WatchId` char(128),
-                `JavaEnable` smallint,
-                `Title` string,
-                `GoodEvent` smallint,
-                `EventTime` datetime,
-                `EventDate` date,
-                `CounterId` bigint,
-                `ClientIp` bigint,
-                `ClientIp6` char(50),
-                `RegionId` bigint,
-                `UserId` string,
-                `CounterClass` tinyint,
-                `Os` smallint,
-                `UserAgent` smallint,
-                `Url` string,
-                `Referer` string,
-                `Urldomain` string,
-                `RefererDomain` string,
-                `Refresh` smallint,
-                `IsRobot` smallint,
-                `RefererCategories` string,
-                `UrlCategories` string,
-                `UrlRegions` string,
-                `RefererRegions` string,
-                `ResolutionWidth` int,
-                `ResolutionHeight` int,
-                `ResolutionDepth` smallint,
-                `FlashMajor` smallint,
-                `FlashMinor` smallint,
-                `FlashMinor2` string,
-                `NetMajor` smallint,
-                `NetMinor` smallint,
-                `UserAgentMajor` int,
-                `UserAgentMinor` char(4),
-                `CookieEnable` smallint,
-                `JavascriptEnable` smallint,
-                `IsMobile` smallint,
-                `MobilePhone` smallint,
-                `MobilePhoneModel` string,
-                `Params` string,
-                `IpNetworkId` bigint,
-                `TraficSourceId` tinyint,
-                `SearchEngineId` int,
-                `SearchPhrase` string,
-                `AdvEngineId` smallint,
-                `IsArtifical` smallint,
-                `WindowClientWidth` int,
-                `WindowClientHeight` int,
-                `ClientTimeZone` smallint,
-                `ClientEventTime` datetime,
-                `SilverLightVersion1` smallint,
-                `SilverlightVersion2` smallint,
-                `SilverlightVersion3` bigint,
-                `SilverlightVersion4` int,
-                `PageCharset` string,
-                `CodeVersion` bigint,
-                `IsLink` smallint,
-                `IsDownload` smallint,
-                `IsNotBounce` smallint,
-                `FUniqId` string,
-                `Hid` bigint,
-                `IsOldCounter` smallint,
-                `IsEvent` smallint,
-                `IsParameter` smallint,
-                `DontCountHits` smallint,
-                `WithHash` smallint,
-                `HitColor` char(2),
-                `UtcEventTime` datetime,
-                `Age` smallint,
-                `Sex` smallint,
-                `Income` smallint,
-                `Interests` int,
-                `Robotness` smallint,
-                `GeneralInterests` string,
-                `RemoteIp` bigint,
-                `RemoteIp6` char(50),
-                `WindowName` int,
-                `OpenerName` int,
-                `historylength` smallint,
-                `BrowserLanguage` char(4),
-                `BrowserCountry` char(4),
-                `SocialNetwork` string,
-                `SocialAction` string,
-                `HttpError` int,
-                `SendTiming` int,
-                `DnsTiming` int,
-                `ConnectTiming` int,
-                `ResponseStartTiming` int,
-                `ResponseEndTiming` int,
-                `FetchTiming` int,
-                `RedirectTiming` int,
-                `DomInteractiveTiming` int,
-                `DomContentLoadedTiming` int,
-                `DomCompleteTiming` int,
-                `LoadEventStartTiming` int,
-                `LoadEventEndTiming` int,
-                `NsToDomContentLoadedTiming` int,
-                `FirstPaintTiming` int,
-                `RedirectCount` tinyint,
-                `SocialSourceNetworkId` smallint,
-                `SocialSourcePage` string,
-                `ParamPrice` bigint,
-                `ParamOrderId` string,
-                `ParamCurrency` char(6),
-                `ParamCurrencyId` int,
-                `GoalsReached` string,
-                `OpenStatServiceName` string,
-                `OpenStatCampaignId` string,
-                `OpenStatAdId` string,
-                `OpenStatSourceId` string,
-                `UtmSource` string,
-                `UtmMedium` string,
-                `UtmCampaign` string,
-                `UtmContent` string,
-                `UtmTerm` string,
-                `FromTag` string,
-                `HasGclId` smallint,
-                `RefererHash` string,
-                `UrlHash` string,
-                `ClId` bigint,
-                `YclId` string,
-                `ShareService` string,
-                `ShareUrl` string,
-                `ShareTitle` string,
-                `ParsedParamsKey1` string,
-                `ParsedParamsKey2` string,
-                `ParsedParamsKey3` string,
-                `ParsedParamsKey4` string,
-                `ParsedParamsKey5` string,
-                `ParsedParamsValueDouble` double,
-                `IsLandId` char(40),
-                `RequestNum` bigint,
-                `RequestTry` smallint
-            ) ENGINE=OLAP
-            DUPLICATE KEY(`WatchId`, `JavaEnable`)
-            DISTRIBUTED BY HASH(`WatchId`, `JavaEnable`) BUCKETS 3
-            PROPERTIES ("replication_num" = "1");
+        CREATE TABLE IF NOT EXISTS ${tableName14} (
+            id int,
+            name CHAR(10)
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
         """
 
-        // streamLoad {
-        //     set 'version', '1'
-        //     set 'sql', """
-        //             insert into ${db}.${tableName14} select * from 
http_stream("format"="parquet")
-        //             """
-        //     time 10000
-        //     set 'format', 'parquet'
-        //     file 'test_http_stream_parquet_case.parquet'
-        //     check { result, exception, startTime, endTime ->
-        //         if (exception != null) {
-        //             throw exception
-        //         }
-        //         log.info("http_stream result: ${result}".toString())
-        //         def json = parseJson(result)
-        //         assertEquals("success", json.Status.toLowerCase())
-        //     }
-        // }
-        // qt_sql13 "select * from ${tableName14} order by WatchId"
-        sql """truncate table ${tableName14}"""
-
-        // streamLoad {
-        //     set 'version', '1'
-        //     set 'sql', """
-        //             insert into ${db}.${tableName14} select * from 
http_stream("format"="parquet")
-        //             """
-        //     time 10000
-        //     set 'format', 'parquet'
-        //     file 'test_http_stream_parquet_case.parquet'
-        //     check { result, exception, startTime, endTime ->
-        //         if (exception != null) {
-        //             throw exception
-        //         }
-        //         log.info("http_stream result: ${result}".toString())
-        //         def json = parseJson(result)
-        //         assertEquals("success", json.Status.toLowerCase())
-        //     }
-        // }
-        // qt_sql13 "select * from ${tableName14} order by WatchId"
-        sql """truncate table ${tableName14}"""
-
-        // streamLoad {
-        //     set 'version', '1'
-        //     set 'sql', """
-        //             insert into ${db}.${tableName14} select * from 
http_stream("format"="parquet")
-        //             """
-        //     time 10000
-        //     set 'format', 'parquet'
-        //     file 'test_http_stream_parquet_case.parquet'
-        //     check { result, exception, startTime, endTime ->
-        //         if (exception != null) {
-        //             throw exception
-        //         }
-        //         log.info("http_stream result: ${result}".toString())
-        //         def json = parseJson(result)
-        //         assertEquals("success", json.Status.toLowerCase())
-        //     }
-        // }
-        // qt_sql13 "select * from ${tableName14} order by WatchId"
-        sql """truncate table ${tableName14}"""
-
-        // streamLoad {
-        //     set 'version', '1'
-        //     set 'sql', """
-        //             insert into ${db}.${tableName14} select * from 
http_stream("format"="orc")
-        //             """
-        //     time 10000
-        //     set 'format', 'orc'
-        //     file 'test_http_stream_orc_case.orc'
-        //     check { result, exception, startTime, endTime ->
-        //         if (exception != null) {
-        //             throw exception
-        //         }
-        //         log.info("http_stream result: ${result}".toString())
-        //         def json = parseJson(result)
-        //         assertEquals("success", json.Status.toLowerCase())
-        //     }
-        // }
-        // qt_sql13 "select * from ${tableName14} order by WatchId"
-        sql """truncate table ${tableName14}"""
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName14} WITH LABEL ${label} 
select c1, c2 from http_stream("format"="csv", "column_separator"="--")
+                    """
+            time 10000
+            file 'test_http_stream_column_separator.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("http_stream result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals(label, json.Label.toLowerCase())
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
 
+        qt_sql14 "select id, name from ${tableName14} order by id"
     } finally {
         try_sql "DROP TABLE IF EXISTS ${tableName14}"
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to