This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0f6354f9d0fe759f25ef91a61e64093b261c54a3
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue Jul 18 00:06:38 2023 +0800

    [fix](Export) Fixed the bug that would be core when exporting large amounts 
of data (#21761)
    
    A heap-buffer-overflow error occurs when exporting large amounts of data to 
orc format.
    Reserve 50B for buffer to avoid this problem.
---
 be/src/vec/runtime/vorc_writer.cpp                 |  22 ++--
 be/src/vec/runtime/vorc_writer.h                   |  11 +-
 .../suites/export_p2/test_export_big_data.groovy   | 124 +++++++++++++++++++++
 3 files changed, 144 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/runtime/vorc_writer.cpp 
b/be/src/vec/runtime/vorc_writer.cpp
index 47fc9242f8..293c11b874 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_writer.cpp
@@ -167,7 +167,7 @@ void VOrcWriterWrapper::close() {
 
 #define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN)                 
                  \
     VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]);    
                  \
-    size_t begin_off = offset;                                                 
                  \
+    const size_t begin_off = offset;                                           
                  \
     if (null_map != nullptr) {                                                 
                  \
         cur_batch->hasNulls = true;                                            
                  \
         auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                 \
@@ -179,7 +179,7 @@ void VOrcWriterWrapper::close() {
                 auto value = assert_cast<const 
COLUMN&>(*col).get_data()[row_id];                \
                 std::string value_str = fmt::format("{}", value);              
                  \
                 size_t len = value_str.size();                                 
                  \
-                while (buffer.size < offset + len) {                           
                  \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                  \
                     char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);               \
                     memcpy(new_ptr, buffer.data, buffer.size);                 
                  \
                     free(const_cast<char*>(buffer.data));                      
                  \
@@ -205,7 +205,7 @@ void VOrcWriterWrapper::close() {
             auto value = not_null_column->get_data()[row_id];                  
                  \
             std::string value_str = fmt::format("{}", value);                  
                  \
             size_t len = value_str.size();                                     
                  \
-            while (buffer.size < offset + len) {                               
                  \
+            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                  \
                 char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                  \
                 memcpy(new_ptr, buffer.data, buffer.size);                     
                  \
                 free(const_cast<char*>(buffer.data));                          
                  \
@@ -227,7 +227,7 @@ void VOrcWriterWrapper::close() {
 
 #define WRITE_DATE_STRING_INTO_BATCH(FROM, TO)                                 
                    \
     orc::StringVectorBatch* cur_batch = 
dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);    \
-    size_t begin_off = offset;                                                 
                    \
+    const size_t begin_off = offset;                                           
                    \
     if (null_map != nullptr) {                                                 
                    \
         cur_batch->hasNulls = true;                                            
                    \
         auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                   \
@@ -239,7 +239,7 @@ void VOrcWriterWrapper::close() {
                 int len = binary_cast<FROM, TO>(                               
                    \
                                   assert_cast<const 
ColumnVector<FROM>&>(*col).get_data()[row_id]) \
                                   .to_buffer(const_cast<char*>(buffer.data) + 
offset);             \
-                while (buffer.size < offset + len) {                           
                    \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
                     char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);                 \
                     memcpy(new_ptr, buffer.data, buffer.size);                 
                    \
                     free(const_cast<char*>(buffer.data));                      
                    \
@@ -264,7 +264,7 @@ void VOrcWriterWrapper::close() {
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                    \
             int len = binary_cast<FROM, 
TO>(not_null_column->get_data()[row_id])                   \
                               .to_buffer(const_cast<char*>(buffer.data) + 
offset);                 \
-            while (buffer.size < offset + len) {                               
                    \
+            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
                 char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                    \
                 memcpy(new_ptr, buffer.data, buffer.size);                     
                    \
                 free(const_cast<char*>(buffer.data));                          
                    \
@@ -285,7 +285,7 @@ void VOrcWriterWrapper::close() {
 
 #define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO)                           
                    \
     orc::StringVectorBatch* cur_batch = 
dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);    \
-    size_t begin_off = offset;                                                 
                    \
+    const size_t begin_off = offset;                                           
                    \
     if (null_map != nullptr) {                                                 
                    \
         cur_batch->hasNulls = true;                                            
                    \
         auto& null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data();                   \
@@ -299,7 +299,7 @@ void VOrcWriterWrapper::close() {
                         binary_cast<FROM, TO>(                                 
                    \
                                 assert_cast<const 
ColumnVector<FROM>&>(*col).get_data()[row_id])   \
                                 .to_buffer(const_cast<char*>(buffer.data) + 
offset, output_scale); \
-                while (buffer.size < offset + len) {                           
                    \
+                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
                     char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);                 \
                     memcpy(new_ptr, buffer.data, buffer.size);                 
                    \
                     free(const_cast<char*>(buffer.data));                      
                    \
@@ -325,7 +325,7 @@ void VOrcWriterWrapper::close() {
             int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;    
                    \
             int len = binary_cast<FROM, 
TO>(not_null_column->get_data()[row_id])                   \
                               .to_buffer(const_cast<char*>(buffer.data) + 
offset, output_scale);   \
-            while (buffer.size < offset + len) {                               
                    \
+            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
                 char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                    \
                 memcpy(new_ptr, buffer.data, buffer.size);                     
                    \
                 free(const_cast<char*>(buffer.data));                          
                    \
@@ -397,7 +397,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
         return Status::OK();
     }
 
-    // Buffer used by date type
+    // Buffer used by date/datetime/datev2/datetimev2/largeint type
     char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
     StringRef buffer(ptr, BUFFER_UNIT_SIZE);
     size_t offset = 0;
@@ -584,7 +584,7 @@ Status VOrcWriterWrapper::write(const Block& block) {
             }
         }
     } catch (const std::exception& e) {
-        LOG(WARNING) << "Parquet write error: " << e.what();
+        LOG(WARNING) << "Orc write error: " << e.what();
         return Status::InternalError(e.what());
     }
     root->numElements = sz;
diff --git a/be/src/vec/runtime/vorc_writer.h b/be/src/vec/runtime/vorc_writer.h
index 3a7b6c205f..ba508ea7a1 100644
--- a/be/src/vec/runtime/vorc_writer.h
+++ b/be/src/vec/runtime/vorc_writer.h
@@ -98,7 +98,14 @@ private:
     std::unique_ptr<orc::Type> _schema;
     std::unique_ptr<orc::Writer> _writer;
 
-    static constexpr size_t BUFFER_UNIT_SIZE = 4096;
+    // Buffer used by date/datetime/datev2/datetimev2/largeint type
+    // date/datetime/datev2/datetimev2/largeint type will be converted to 
string bytes to store in Buffer
+    // The minimum value of largeint has 40 bytes after being converted to 
string(a negative number occupies a byte)
+    // The bytes of date/datetime/datev2/datetimev2 after converted to string 
are smaller than largeint
+    // Because a block is 4064 rows by default, here is 4064*40 bytes to 
BUFFER,
+    static constexpr size_t BUFFER_UNIT_SIZE = 4064 * 40;
+    // buffer reserves 40 bytes. The reserved space is just to prevent 
Headp-Buffer-Overflow
+    static constexpr size_t BUFFER_RESERVED_SIZE = 40;
 };
 
-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/regression-test/suites/export_p2/test_export_big_data.groovy 
b/regression-test/suites/export_p2/test_export_big_data.groovy
new file mode 100644
index 0000000000..5c09b83010
--- /dev/null
+++ b/regression-test/suites/export_p2/test_export_big_data.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_export_big_data", "p2") {
+    // check whether the FE config 'enable_outfile_to_local' is true
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://"; + context.config.feHttpAddress + 
"/rest/v1/config/fe")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    boolean enableOutfileToLocal = false
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"enable_outfile_to_local") {
+            enableOutfileToLocal = ((Map<String, String>) 
conf).get("Value").toLowerCase() == "true"
+        }
+    }
+    if (!enableOutfileToLocal) {
+        logger.warn("Please set enable_outfile_to_local to true to run 
test_outfile")
+        return
+    }
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    def check_path_exists = { dir_path ->
+        File path = new File(dir_path)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${dir_path} already exists! """)
+        }
+    }
+
+    def table_export_name = "test_export_big_data"
+    // create table and insert
+    sql """ DROP TABLE IF EXISTS ${table_export_name} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_export_name} (
+        `id` int(11) NULL,
+        `name` string NULL,
+        `age` largeint(11) NULL,
+        `dt` date NULL,
+        `dt2` datev2 NULL,
+        `dtime` datetime NULL,
+        `dtime2` datetimev2 NULL
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    sql """ INSERT INTO ${table_export_name} select * from s3(
+            "uri" = 
"https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.csv";,
+            "s3.access_key"= "${ak}",
+            "s3.secret_key" = "${sk}",
+            "format" = "csv");
+        """
+
+    def outfile_path_prefix = 
"""/mnt/datadisk1/fangtiewei/tmpdata/test_export"""
+    def uuid = UUID.randomUUID().toString()
+    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+
+
+    // check export path
+    check_path_exists.call("${outFilePath}")
+
+    // exec export
+    sql """
+        EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
+        PROPERTIES(
+            "label" = "${uuid}",
+            "format" = "orc",
+            "column_separator"=","
+        );
+    """
+
+    while (true) {
+        def res = sql """ show export where label = "${uuid}" """
+        logger.info("export state: " + res[0][2])
+        if (res[0][2] == "FINISHED") {
+            def json = parseJson(res[0][11])
+            assert json instanceof List
+            assertEquals("1", json.fileNumber[0])
+            log.info("outfile_path: ${json.url[0]}")
+            return json.url[0];
+        } else if (res[0][2] == "CANCELLED") {
+            throw new IllegalStateException("""export failed: ${res[0][10]}""")
+        } else {
+            sleep(5000)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to