This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b878a7e61e [feature](Load)Support skipping a specific number of lines for csv
stream load (#16055)
b878a7e61e is described below
commit b878a7e61e56f8d9e5497de337806249c23561a4
Author: huangzhaowei <[email protected]>
AuthorDate: Wed Feb 1 20:42:43 2023 +0800
[feature](Load)Support skipping a specific number of lines for csv stream load
(#16055)
Support set skip line number for stream load to load csv file.
Usage `-H skip_lines:number`:
```
curl --location-trusted -u root: -T test.csv -H skip_lines:5 -XPUT
http://127.0.0.1:8030/api/testDb/testTbl/_stream_load
```
Skip line number also can be used in mysql load as below:
```sql
LOAD DATA
LOCAL
INFILE '${mysql_load_skip_lines}'
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ','
IGNORE 2 LINES
PROPERTIES ("auth" = "root:");
```
---
be/src/http/action/stream_load.cpp | 3 +
be/src/http/http_common.h | 1 +
be/src/vec/exec/format/csv/csv_reader.cpp | 20 +--
.../Load/STREAM-LOAD.md | 2 +
.../Load/STREAM-LOAD.md | 2 +
fe/fe-core/src/main/cup/sql_parser.cup | 24 +++-
.../org/apache/doris/analysis/DataDescription.java | 8 ++
.../java/org/apache/doris/analysis/LoadStmt.java | 2 +
.../org/apache/doris/load/BrokerFileGroup.java | 6 +
.../apache/doris/load/loadv2/MysqlLoadManager.java | 5 +
.../doris/planner/external/LoadScanProvider.java | 1 +
.../java/org/apache/doris/task/LoadTaskInfo.java | 4 +
.../java/org/apache/doris/task/StreamLoadTask.java | 9 ++
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
.../apache/doris/analysis/DataDescriptionTest.java | 2 +-
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 4 +
.../load_p0/mysql_load/mysql_load_skip_lines.csv | 4 +
.../mysql_load/test_mysql_load_skip_lines.out | 16 +++
.../load_p0/stream_load/csv_with_skip_lines.csv | 4 +
.../stream_load/test_csv_with_skip_lines.out | 20 +++
.../mysql_load/test_mysql_load_skip_lines.groovy | 100 +++++++++++++++
.../stream_load/test_csv_with_skip_lines.groovy | 134 +++++++++++++++++++++
23 files changed, 362 insertions(+), 11 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index b62f2d62a9..8281064881 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -573,6 +573,9 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req, StreamLoadContext*
request.__set_trim_double_quotes(false);
}
}
+ if (!http_req->header(HTTP_SKIP_LINES).empty()) {
+ request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
+ }
#ifndef BE_TEST
// plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index e61c828fb4..f3d7caa06a 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -52,6 +52,7 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM =
"send_batch_parallelism";
static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index ef123c6eaa..d5863c96de 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -88,14 +88,18 @@ CsvReader::~CsvReader() = default;
Status CsvReader::init_reader(bool is_load) {
// set the skip lines and start offset
int64_t start_offset = _range.start_offset;
- if (start_offset == 0 && _params.__isset.file_attributes &&
- _params.file_attributes.__isset.header_type &&
- _params.file_attributes.header_type.size() > 0) {
- std::string header_type =
to_lower(_params.file_attributes.header_type);
- if (header_type == BeConsts::CSV_WITH_NAMES) {
- _skip_lines = 1;
- } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
- _skip_lines = 2;
+ if (start_offset == 0) {
+ // check header type first
+ if (_params.__isset.file_attributes &&
_params.file_attributes.__isset.header_type &&
+ _params.file_attributes.header_type.size() > 0) {
+ std::string header_type =
to_lower(_params.file_attributes.header_type);
+ if (header_type == BeConsts::CSV_WITH_NAMES) {
+ _skip_lines = 1;
+ } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+ _skip_lines = 2;
+ }
+ } else if (_params.file_attributes.__isset.skip_lines) {
+ _skip_lines = _params.file_attributes.skip_lines;
}
} else if (start_offset != 0) {
if (_file_format_type != TFileFormatType::FORMAT_CSV_PLAIN ||
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 0313c3f343..f01ce5126b 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -183,6 +183,8 @@ ERRORS:
25. trim_double_quotes: Boolean type, The default value is false. True means
that the outermost double quotes of each field in the csv file are trimmed.
+26. skip_lines: <version since="dev" type="inline"> Integer type, the default
value is 0. It will skip some lines in the head of csv file. It will be
disabled when format is `csv_with_names` or `csv_with_names_and_types`.
</version>
+
### Example
1. Import the data in the local file 'testData' into the table 'testTbl' in
the database 'testDb', and use Label for deduplication. Specify a timeout of
100 seconds
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 1d40986465..d383705d75 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -180,6 +180,8 @@ ERRORS:
25. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉 csv 文件每个字段最外层的双引号。
+26. skip_lines: <version since="dev" type="inline"> 整数类型, 默认值为 0,
含义为跳过 csv 文件的前几行. 当 format 设置为 `csv_with_names` 或 `csv_with_names_and_types` 时,
该参数会失效. </version>
+
### Example
1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index c977a9b731..9dd51f2657 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -621,7 +621,8 @@ terminal String
KW_AUTO,
KW_PREPARE,
KW_EXECUTE,
- KW_LINES;
+ KW_LINES,
+ KW_IGNORE;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON,
LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER;
terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -833,6 +834,7 @@ nonterminal String opt_user_role;
nonterminal TablePattern tbl_pattern;
nonterminal ResourcePattern resource_pattern;
nonterminal String ident_or_star;
+nonterminal Integer opt_skip_lines;
// password policy
nonterminal PasswordOptions opt_password_option;
@@ -2365,12 +2367,13 @@ mysql_data_desc ::=
opt_partition_names:partitionNames
opt_field_term:colSep
opt_line_term:lineDelimiter
+ opt_skip_lines:skipLines
opt_col_list:colList
opt_col_mapping_list:colMappingList
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, file,
clientLocal, colList, colSep, lineDelimiter,
- colMappingList, properties);
+ skipLines, colMappingList, properties);
:}
;
@@ -2416,6 +2419,21 @@ opt_line_term ::=
:}
;
+opt_skip_lines ::=
+ /* Empty */
+ {:
+ RESULT = 0;
+ :}
+ | KW_IGNORE INTEGER_LITERAL:number KW_LINES
+ {:
+ RESULT = number.intValue();
+ :}
+ | KW_IGNORE INTEGER_LITERAL:number KW_ROWS
+ {:
+ RESULT = number.intValue();
+ :}
+ ;
+
separator ::=
KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
{:
@@ -7057,6 +7075,8 @@ keyword ::=
{: RESULT = id; :}
| KW_LINES:id
{: RESULT = id; :}
+ | KW_IGNORE:id
+ {: RESULT = id; :}
;
// Identifier that contain keyword
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 63e4c41fc7..32586c6d4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -151,6 +151,7 @@ public class DataDescription {
private final Map<String, String> properties;
private boolean trimDoubleQuotes = false;
private boolean isMysqlLoad = false;
+ private int skipLines = 0;
public DataDescription(String tableName,
PartitionNames partitionNames,
@@ -232,6 +233,7 @@ public class DataDescription {
List<String> columns,
Separator columnSeparator,
Separator lineDelimiter,
+ int skipLines,
List<Expr> columnMappingList,
Map<String, String> properties) {
this.tableName = tableName.getTbl();
@@ -242,6 +244,7 @@ public class DataDescription {
this.fileFieldNames = columns;
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
+ this.skipLines = skipLines;
this.fileFormat = null;
this.columnsFromPath = null;
this.isNegative = false;
@@ -288,6 +291,7 @@ public class DataDescription {
this.numAsString = taskInfo.isNumAsString();
this.properties = Maps.newHashMap();
this.trimDoubleQuotes = taskInfo.getTrimDoubleQuotes();
+ this.skipLines = taskInfo.getSkipLines();
}
private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) {
@@ -702,6 +706,10 @@ public class DataDescription {
return properties;
}
+ public int getSkipLines() {
+ return skipLines;
+ }
+
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns
from path and columnMappingList
* Example:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 60d405eb86..8a1a45f7b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -117,6 +117,8 @@ public class LoadStmt extends DdlStmt {
public static final String KEY_IN_PARAM_FUNCTION_COLUMN =
"function_column";
public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col";
public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id";
+ public static final String KEY_SKIP_LINES = "skip_lines";
+
private final LabelName label;
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 887062bda1..016ef13e06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -106,6 +106,7 @@ public class BrokerFileGroup implements Writable {
private boolean readJsonByLine = false;
private boolean numAsString = false;
private boolean trimDoubleQuotes = false;
+ private int skipLines;
// for unit test and edit log persistence
private BrokerFileGroup() {
@@ -264,6 +265,7 @@ public class BrokerFileGroup implements Writable {
numAsString = dataDescription.isNumAsString();
}
trimDoubleQuotes = dataDescription.getTrimDoubleQuotes();
+ skipLines = dataDescription.getSkipLines();
}
public long getTableId() {
@@ -422,6 +424,10 @@ public class BrokerFileGroup implements Writable {
return trimDoubleQuotes;
}
+ public int getSkipLines() {
+ return skipLines;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index a394ee34d0..17a6c8ebd2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -173,6 +173,11 @@ public class MysqlLoadManager {
}
}
+ // skip_lines
+ if (desc.getSkipLines() != 0) {
+ httpPut.addHeader(LoadStmt.KEY_SKIP_LINES,
Integer.toString(desc.getSkipLines()));
+ }
+
// column_separator
if (desc.getColumnSeparator() != null) {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR,
desc.getColumnSeparator());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index ae2bf5d4ff..5a5dd7b061 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -132,6 +132,7 @@ public class LoadScanProvider implements FileScanProviderIf
{
fileAttributes.setReadByColumnDef(true);
fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat()));
fileAttributes.setTrimDoubleQuotes(fileGroup.getTrimDoubleQuotes());
+ fileAttributes.setSkipLines(fileGroup.getSkipLines());
}
private String getHeaderType(String formatType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index e384394653..cb938e84cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -101,6 +101,10 @@ public interface LoadTaskInfo {
return false;
}
+ default int getSkipLines() {
+ return 0;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 2a80ceef90..868835fff8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -84,6 +84,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
+ private int skipLines = 0;
+
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType,
TFileFormatType formatType,
TFileCompressType compressType) {
this.id = id;
@@ -257,6 +259,10 @@ public class StreamLoadTask implements LoadTaskInfo {
return trimDoubleQuotes;
}
+ public int getSkipLines() {
+ return skipLines;
+ }
+
public static StreamLoadTask
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new
StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -359,6 +365,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetTrimDoubleQuotes()) {
trimDoubleQuotes = request.isTrimDoubleQuotes();
}
+ if (request.isSetSkipLines()) {
+ skipLines = request.getSkipLines();
+ }
}
// used for stream load
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 3df5f685fc..85a15f0b24 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -483,6 +483,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("prepare", new Integer(SqlParserSymbols.KW_PREPARE));
keywordMap.put("execute", new Integer(SqlParserSymbols.KW_EXECUTE));
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
+ keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
}
// map from token id to token description
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
index 023fdec59e..ec07f8c0b5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java
@@ -385,7 +385,7 @@ public class DataDescriptionTest {
properties.put("line_delimiter", "abc");
DataDescription desc =
new DataDescription(tbl, new PartitionNames(false,
Lists.newArrayList("p1", "p2")), "abc.txt", true,
- Lists.newArrayList("k1", "k2", "v1"), new
Separator("010203"), new Separator("040506"),
+ Lists.newArrayList("k1", "k2", "v1"), new
Separator("010203"), new Separator("040506"), 0,
Lists.newArrayList(predicate), properties);
String db = desc.analyzeFullDbName(null, analyzer);
Assert.assertEquals("default_cluster:testDb", db);
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 7d1e4e2481..457e78020d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -547,6 +547,7 @@ struct TStreamLoadPutRequest {
40: optional PlanNodes.TFileCompressType compress_type
41: optional i64 file_size // only for stream load with parquet or orc
42: optional bool trim_double_quotes // trim double quotes for csv
+ 43: optional i32 skip_lines // csv skip line num, only used when csv
header_type is not set.
}
struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 6eed8b46cd..41bd4e088a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -174,6 +174,8 @@ struct TBrokerRangeDesc {
18: optional bool read_by_column_def;
// csv with header type
19: optional string header_type;
+ // csv skip line num, only used when csv header_type is not set.
+ 20: optional i32 skip_lines;
}
struct TBrokerScanRangeParams {
@@ -259,6 +261,8 @@ struct TFileAttributes {
9: optional string header_type;
// trim double quotes for csv
10: optional bool trim_double_quotes;
+ // csv skip line num, only used when csv header_type is not set.
+ 11: optional i32 skip_lines;
}
struct TIcebergDeleteFileDesc {
diff --git a/regression-test/data/load_p0/mysql_load/mysql_load_skip_lines.csv
b/regression-test/data/load_p0/mysql_load/mysql_load_skip_lines.csv
new file mode 100644
index 0000000000..f6db3b6bfe
--- /dev/null
+++ b/regression-test/data/load_p0/mysql_load/mysql_load_skip_lines.csv
@@ -0,0 +1,4 @@
+1,2,3,abc,2022-12-01,2022-12-01:09:30:31
+2,3,3,abc,2022-12-01,2022-12-01:09:30:31
+3,4,3,abc,2022-12-01,2022-12-01:09:30:31
+4,5,3,abc,2022-12-01,2022-12-01:09:30:31
diff --git
a/regression-test/data/load_p0/mysql_load/test_mysql_load_skip_lines.out
b/regression-test/data/load_p0/mysql_load/test_mysql_load_skip_lines.out
new file mode 100644
index 0000000000..04089c0388
--- /dev/null
+++ b/regression-test/data/load_p0/mysql_load/test_mysql_load_skip_lines.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 2 3 abc 2022-12-01 2022-12-01T09:30:31
+2 3 3 abc 2022-12-01 2022-12-01T09:30:31
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+
diff --git a/regression-test/data/load_p0/stream_load/csv_with_skip_lines.csv
b/regression-test/data/load_p0/stream_load/csv_with_skip_lines.csv
new file mode 100644
index 0000000000..f6db3b6bfe
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/csv_with_skip_lines.csv
@@ -0,0 +1,4 @@
+1,2,3,abc,2022-12-01,2022-12-01:09:30:31
+2,3,3,abc,2022-12-01,2022-12-01:09:30:31
+3,4,3,abc,2022-12-01,2022-12-01:09:30:31
+4,5,3,abc,2022-12-01,2022-12-01:09:30:31
diff --git
a/regression-test/data/load_p0/stream_load/test_csv_with_skip_lines.out
b/regression-test/data/load_p0/stream_load/test_csv_with_skip_lines.out
new file mode 100644
index 0000000000..445cec6a5a
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_csv_with_skip_lines.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+
+-- !sql --
+
+-- !sql --
+2 3 3 abc 2022-12-01 2022-12-01T09:30:31
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
diff --git
a/regression-test/suites/load_p0/mysql_load/test_mysql_load_skip_lines.groovy
b/regression-test/suites/load_p0/mysql_load/test_mysql_load_skip_lines.groovy
new file mode 100644
index 0000000000..7b1cafbbb0
--- /dev/null
+++
b/regression-test/suites/load_p0/mysql_load/test_mysql_load_skip_lines.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mysql_load_skip_lines", "p0") {
+ sql "show tables"
+
+ def tableName = "test_mysql_load_skip_lines"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) NULL,
+ `v2` string NULL,
+ `v3` date NULL,
+ `v4` datetime NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def mysql_load_skip_lines = getLoalFilePath "mysql_load_skip_lines.csv"
+
+ // no any skip
+ sql """
+ LOAD DATA
+ LOCAL
+ INFILE '${mysql_load_skip_lines}'
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ','
+ PROPERTIES ("auth" = "root:");
+ """
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 2 lines
+ sql """truncate table ${tableName}"""
+ sql """
+ LOAD DATA
+ LOCAL
+ INFILE '${mysql_load_skip_lines}'
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ','
+ IGNORE 2 LINES
+ PROPERTIES ("auth" = "root:");
+ """
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+
+ // skip 3 rows
+ sql """truncate table ${tableName}"""
+ sql """
+ LOAD DATA
+ LOCAL
+ INFILE '${mysql_load_skip_lines}'
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ','
+ IGNORE 3 ROWS
+ PROPERTIES ("auth" = "root:");
+ """
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 5 rows
+ sql """truncate table ${tableName}"""
+ sql """
+ LOAD DATA
+ LOCAL
+ INFILE '${mysql_load_skip_lines}'
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ','
+ IGNORE 5 ROWS
+ PROPERTIES ("auth" = "root:");
+ """
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+}
+
diff --git
a/regression-test/suites/load_p0/stream_load/test_csv_with_skip_lines.groovy
b/regression-test/suites/load_p0/stream_load/test_csv_with_skip_lines.groovy
new file mode 100644
index 0000000000..4f14d47d62
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_csv_with_skip_lines.groovy
@@ -0,0 +1,134 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_csv_with_skip_lines", "p0") {
+ def tableName = "test_csv_with_skip_lines"
+
+ // create table
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) NULL,
+ `v2` string NULL,
+ `v3` date NULL,
+ `v4` datetime NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // skip 3 lines and file have 4 lines
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'skip_lines', '3'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 4 lines and file have 4 lines
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'skip_lines', '4'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 5 lines and file have 4 lines
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'skip_lines', '5'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 3 lines and set format = csv_with_names ==>>> skip 1 line
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv_with_names'
+ set 'skip_lines', '3'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 3 lines and set format = csv_with_names_and_types ==>>> skip 2 line
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv_with_names_and_types'
+ set 'skip_lines', '3'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // skip 3 lines and set format = csv ==>>> skip 3 line
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'skip_lines', '3'
+
+ file 'csv_with_skip_lines.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+
+ // drop table
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]