This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 99c3b5f041a [fix](outfile) Fix unable to export empty data (#30703)
99c3b5f041a is described below

commit 99c3b5f041a2b6ce828a3e279c4798ae261a7f7e
Author: zxealous <[email protected]>
AuthorDate: Mon Feb 19 16:55:54 2024 +0800

    [fix](outfile) Fix unable to export empty data (#30703)
    
    Issue Number: close #30600
    Fix being unable to export empty data to HDFS/S3. This behavior is 
inconsistent with version 1.2.7,
    which could export empty data to HDFS/S3, so that exported files 
appeared on S3/HDFS.
---
 be/src/io/fs/broker_file_writer.cpp                |   8 +
 be/src/io/fs/broker_file_writer.h                  |   1 +
 be/src/io/fs/file_writer.h                         |   3 +
 be/src/io/fs/hdfs_file_writer.cpp                  |   8 +
 be/src/io/fs/hdfs_file_writer.h                    |   1 +
 be/src/io/fs/s3_file_writer.cpp                    |  39 ++++-
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   1 +
 .../outfile/csv/test_outfile_empty_data.out        |   9 ++
 .../outfile/csv/test_outfile_empty_data.groovy     | 166 +++++++++++++++++++++
 9 files changed, 230 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/broker_file_writer.cpp 
b/be/src/io/fs/broker_file_writer.cpp
index d5b2baa7a66..0d305bf269b 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -153,6 +153,14 @@ Status BrokerFileWriter::finalize() {
     return Status::OK();
 }
 
+Status BrokerFileWriter::open() {
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status BrokerFileWriter::_open() {
     TBrokerOpenWriterRequest request;
 
diff --git a/be/src/io/fs/broker_file_writer.h 
b/be/src/io/fs/broker_file_writer.h
index cf5b8013acb..f132545f0a8 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -45,6 +45,7 @@ public:
                      int64_t start_offset, FileSystemSPtr fs);
     virtual ~BrokerFileWriter();
 
+    Status open() override;
     Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 58c9c9ff060..256c67a9838 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -46,6 +46,9 @@ public:
 
     DISALLOW_COPY_AND_ASSIGN(FileWriter);
 
+    // Open the file for writing.
+    virtual Status open() { return Status::OK(); }
+
     // Normal close. Wait for all data to persist before returning.
     virtual Status close() = 0;
 
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index 00081db310f..1f262e1abcd 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -103,6 +103,14 @@ Status HdfsFileWriter::finalize() {
     return Status::OK();
 }
 
+Status HdfsFileWriter::open() {
+    if (!_opened) {
+        RETURN_IF_ERROR(_open());
+        _opened = true;
+    }
+    return Status::OK();
+}
+
 Status HdfsFileWriter::_open() {
     _path = convert_path(_path, _hdfs_fs->_fs_name);
     std::string hdfs_dir = _path.parent_path().string();
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index c05f7625124..bffd0efdca9 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -36,6 +36,7 @@ public:
     HdfsFileWriter(Path file, FileSystemSPtr fs);
     ~HdfsFileWriter();
 
+    Status open() override;
     Status close() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 0ec3a46f808..7711529b6f5 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -198,17 +198,44 @@ Status S3FileWriter::close() {
         return _st;
     }
     VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
-    // it might be one file less than 5MB, we do upload here
-    if (_pending_buf != nullptr) {
-        if (_upload_id.empty()) {
+
+    if (_upload_id.empty()) {
+        if (_pending_buf != nullptr) {
+            // it might be one file less than 5MB, we do upload here
             auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
             DCHECK(buf != nullptr);
             buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
+        } else {
+            // if there is no pending buffer, we need to create an empty file
+            auto builder = FileBufferBuilder();
+            builder.set_type(BufferType::UPLOAD)
+                    .set_upload_callback([this](UploadFileBuffer& buf) { 
_put_object(buf); })
+                    .set_sync_after_complete_task([this](Status s) {
+                        bool ret = false;
+                        if (!s.ok()) [[unlikely]] {
+                            VLOG_NOTICE << "failed at key: " << _key
+                                        << ", status: " << s.to_string();
+                            std::unique_lock<std::mutex> _lck 
{_completed_lock};
+                            _failed = true;
+                            ret = true;
+                            this->_st = std::move(s);
+                        }
+                        // After the signal, there is a scenario where the 
previous invocation of _wait_until_finish
+                        // returns to the caller, and subsequently, the S3 
file writer is destructed.
+                        // This means that accessing _failed afterwards would 
result in a heap use after free vulnerability.
+                        _countdown_event.signal();
+                        return ret;
+                    })
+                    .set_is_cancelled([this]() { return _failed.load(); });
+            RETURN_IF_ERROR(builder.build(&_pending_buf));
+            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+            DCHECK(buf != nullptr);
         }
-        _countdown_event.add_count();
-        RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
-        _pending_buf = nullptr;
     }
+    _countdown_event.add_count();
+    RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
+    _pending_buf = nullptr;
+
     DBUG_EXECUTE_IF("s3_file_writer::close", {
         RETURN_IF_ERROR(_complete());
         return Status::InternalError("failed to close s3 file writer");
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bf3d09cdda6..d3228d33680 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -141,6 +141,7 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
             FileFactory::convert_storage_type(_storage_type), 
_state->exec_env(),
             _file_opts->broker_addresses, _file_opts->broker_properties, 
file_name, 0,
             _file_writer_impl));
+    RETURN_IF_ERROR(_file_writer_impl->open());
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _vfile_writer.reset(new VCSVTransformer(_state, 
_file_writer_impl.get(),
diff --git 
a/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out 
b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
new file mode 100644
index 00000000000..260c177d310
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/csv/test_outfile_empty_data.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_base1 --
+
+-- !select_tvf1 --
+
+-- !select_tvf2 --
+
+-- !select_tvf3 --
+
diff --git 
a/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy 
b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
new file mode 100644
index 00000000000..1804fff2a11
--- /dev/null
+++ 
b/regression-test/suites/export_p0/outfile/csv/test_outfile_empty_data.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_outfile_empty_data", "external,hive,tvf,external_docker") {
+
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("diable Hive test.")
+        return;
+    }
+
+    // open nereids
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    // use to outfile to hdfs
+    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    // It's okay to use random `hdfsUser`, but can not be empty.
+    def hdfsUserName = "doris"
+    def format = "csv"
+    def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+
+    // use to outfile to s3
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    // broker
+    String broker_name = "hdfs"
+
+    def export_table_name = "outfile_empty_data_test"
+
+    def create_table = {table_name, column_define ->
+        sql """ DROP TABLE IF EXISTS ${table_name} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            ${column_define}
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+    }
+
+    def outfile_to_HDFS_directly = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "fs.defaultFS"="${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs direct success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    def outfile_to_HDFS_with_broker = {
+        // select ... into outfile ...
+        def uuid = UUID.randomUUID().toString()
+
+        hdfs_outfile_path = "/user/doris/tmp_data/${uuid}"
+        uri = "${defaultFS}" + "${hdfs_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS ${format}
+            PROPERTIES (
+                "broker.fs.defaultFS"="${defaultFS}",
+                "broker.name"="hdfs",
+                "broker.username" = "${hdfsUserName}"
+            );
+        """
+        logger.info("outfile to hdfs with broker success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    def outfile_to_S3_directly = {
+        // select ... into outfile ...
+        s3_outfile_path = "${bucket}/outfile/csv/test-outfile-empty/"
+        uri = "s3://${s3_outfile_path}/exp_"
+
+        def res = sql """
+            SELECT * FROM ${export_table_name} t ORDER BY user_id
+            INTO OUTFILE "${uri}"
+            FORMAT AS csv
+            PROPERTIES (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        logger.info("outfile to s3 success path: " + res[0][3]);
+        return res[0][3]
+    }
+
+    try {
+        def doris_column_define = """
+                                    `user_id` INT NOT NULL COMMENT "用户id",
+                                    `name` STRING NULL,
+                                    `age` INT NULL"""
+        // create table
+        create_table(export_table_name, doris_column_define);
+        // test outfile empty data to hdfs directly
+        def outfile_to_hdfs_directly_url = outfile_to_HDFS_directly()
+        // test outfile empty data to hdfs with broker
+        def outfile_to_hdfs_with_broker_url= outfile_to_HDFS_with_broker()
+        // test outfile empty data to s3 directly
+        def outfile_to_s3_directly_url = outfile_to_S3_directly()
+        qt_select_base1 """ SELECT * FROM ${export_table_name} ORDER BY 
user_id; """ 
+
+        qt_select_tvf1 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_directly_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                    """
+
+        qt_select_tvf2 """ select * from HDFS(
+                    "uri" = "${outfile_to_hdfs_with_broker_url}0.csv",
+                    "hadoop.username" = "${hdfsUserName}",
+                    "format" = "${format}");
+                    """
+        
+        qt_select_tvf3 """ SELECT * FROM S3 (
+                "uri" = 
"http://${s3_endpoint}${outfile_to_s3_directly_url.substring(4, 
outfile_to_s3_directly_url.length())}0.csv",
+                "ACCESS_KEY"= "${ak}",
+                "SECRET_KEY" = "${sk}",
+                "format" = "${format}",
+                "region" = "${region}",
+                "use_path_style" = "true"
+            );
+            """
+
+    } finally {
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to