This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new efd3746f907 [feat](csv reader) support empty field as null (#55041)
efd3746f907 is described below
commit efd3746f907fef252007529822b1c92360ac9e5f
Author: hui lai <[email protected]>
AuthorDate: Sat Sep 6 21:14:36 2025 +0800
[feat](csv reader) support empty field as null (#55041)
Support treating empty CSV fields as NULL, controlled by the new `empty_field_as_null`
option (an HTTP header for stream load; a load, job, or TVF property elsewhere). For example:
Create table:
```
CREATE TABLE IF NOT EXISTS test_stream_load_empty_field_as_null (
`k1` int(20) NULL,
`k2` string NULL,
`v1` date NULL,
`v2` string NULL,
`v3` datetime NULL,
`v4` string NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
```
Data:
```
10,,2023-07-15,def,2023-07-20T05:48:31,ghi
```
Command:
```
curl --location-trusted -u root:"" -H "empty_field_as_null:true" -T data.csv \
    -H "column_separator:," http://127.0.0.1:8030/api/db/test_stream_load_empty_field_as_null/_stream_load
```
Result:
```
10 \N 2023-07-15 def 2023-07-20T05:48:31 ghi
```
---
be/src/http/action/stream_load.cpp | 6 +
be/src/http/http_common.h | 1 +
be/src/vec/exec/format/csv/csv_reader.cpp | 10 ++
be/src/vec/exec/format/csv/csv_reader.h | 1 +
.../fileformat/CsvFileFormatProperties.java | 11 ++
.../doris/load/routineload/RoutineLoadJob.java | 13 ++
.../doris/nereids/load/NereidsBrokerLoadTask.java | 5 +
.../doris/nereids/load/NereidsDataDescription.java | 2 +
.../doris/nereids/load/NereidsLoadTaskInfo.java | 2 +
.../nereids/load/NereidsRoutineLoadTaskInfo.java | 11 ++
.../doris/nereids/load/NereidsStreamLoadTask.java | 14 ++
.../plans/commands/AlterRoutineLoadCommand.java | 8 +
.../plans/commands/info/CreateRoutineLoadInfo.java | 1 +
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 1 +
.../test_bulk_load_empty_field_as_null.out | Bin 0 -> 196 bytes
.../test_routine_load_empty_field_as_null.out | Bin 0 -> 305 bytes
.../load_p0/stream_load/empty_field_as_null.csv | 2 +
.../test_stream_load_empty_field_as_null.out | Bin 0 -> 305 bytes
.../load_p0/tvf/test_tvf_empty_field_as_null.out | Bin 0 -> 199 bytes
.../test_bulk_load_empty_field_as_null.groovy | 82 ++++++++++
.../test_routine_load_empty_field_as_null.groovy | 173 +++++++++++++++++++++
.../test_stream_load_empty_field_as_null.groovy | 63 ++++++++
.../tvf/test_tvf_empty_field_as_null.groovy | 52 +++++++
24 files changed, 459 insertions(+)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index f8f720d6b66..d8175b17f86 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -744,6 +744,12 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
}
+ if (!http_req->header(HTTP_EMPTY_FIELD_AS_NULL).empty()) {
+ if (iequal(http_req->header(HTTP_EMPTY_FIELD_AS_NULL), "true")) {
+ request.__set_empty_field_as_null(true);
+ }
+ }
+
#ifndef BE_TEST
// plan this load
TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 4c856ba4478..7719070cb24 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -71,5 +71,6 @@ static const std::string HTTP_WAL_ID_KY = "wal_id";
static const std::string HTTP_AUTH_CODE = "auth_code"; // deprecated
static const std::string HTTP_GROUP_COMMIT = "group_commit";
static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
+static const std::string HTTP_EMPTY_FIELD_AS_NULL = "empty_field_as_null";
} // namespace doris
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 0f5ab9648ad..0e164e1c399 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -458,6 +458,12 @@ Status
CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
auto& null_column = assert_cast<ColumnNullable&>(column);
+ if (_empty_field_as_null) {
+ if (slice.size == 0) {
+ null_column.insert_data(nullptr, 0);
+ return Status::OK();
+ }
+ }
if (_options.null_len > 0 && !(_options.converted_from_string &&
slice.trim_double_quotes())) {
if (slice.compare(Slice(_options.null_format, _options.null_len)) ==
0) {
null_column.insert_data(nullptr, 0);
@@ -520,6 +526,10 @@ Status CsvReader::_init_options() {
if (_state != nullptr) {
_keep_cr = _state->query_options().keep_carriage_return;
}
+
+ if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
+ _empty_field_as_null =
_params.file_attributes.text_params.empty_field_as_null;
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.h
b/be/src/vec/exec/format/csv/csv_reader.h
index e28c56cbdc5..fcdc5e5606f 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -275,6 +275,7 @@ private:
bool _trim_double_quotes = false;
bool _trim_tailing_spaces = false;
bool _keep_cr = false;
+ bool _empty_field_as_null = false;
io::IOContext* _io_ctx = nullptr;
// save source text which have been splitted.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 55e545b4fa8..57ac88e6b2a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -67,6 +67,8 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
public static final String PROP_ENABLE_TEXT_VALIDATE_UTF8 =
"enable_text_validate_utf8";
+ public static final String PROP_EMPTY_FIELD_AS_NULL =
"empty_field_as_null";
+
private String headerType = "";
private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
private String lineDelimiter = DEFAULT_LINE_DELIMITER;
@@ -75,6 +77,7 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
private byte enclose;
private byte escape;
private boolean enableTextValidateUTF8 = true;
+ private boolean emptyFieldAsNull;
public CsvFileFormatProperties(String formatName) {
super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
@@ -149,6 +152,9 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
enableTextValidateUTF8 = Boolean.parseBoolean(validateUtf8);
}
+ emptyFieldAsNull = Boolean.valueOf(getOrDefault(formatProperties,
+ PROP_EMPTY_FIELD_AS_NULL, "false", isRemoveOriginProperty))
+ .booleanValue();
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage());
}
@@ -185,6 +191,7 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
fileTextScanRangeParams.setEnclose(this.enclose);
fileTextScanRangeParams.setEscape(this.escape);
+ fileTextScanRangeParams.setEmptyFieldAsNull(this.emptyFieldAsNull);
fileAttributes.setTextParams(fileTextScanRangeParams);
fileAttributes.setHeaderType(headerType);
fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
@@ -220,4 +227,8 @@ public class CsvFileFormatProperties extends
FileFormatProperties {
public byte getEscape() {
return escape;
}
+
+ public boolean getEmptyFieldAsNull() {
+ return emptyFieldAsNull;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 34cd9e5228f..3717b377ff2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -278,6 +278,8 @@ public abstract class RoutineLoadJob
protected byte escape = 0;
+ protected boolean emptyFieldAsNull = false;
+
// use for cloud cluster mode
@SerializedName("ccn")
protected String cloudCluster;
@@ -397,8 +399,11 @@ public abstract class RoutineLoadJob
new String(new
byte[]{csvFileFormatProperties.getEnclose()}));
jobProperties.put(CsvFileFormatProperties.PROP_ESCAPE,
new String(new
byte[]{csvFileFormatProperties.getEscape()}));
+ jobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+
String.valueOf(csvFileFormatProperties.getEmptyFieldAsNull()));
this.enclose = csvFileFormatProperties.getEnclose();
this.escape = csvFileFormatProperties.getEscape();
+ this.emptyFieldAsNull =
csvFileFormatProperties.getEmptyFieldAsNull();
} else if (fileFormatProperties instanceof JsonFileFormatProperties) {
JsonFileFormatProperties jsonFileFormatProperties =
(JsonFileFormatProperties) fileFormatProperties;
jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
@@ -605,6 +610,14 @@ public abstract class RoutineLoadJob
return escape;
}
+ public boolean getEmptyFieldAsNull() {
+ String value =
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL);
+ if (value == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
public boolean isStrictMode() {
String value = jobProperties.get(LoadStmt.STRICT_MODE);
if (value == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
index 6cf1f7d436f..7d28c0a9bdd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsBrokerLoadTask.java
@@ -225,4 +225,9 @@ public class NereidsBrokerLoadTask implements
NereidsLoadTaskInfo {
public byte getEscape() {
return 0;
}
+
+ @Override
+ public boolean getEmptyFieldAsNull() {
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
index f6a43fc772d..eb1a727afdf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsDataDescription.java
@@ -426,6 +426,8 @@ public class NereidsDataDescription {
String.valueOf(taskInfo.getTrimDoubleQuotes()));
putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_SKIP_LINES,
String.valueOf(taskInfo.getSkipLines()));
+
putAnalysisMapIfNonNull(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+ String.valueOf(taskInfo.getEmptyFieldAsNull()));
putAnalysisMapIfNonNull(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
String.valueOf(taskInfo.isStripOuterArray()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
index 127c4e65806..0d15e3d9086 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadTaskInfo.java
@@ -114,6 +114,8 @@ public interface NereidsLoadTaskInfo {
boolean isFixedPartialUpdate();
+ boolean getEmptyFieldAsNull();
+
default TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
return TUniqueKeyUpdateMode.UPSERT;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index e9ffb514d66..a9ecf76523c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.thrift.TFileCompressType;
@@ -61,6 +62,7 @@ public class NereidsRoutineLoadTaskInfo implements
NereidsLoadTaskInfo {
protected Separator lineDelimiter;
protected byte enclose;
protected byte escape;
+ protected boolean emptyFieldAsNull;
protected int sendBatchParallelism;
protected boolean loadToSingleTablet;
protected boolean isPartialUpdate;
@@ -267,6 +269,15 @@ public class NereidsRoutineLoadTaskInfo implements
NereidsLoadTaskInfo {
return escape;
}
+ @Override
+ public boolean getEmptyFieldAsNull() {
+ String value =
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL);
+ if (value == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
@Override
public int getSendBatchParallelism() {
return sendBatchParallelism;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
index 9e1ed849b5d..6b3fa1ec3ee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadTask.java
@@ -95,6 +95,8 @@ public class NereidsStreamLoadTask implements
NereidsLoadTaskInfo {
private String groupCommit;
+ private boolean emptyFieldAsNull = false;
+
/**
* NereidsStreamLoadTask
*/
@@ -335,6 +337,15 @@ public class NereidsStreamLoadTask implements
NereidsLoadTaskInfo {
this.streamPerNode = streamPerNode;
}
+ @Override
+ public boolean getEmptyFieldAsNull() {
+ return emptyFieldAsNull;
+ }
+
+ public void setEmptyFieldAsNull(boolean emptyFieldAsNull) {
+ this.emptyFieldAsNull = emptyFieldAsNull;
+ }
+
/**
* fromTStreamLoadPutRequest
*/
@@ -500,6 +511,9 @@ public class NereidsStreamLoadTask implements
NereidsLoadTaskInfo {
if (request.isSetStreamPerNode()) {
this.streamPerNode = request.getStreamPerNode();
}
+ if (request.isSetEmptyFieldAsNull()) {
+ emptyFieldAsNull = request.isEmptyFieldAsNull();
+ }
}
// used for stream load
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
index b173863a3ce..ac3a4a0d367 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java
@@ -80,6 +80,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.add(JsonFileFormatProperties.PROP_JSON_ROOT)
.add(CsvFileFormatProperties.PROP_ENCLOSE)
.add(CsvFileFormatProperties.PROP_ESCAPE)
+ .add(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)
.build();
private final LabelNameInfo labelNameInfo;
@@ -297,6 +298,13 @@ public class AlterRoutineLoadCommand extends AlterCommand {
analyzedJobProperties.put(CsvFileFormatProperties.PROP_ESCAPE,
jobProperties.get(CsvFileFormatProperties.PROP_ESCAPE));
}
+
+ if
(jobProperties.containsKey(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)) {
+ boolean emptyFieldAsNull = Boolean.parseBoolean(
+
jobProperties.get(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL));
+
analyzedJobProperties.put(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL,
+ String.valueOf(emptyFieldAsNull));
+ }
}
private void checkDataSourceProperties() throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index dacc706ea35..e5fe196799f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -131,6 +131,7 @@ public class CreateRoutineLoadInfo {
.add(JsonFileFormatProperties.PROP_JSON_ROOT)
.add(CsvFileFormatProperties.PROP_ENCLOSE)
.add(CsvFileFormatProperties.PROP_ESCAPE)
+ .add(CsvFileFormatProperties.PROP_EMPTY_FIELD_AS_NULL)
.build();
private static final Logger LOG =
LogManager.getLogger(CreateRoutineLoadInfo.class);
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index ebad46253f7..87d8fc8d0cf 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -556,6 +556,7 @@ struct TStreamLoadPutRequest {
56: optional string group_commit_mode
57: optional Types.TUniqueKeyUpdateMode unique_key_update_mode
58: optional Descriptors.TPartialUpdateNewRowPolicy
partial_update_new_key_policy
+ 59: optional bool empty_field_as_null
// For cloud
1000: optional string cloud_cluster
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 998f75cf40a..967eba42479 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -247,6 +247,7 @@ struct TFileTextScanRangeParams {
5: optional i8 enclose;
6: optional i8 escape;
7: optional string null_format;
+ 8: optional bool empty_field_as_null
}
struct TFileScanSlotInfo {
diff --git
a/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
b/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
new file mode 100644
index 00000000000..9e07b24bbc8
Binary files /dev/null and
b/regression-test/data/load_p0/broker_load/test_bulk_load_empty_field_as_null.out
differ
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
b/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
new file mode 100644
index 00000000000..ffdd86573a0
Binary files /dev/null and
b/regression-test/data/load_p0/routine_load/test_routine_load_empty_field_as_null.out
differ
diff --git a/regression-test/data/load_p0/stream_load/empty_field_as_null.csv
b/regression-test/data/load_p0/stream_load/empty_field_as_null.csv
new file mode 100644
index 00000000000..38a59c4335e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/empty_field_as_null.csv
@@ -0,0 +1,2 @@
+9,\N,2023-07-15,def,2023-07-20T05:48:31,ghi
+10,,2023-07-15,def,2023-07-20T05:48:31,ghi
\ No newline at end of file
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
b/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
new file mode 100644
index 00000000000..ffdd86573a0
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_stream_load_empty_field_as_null.out
differ
diff --git a/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out
b/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out
new file mode 100644
index 00000000000..00422d102af
Binary files /dev/null and
b/regression-test/data/load_p0/tvf/test_tvf_empty_field_as_null.out differ
diff --git
a/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
b/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..96020c3c538
--- /dev/null
+++
b/regression-test/suites/load_p0/broker_load/test_bulk_load_empty_field_as_null.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_bulk_load_empty_field_as_null", "p0") {
+ def tableName = "test_bulk_load_empty_field_as_null"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def label = UUID.randomUUID().toString().replace("-", "0")
+
+ def sql_str = """
+ LOAD LABEL $label (
+ DATA
INFILE("s3://${s3BucketName}/regression/load/data/empty_field_as_null.csv")
+ INTO TABLE $tableName
+ COLUMNS TERMINATED BY ","
+ FORMAT AS "CSV"
+ PROPERTIES (
+ "empty_field_as_null" = "true"
+ )
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_REGION" = "${getS3Region()}",
+ "provider" = "${getS3Provider()}"
+ );
+ """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ String[][] result = sql """ show load where label="$label" order by
createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + label)
+ break
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ def reason = result[0][7]
+ logger.info("load failed, reason:$reason")
+ assertTrue(1 == 2)
+ break
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label")
+ }
+ }
+
+ qt_sql """ SELECT * FROM ${tableName} order by k1 """
+
+}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..71d32db5fef
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_empty_field_as_null.groovy
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_empty_field_as_null","p0") {
+ def kafkaCsvTpoics = [
+ "test_routine_load_empty_field_as_null",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+ def verifyKafkaConnection = { prod ->
+ try {
+ logger.info("=====try to connect Kafka========")
+ def partitions =
prod.partitionsFor("__connection_verification_topic")
+ return partitions != null
+ } catch (Exception e) {
+ throw new Exception("Kafka connect fail:
${e.message}".toString())
+ }
+ }
+ def producer = new KafkaProducer<>(props)
+ try {
+ logger.info("Kafka connecting: ${kafka_broker}")
+ if (!verifyKafkaConnection(producer)) {
+ throw new Exception("can't get any kafka info")
+ }
+ } catch (Exception e) {
+ logger.error("FATAL: " + e.getMessage())
+ producer.close()
+ throw e
+ }
+ logger.info("Kafka connect success")
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def testData = [
+ "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+ "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+ ]
+
+ testData.each { line ->
+ logger.info("Sending data to kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+
+ def tableName = "test_routine_load_empty_field_as_null"
+ def job = "test_follower_routine_load"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "empty_field_as_null" = "false"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ def maxWaitCount = 60
+ while (count < maxWaitCount) {
+ def state = sql "show routine load for ${job}"
+ def routineLoadState = state[0][8].toString()
+ def statistic = state[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+ def rowCount = sql "select count(*) from ${tableName}"
+ if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_select_1 """
+ SELECT * FROM ${tableName} ORDER BY k1
+ """
+
+ sql "pause routine load for ${job}"
+ def res = sql "show routine load for ${job}"
+ logger.info("routine load job properties:
${res[0][11].toString()}".toString())
+ sql "ALTER ROUTINE LOAD FOR ${job}
PROPERTIES(\"empty_field_as_null\" = \"true\");"
+ sql "truncate table ${tableName}"
+ sql "resume routine load for ${job}"
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def testData = [
+ "9,\\N,2023-07-15,def,2023-07-20T05:48:31,ghi",
+ "10,,2023-07-15,def,2023-07-20T05:48:31,ghi"
+ ]
+ testData.each { line ->
+ logger.info("Sending data to kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null,
line)
+ producer.send(record)
+ }
+ }
+
+ count = 0
+ while (count < maxWaitCount) {
+ def state = sql "show routine load for ${job}"
+ def routineLoadState = state[0][8].toString()
+ def statistic = state[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+ def rowCount = sql "select count(*) from ${tableName}"
+ if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_select_2 """
+ SELECT * FROM ${tableName} ORDER BY k1
+ """
+
+ } catch (Exception e) {
+ logger.error("Test failed with exception: ${e.message}")
+ } finally {
+ try {
+ sql "stop routine load for ${job}"
+ } catch (Exception e) {
+ logger.warn("Failed to stop routine load job: ${e.message}")
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
new file mode 100644
index 00000000000..b4046d16de4
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_empty_field_as_null.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_empty_field_as_null", "p0") {
+
+ def tableName = "test_stream_load_empty_field_as_null"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+
+ file "empty_field_as_null.csv"
+ }
+
+ sql "sync"
+ qt_select_1 """
+ SELECT * FROM ${tableName} ORDER BY k1
+ """
+ sql "truncate table ${tableName}"
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'empty_field_as_null', 'true'
+
+ file "empty_field_as_null.csv"
+ }
+
+ sql "sync"
+ qt_select_2 """
+ SELECT * FROM ${tableName} ORDER BY k1
+ """
+}
diff --git
a/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy
b/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy
new file mode 100644
index 00000000000..6abbc3969f4
--- /dev/null
+++ b/regression-test/suites/load_p0/tvf/test_tvf_empty_field_as_null.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_empty_field_as_null", "p0") {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ def tableName = "test_tvf_empty_field_as_null"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int NULL,
+ `k2` varchar(50) NULL,
+ `v1` varchar(50) NULL,
+ `v2` varchar(50) NULL,
+ `v3` varchar(50) NULL,
+ `v4` varchar(50) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ qt_select """ select * from S3 (
+ "uri" =
"http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "empty_field_as_null" = "true",
+ "column_separator" = ",",
+ "region" = "${region}"
+ );
+ """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]