This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new bd24a8bdd9b [Fix](csv_reader) Add a session variable to control
whether empty rows in CSV files are read as NULL values (#37153)
bd24a8bdd9b is described below
commit bd24a8bdd9b7e327fc2ded62bc6fc30092c46c27
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue Jul 2 22:12:17 2024 +0800
[Fix](csv_reader) Add a session variable to control whether empty rows in
CSV files are read as NULL values (#37153)
bp: #36668
---
be/src/runtime/runtime_state.h | 5 +
be/src/vec/exec/format/csv/csv_reader.cpp | 25 ++++-
be/src/vec/exec/format/csv/csv_reader.h | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 8 ++
gensrc/thrift/PaloInternalService.thrift | 1 +
.../tvf/test_read_csv_empty_line_as_null.out | 31 ++++++
.../tvf/test_read_csv_empty_line_as_null.groovy | 111 +++++++++++++++++++++
7 files changed, 180 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 33b5ded9c3a..b88b29ee8d0 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -518,6 +518,11 @@ public:
return _query_options.__isset.enable_parallel_scan &&
_query_options.enable_parallel_scan;
}
+ bool is_read_csv_empty_line_as_null() const {
+ return _query_options.__isset.read_csv_empty_line_as_null &&
+ _query_options.read_csv_empty_line_as_null;
+ }
+
int parallel_scan_max_scanners_count() const {
return _query_options.__isset.parallel_scan_max_scanners_count
? _query_options.parallel_scan_max_scanners_count
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index a10ba8c3d14..7894b5c57ae 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -485,7 +485,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
if (size == 0) {
- // Read empty row, just continue
+ if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
+ ++rows;
+ }
+ // Read empty line, continue
continue;
}
@@ -518,7 +521,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
if (size == 0) {
- // Read empty row, just continue
+ if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
+ RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
+ }
+ // Read empty line, continue
continue;
}
@@ -661,6 +667,21 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
return Status::OK();
}
+Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
+ size_t* rows) {
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ IColumn* col_ptr = columns[i];
+ if (!_is_load) {
+ col_ptr = const_cast<IColumn*>(
+ block->get_by_position(_file_slot_idx_map[i]).column.get());
+ }
+ auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
+ null_column.insert_data(nullptr, 0);
+ }
+ ++(*rows);
+ return Status::OK();
+}
+
Status CsvReader::_validate_line(const Slice& line, bool* success) {
if (!_is_proto_format && !validate_utf8(line.data, line.size)) {
if (!_is_load) {
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index d9c8633f427..65eba62a54c 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -204,6 +204,7 @@ private:
Status _create_decompressor();
Status _fill_dest_columns(const Slice& line, Block* block,
std::vector<MutableColumnPtr>& columns, size_t* rows);
+ Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool*
is_success);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d6e75faf673..5cf6cb901d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -194,6 +194,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";
+ public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null";
+
public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
// max ms to wait transaction publish finish when exec insert stmt.
@@ -1034,6 +1036,11 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
private boolean enableSyncRuntimeFilterSize = true;
+ @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
+ description = {"在读取csv文件时是否读取csv的空行为null",
+ "Determine whether to read empty rows in CSV files as NULL when reading CSV files."})
+ public boolean readCsvEmptyLineAsNull = false;
+
@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;
@@ -3327,6 +3334,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+ tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 35331f13fc7..aaedd219fb2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -297,6 +297,7 @@ struct TQueryOptions {
113: optional bool enable_force_spill = false;
+ 117: optional bool read_csv_empty_line_as_null = false
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
diff --git a/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out
new file mode 100644
index 00000000000..6fbf970ff67
--- /dev/null
+++ b/regression-test/data/external_table_p0/tvf/test_read_csv_empty_line_as_null.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_base --
+1,doris1,16
+2,doris2,18
+
+3,doris3,19
+
+
+
+
+4,doris4,20
+
+
+-- !select_1 --
+1 doris1 16
+2 doris2 18
+3 doris3 19
+4 doris4 20
+
+-- !select_1 --
+\N \N \N
+\N \N \N
+\N \N \N
+\N \N \N
+\N \N \N
+\N \N \N
+1 doris1 16
+2 doris2 18
+3 doris3 19
+4 doris4 20
+
diff --git a/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy
new file mode 100644
index 00000000000..0a64109eb22
--- /dev/null
+++
b/regression-test/suites/external_table_p0/tvf/test_read_csv_empty_line_as_null.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_read_csv_empty_line_as_null", "p0") {
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+
+ def export_table_name = "test_read_csv_empty_line"
+ def outFilePath = "${bucket}/test_read_csv_empty_line/exp_"
+
+
+ def create_table = {table_name ->
+ sql """ DROP TABLE IF EXISTS ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ `id` INT NULL,
+ `content` varchar(32) NULL
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+ }
+
+ def outfile_to_S3 = {
+ // select ... into outfile ...
+ def res = sql """
+ SELECT content FROM ${export_table_name} t ORDER BY id
+ INTO OUTFILE "s3://${outFilePath}"
+ FORMAT AS csv
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+
+ return res[0][3]
+ }
+
+ // create table to export data
+ create_table(export_table_name)
+
+ // insert data
+ sql """ insert into ${export_table_name} values (1, "1,doris1,16"); """
+ sql """ insert into ${export_table_name} values (2, "2,doris2,18"); """
+ sql """ insert into ${export_table_name} values (3, ""); """
+ sql """ insert into ${export_table_name} values (4, "3,doris3,19"); """
+ sql """ insert into ${export_table_name} values (5, ""); """
+ sql """ insert into ${export_table_name} values (6, ""); """
+ sql """ insert into ${export_table_name} values (7, ""); """
+ sql """ insert into ${export_table_name} values (8, ""); """
+ sql """ insert into ${export_table_name} values (9, "4,doris4,20"); """
+ sql """ insert into ${export_table_name} values (10, ""); """
+
+ // test base data
+ qt_select_base """ SELECT content FROM ${export_table_name} t ORDER BY id; """
+
+ // test outfile to s3
+ def outfile_url = outfile_to_S3()
+
+ // test read_csv_empty_line_as_null = false
+ try {
+ order_qt_select_1 """ SELECT * FROM S3 (
+ "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "column_separator" = ",",
+ "region" = "${region}"
+ );
+ """
+ } finally {
+ }
+
+ // test read_csv_empty_line_as_null = true
+ try {
+ sql """ set read_csv_empty_line_as_null=true; """
+ order_qt_select_1 """ SELECT * FROM S3 (
+ "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "column_separator" = ",",
+ "region" = "${region}"
+ );
+ """
+ } finally {
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]