This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 99c3b5f041a [fix](outfile) Fix unable to export empty data (#30703)
99c3b5f041a is described below
commit 99c3b5f041a2b6ce828a3e279c4798ae261a7f7e
Author: zxealous <[email protected]>
AuthorDate: Mon Feb 19 16:55:54 2024 +0800
[fix](outfile) Fix unable to export empty data (#30703)
Issue Number: close #30600
Fix being unable to export empty data to HDFS / S3. This behavior was inconsistent
with version 1.2.7, which could export empty data to HDFS / S3 and would produce
exported files on S3 / HDFS.
---
be/src/io/fs/broker_file_writer.cpp | 8 +
be/src/io/fs/broker_file_writer.h | 1 +
be/src/io/fs/file_writer.h | 3 +
be/src/io/fs/hdfs_file_writer.cpp | 8 +
be/src/io/fs/hdfs_file_writer.h | 1 +
be/src/io/fs/s3_file_writer.cpp | 39 ++++-
be/src/vec/sink/writer/vfile_result_writer.cpp | 1 +
.../outfile/csv/test_outfile_empty_data.out | 9 ++
.../outfile/csv/test_outfile_empty_data.groovy | 166 +++++++++++++++++++++
9 files changed, 230 insertions(+), 6 deletions(-)
diff --git a/be/src/io/fs/broker_file_writer.cpp
b/be/src/io/fs/broker_file_writer.cpp
index d5b2baa7a66..0d305bf269b 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -153,6 +153,14 @@ Status BrokerFileWriter::finalize() {
return Status::OK();
}
+Status BrokerFileWriter::open() {
+ if (!_opened) {
+ RETURN_IF_ERROR(_open());
+ _opened = true;
+ }
+ return Status::OK();
+}
+
Status BrokerFileWriter::_open() {
TBrokerOpenWriterRequest request;
diff --git a/be/src/io/fs/broker_file_writer.h
b/be/src/io/fs/broker_file_writer.h
index cf5b8013acb..f132545f0a8 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -45,6 +45,7 @@ public:
int64_t start_offset, FileSystemSPtr fs);
virtual ~BrokerFileWriter();
+ Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 58c9c9ff060..256c67a9838 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -46,6 +46,9 @@ public:
DISALLOW_COPY_AND_ASSIGN(FileWriter);
+ // Open the file for writing.
+ virtual Status open() { return Status::OK(); }
+
// Normal close. Wait for all data to persist before returning.
virtual Status close() = 0;
diff --git a/be/src/io/fs/hdfs_file_writer.cpp
b/be/src/io/fs/hdfs_file_writer.cpp
index 00081db310f..1f262e1abcd 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -103,6 +103,14 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}
+Status HdfsFileWriter::open() {
+ if (!_opened) {
+ RETURN_IF_ERROR(_open());
+ _opened = true;
+ }
+ return Status::OK();
+}
+
Status HdfsFileWriter::_open() {
_path = convert_path(_path, _hdfs_fs->_fs_name);
std::string hdfs_dir = _path.parent_path().string();
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index c05f7625124..bffd0efdca9 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -36,6 +36,7 @@ public:
HdfsFileWriter(Path file, FileSystemSPtr fs);
~HdfsFileWriter();
+ Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 0ec3a46f808..7711529b6f5 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -198,17 +198,44 @@ Status S3FileWriter::close() {
return _st;
}
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
- // it might be one file less than 5MB, we do upload here
- if (_pending_buf != nullptr) {
- if (_upload_id.empty()) {
+
+ if (_upload_id.empty()) {
+ if (_pending_buf != nullptr) {
+ // it might be one file less than 5MB, we do upload here
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) {
_put_object(b); });
+ } else {
+ // if there is no pending buffer, we need to create an empty file
+ auto builder = FileBufferBuilder();
+ builder.set_type(BufferType::UPLOAD)
+ .set_upload_callback([this](UploadFileBuffer& buf) {
_put_object(buf); })
+ .set_sync_after_complete_task([this](Status s) {
+ bool ret = false;
+ if (!s.ok()) [[unlikely]] {
+ VLOG_NOTICE << "failed at key: " << _key
+ << ", status: " << s.to_string();
+ std::unique_lock<std::mutex> _lck
{_completed_lock};
+ _failed = true;
+ ret = true;
+ this->_st = std::move(s);
+ }
+ // After the signal, there is a scenario where the
previous invocation of _wait_until_finish
+ // returns to the caller, and subsequently, the S3
file writer is destructed.
+ // This means that accessing _failed afterwards would
result in a heap use after free vulnerability.
+ _countdown_event.signal();
+ return ret;
+ })
+ .set_is_cancelled([this]() { return _failed.load(); });
+ RETURN_IF_ERROR(builder.build(&_pending_buf));
+ auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ DCHECK(buf != nullptr);
}
- _countdown_event.add_count();
- RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
- _pending_buf = nullptr;
}
+ _countdown_event.add_count();
+ RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
+ _pending_buf = nullptr;
+
DBUG_EXECUTE_IF("s3_file_writer::close", {
RETURN_IF_ERROR(_complete());
return Status::InternalError("failed to close s3 file writer");
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bf3d09cdda6..d3228d33680 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
FileFactory::convert_storage_type(_storage_type),
_state->exec_env(),
_file_opts->broker_addresses, _file_opts->broker_properties,
file_name, 0,
_file_writer_impl));
+ RETURN_IF_ERROR(_file_writer_impl->open());
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_vfile_writer.reset(new VCSVTransformer(_state,
_file_writer_impl.get(),
diff --git
a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
new file mode 100644
index 00000000000..260c177d310
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_base1 --
+
+-- !select_tvf1 --
+
+-- !select_tvf2 --
+
+-- !select_tvf3 --
+
diff --git
a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
new file mode 100644
index 00000000000..1804fff2a11
--- /dev/null
+++
b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {
+
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("diable Hive test.")
+ return;
+ }
+
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ // use to outfile to hdfs
+ String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ // It's okay to use random `hdfsUser`, but can not be empty.
+ def hdfsUserName = "doris"
+ def format = "csv"
+ def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+
+ // use to outfile to s3
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ // broker
+ String broker_name = "hdfs"
+
+ def export_table_name = "outfile_empty_data_test"
+
+ def create_table = {table_name, column_define ->
+ sql """ DROP TABLE IF EXISTS ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ ${column_define}
+ )
+ DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+ """
+ }
+
+ def outfile_to_HDFS_directly = {
+ // select ... into outfile ...
+ def uuid = UUID.randomUUID().toString()
+
+ hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+ uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "fs.defaultFS"="${defaultFS}",
+ "hadoop.username" = "${hdfsUserName}"
+ );
+ """
+ logger.info("outfile to hdfs direct success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ def outfile_to_HDFS_with_broker = {
+ // select ... into outfile ...
+ def uuid = UUID.randomUUID().toString()
+
+ hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+ uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS ${format}
+ PROPERTIES (
+ "broker.fs.defaultFS"="${defaultFS}",
+ "broker.name"="hdfs",
+ "broker.username" = "${hdfsUserName}"
+ );
+ """
+ logger.info("outfile to hdfs with broker success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ def outfile_to_S3_directly = {
+ // select ... into outfile ...
+ s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/"
+ uri = "s3://${s3_outfile_path}/exp_"
+
+ def res = sql """
+ SELECT * FROM ${export_table_name} t ORDER BY user_id
+ INTO OUTFILE "${uri}"
+ FORMAT AS csv
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+ logger.info("outfile to s3 success path: " + res[0][3]);
+ return res[0][3]
+ }
+
+ try {
+ def doris_column_define = """
+ `user_id` INT NOT NULL COMMENT "用户id",
+ `name` STRING NULL,
+ `age` INT NULL"""
+ // create table
+ create_table(export_table_name, doris_column_define);
+ // test outfile empty data to hdfs directly
+ def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
+ // test outfile empty data to hdfs with broker
+ def outfile_to_hdfs_with_broker_url= outfile_to_HDFS_with_broker()
+ // test outfile empty data to s3 directly
+ def outfile_to_s3_directly_url = outfile_to_S3_directly()
+ qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY
user_id; """
+
+ qt_select_tvf1 """ select * from HDFS(
+ "uri" = "${outfile_to_hdfs_directly_url}0.csv",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}");
+ """
+
+ qt_select_tvf2 """ select * from HDFS(
+ "uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}");
+ """
+
+ qt_select_tvf3 """ SELECT * FROM S3 (
+ "uri" =
"http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4,
outfile_to_s3_directly_url.length())}0.csv",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "${format}",
+ "region" = "${region}",
+ "use_path_style" = "true"
+ );
+ """
+
+ } finally {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]