This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f4044eb9652366a35fa75d847b552c2bdf518e3b
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Aug 30 19:01:44 2023 +0800

    [fix](Outfile) fix core dump when export data to orc file format using `outfile` (#23586)
    
    * fix
    
    * add test
---
 be/src/vec/runtime/vorc_writer.cpp                 | 141 ++++++++++++---------
 .../suites/export_p2/test_export_big_data.groovy   |  82 ++++++++----
 2 files changed, 136 insertions(+), 87 deletions(-)

diff --git a/be/src/vec/runtime/vorc_writer.cpp 
b/be/src/vec/runtime/vorc_writer.cpp
index 34f274cc06..df9615d668 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_writer.cpp
@@ -171,7 +171,7 @@ Status VOrcWriterWrapper::close() {
         RETURN_WRONG_TYPE                                                      
                \
     }
 
-#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN)                 
                  \
+#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN, BUFFER)         
                  \
     VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]);    
                  \
     const size_t begin_off = offset;                                           
                  \
     if (null_map != nullptr) {                                                 
                  \
@@ -185,14 +185,14 @@ Status VOrcWriterWrapper::close() {
                 auto value = assert_cast<const 
COLUMN&>(*col).get_data()[row_id];                \
                 std::string value_str = fmt::format("{}", value);              
                  \
                 size_t len = value_str.size();                                 
                  \
-                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                  \
-                    char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);               \
-                    memcpy(new_ptr, buffer.data, buffer.size);                 
                  \
-                    free(const_cast<char*>(buffer.data));                      
                  \
-                    buffer.data = new_ptr;                                     
                  \
-                    buffer.size = buffer.size + BUFFER_UNIT_SIZE;              
                  \
+                while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {    
                  \
+                    char* new_ptr = (char*)malloc(BUFFER.size + 
BUFFER_UNIT_SIZE);               \
+                    memcpy(new_ptr, BUFFER.data, BUFFER.size);                 
                  \
+                    free(const_cast<char*>(BUFFER.data));                      
                  \
+                    BUFFER.data = new_ptr;                                     
                  \
+                    BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;              
                  \
                 }                                                              
                  \
-                strcpy(const_cast<char*>(buffer.data) + offset, 
value_str.c_str());              \
+                strcpy(const_cast<char*>(BUFFER.data) + offset, 
value_str.c_str());              \
                 offset += len;                                                 
                  \
                 cur_batch->length[row_id] = len;                               
                  \
             }                                                                  
                  \
@@ -202,7 +202,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) {                                      
                  \
                 cur_batch->notNull[row_id] = 0;                                
                  \
             } else {                                                           
                  \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off; \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off; \
                 data_off += cur_batch->length[row_id];                         
                  \
             }                                                                  
                  \
         }                                                                      
                  \
@@ -211,27 +211,27 @@ Status VOrcWriterWrapper::close() {
             auto value = not_null_column->get_data()[row_id];                  
                  \
             std::string value_str = fmt::format("{}", value);                  
                  \
             size_t len = value_str.size();                                     
                  \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                  \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                  \
-                memcpy(new_ptr, buffer.data, buffer.size);                     
                  \
-                free(const_cast<char*>(buffer.data));                          
                  \
-                buffer.data = new_ptr;                                         
                  \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE;                  
                  \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {        
                  \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); 
                  \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size);                     
                  \
+                free(const_cast<char*>(BUFFER.data));                          
                  \
+                BUFFER.data = new_ptr;                                         
                  \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;                  
                  \
             }                                                                  
                  \
-            strcpy(const_cast<char*>(buffer.data) + offset, 
value_str.c_str());                  \
+            strcpy(const_cast<char*>(BUFFER.data) + offset, 
value_str.c_str());                  \
             offset += len;                                                     
                  \
             cur_batch->length[row_id] = len;                                   
                  \
         }                                                                      
                  \
         size_t data_off = 0;                                                   
                  \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                  \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off;     \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off;     \
             data_off += cur_batch->length[row_id];                             
                  \
         }                                                                      
                  \
     } else {                                                                   
                  \
         RETURN_WRONG_TYPE                                                      
                  \
     }
 
-#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO)                                 
                    \
+#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO, BUFFER)                         
                    \
     orc::StringVectorBatch* cur_batch = 
dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);    \
     const size_t begin_off = offset;                                           
                    \
     if (null_map != nullptr) {                                                 
                    \
@@ -244,13 +244,13 @@ Status VOrcWriterWrapper::close() {
                 cur_batch->notNull[row_id] = 1;                                
                    \
                 int len = binary_cast<FROM, TO>(                               
                    \
                                   assert_cast<const 
ColumnVector<FROM>&>(*col).get_data()[row_id]) \
-                                  .to_buffer(const_cast<char*>(buffer.data) + 
offset);             \
-                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
-                    char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);                 \
-                    memcpy(new_ptr, buffer.data, buffer.size);                 
                    \
-                    free(const_cast<char*>(buffer.data));                      
                    \
-                    buffer.data = new_ptr;                                     
                    \
-                    buffer.size = buffer.size + BUFFER_UNIT_SIZE;              
                    \
+                                  .to_buffer(const_cast<char*>(BUFFER.data) + 
offset);             \
+                while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
+                    char* new_ptr = (char*)malloc(BUFFER.size + 
BUFFER_UNIT_SIZE);                 \
+                    memcpy(new_ptr, BUFFER.data, BUFFER.size);                 
                    \
+                    free(const_cast<char*>(BUFFER.data));                      
                    \
+                    BUFFER.data = new_ptr;                                     
                    \
+                    BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;              
                    \
                 }                                                              
                    \
                 cur_batch->length[row_id] = len;                               
                    \
                 offset += len;                                                 
                    \
@@ -261,7 +261,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) {                                      
                    \
                 cur_batch->notNull[row_id] = 0;                                
                    \
             } else {                                                           
                    \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off;   \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off;   \
                 data_off += cur_batch->length[row_id];                         
                    \
             }                                                                  
                    \
         }                                                                      
                    \
@@ -269,27 +269,27 @@ Status VOrcWriterWrapper::close() {
                        check_and_get_column<const ColumnVector<FROM>>(col)) {  
                    \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                    \
             int len = binary_cast<FROM, 
TO>(not_null_column->get_data()[row_id])                   \
-                              .to_buffer(const_cast<char*>(buffer.data) + 
offset);                 \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                    \
-                memcpy(new_ptr, buffer.data, buffer.size);                     
                    \
-                free(const_cast<char*>(buffer.data));                          
                    \
-                buffer.data = new_ptr;                                         
                    \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE;                  
                    \
+                              .to_buffer(const_cast<char*>(BUFFER.data) + 
offset);                 \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); 
                    \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size);                     
                    \
+                free(const_cast<char*>(BUFFER.data));                          
                    \
+                BUFFER.data = new_ptr;                                         
                    \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;                  
                    \
             }                                                                  
                    \
             cur_batch->length[row_id] = len;                                   
                    \
             offset += len;                                                     
                    \
         }                                                                      
                    \
         size_t data_off = 0;                                                   
                    \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                    \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off;       \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off;       \
             data_off += cur_batch->length[row_id];                             
                    \
         }                                                                      
                    \
     } else {                                                                   
                    \
         RETURN_WRONG_TYPE                                                      
                    \
     }
 
-#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO)                           
                    \
+#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO, BUFFER)                   
                    \
     orc::StringVectorBatch* cur_batch = 
dynamic_cast<orc::StringVectorBatch*>(root->fields[i]);    \
     const size_t begin_off = offset;                                           
                    \
     if (null_map != nullptr) {                                                 
                    \
@@ -304,13 +304,13 @@ Status VOrcWriterWrapper::close() {
                 int len =                                                      
                    \
                         binary_cast<FROM, TO>(                                 
                    \
                                 assert_cast<const 
ColumnVector<FROM>&>(*col).get_data()[row_id])   \
-                                .to_buffer(const_cast<char*>(buffer.data) + 
offset, output_scale); \
-                while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
-                    char* new_ptr = (char*)malloc(buffer.size + 
BUFFER_UNIT_SIZE);                 \
-                    memcpy(new_ptr, buffer.data, buffer.size);                 
                    \
-                    free(const_cast<char*>(buffer.data));                      
                    \
-                    buffer.data = new_ptr;                                     
                    \
-                    buffer.size = buffer.size + BUFFER_UNIT_SIZE;              
                    \
+                                .to_buffer(const_cast<char*>(BUFFER.data) + 
offset, output_scale); \
+                while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {    
                    \
+                    char* new_ptr = (char*)malloc(BUFFER.size + 
BUFFER_UNIT_SIZE);                 \
+                    memcpy(new_ptr, BUFFER.data, BUFFER.size);                 
                    \
+                    free(const_cast<char*>(BUFFER.data));                      
                    \
+                    BUFFER.data = new_ptr;                                     
                    \
+                    BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;              
                    \
                 }                                                              
                    \
                 cur_batch->length[row_id] = len;                               
                    \
                 offset += len;                                                 
                    \
@@ -321,7 +321,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) {                                      
                    \
                 cur_batch->notNull[row_id] = 0;                                
                    \
             } else {                                                           
                    \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off;   \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off;   \
                 data_off += cur_batch->length[row_id];                         
                    \
             }                                                                  
                    \
         }                                                                      
                    \
@@ -330,20 +330,20 @@ Status VOrcWriterWrapper::close() {
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                    \
             int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;    
                    \
             int len = binary_cast<FROM, 
TO>(not_null_column->get_data()[row_id])                   \
-                              .to_buffer(const_cast<char*>(buffer.data) + 
offset, output_scale);   \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); 
                    \
-                memcpy(new_ptr, buffer.data, buffer.size);                     
                    \
-                free(const_cast<char*>(buffer.data));                          
                    \
-                buffer.data = new_ptr;                                         
                    \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE;                  
                    \
+                              .to_buffer(const_cast<char*>(BUFFER.data) + 
offset, output_scale);   \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) {        
                    \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); 
                    \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size);                     
                    \
+                free(const_cast<char*>(BUFFER.data));                          
                    \
+                BUFFER.data = new_ptr;                                         
                    \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE;                  
                    \
             }                                                                  
                    \
             cur_batch->length[row_id] = len;                                   
                    \
             offset += len;                                                     
                    \
         }                                                                      
                    \
         size_t data_off = 0;                                                   
                    \
         for (size_t row_id = 0; row_id < sz; row_id++) {                       
                    \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + 
begin_off + data_off;       \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + 
begin_off + data_off;       \
             data_off += cur_batch->length[row_id];                             
                    \
         }                                                                      
                    \
     } else {                                                                   
                    \
@@ -404,9 +404,14 @@ Status VOrcWriterWrapper::write(const Block& block) {
     }
 
     // Buffer used by date/datetime/datev2/datetimev2/largeint type
-    char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
-    StringRef buffer(ptr, BUFFER_UNIT_SIZE);
-    size_t offset = 0;
+    std::vector<StringRef> bufferList(block.columns());
+    Defer defer {[&]() {
+        for (auto& bufferRef : bufferList) {
+            if (bufferRef.data) {
+                free(const_cast<char*>(bufferRef.data));
+            }
+        }
+    }};
 
     size_t sz = block.rows();
     auto row_batch = _create_row_batch(sz);
@@ -455,7 +460,12 @@ Status VOrcWriterWrapper::write(const Block& block) {
                 break;
             }
             case TYPE_LARGEINT: {
-                WRITE_LARGEINT_STRING_INTO_BATCH(orc::StringVectorBatch, 
ColumnVector<Int128>)
+                char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+                bufferList[i].data = ptr;
+                bufferList[i].size = BUFFER_UNIT_SIZE;
+                size_t offset = 0;
+                WRITE_LARGEINT_STRING_INTO_BATCH(orc::StringVectorBatch, 
ColumnVector<Int128>,
+                                                 bufferList[i])
                 SET_NUM_ELEMENTS;
                 break;
             }
@@ -472,17 +482,30 @@ Status VOrcWriterWrapper::write(const Block& block) {
             }
             case TYPE_DATETIME:
             case TYPE_DATE: {
-                WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue)
+                char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+                bufferList[i].data = ptr;
+                bufferList[i].size = BUFFER_UNIT_SIZE;
+                size_t offset = 0;
+                WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue, 
bufferList[i])
                 SET_NUM_ELEMENTS
                 break;
             }
             case TYPE_DATEV2: {
-                WRITE_DATE_STRING_INTO_BATCH(UInt32, 
DateV2Value<DateV2ValueType>)
+                char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+                bufferList[i].data = ptr;
+                bufferList[i].size = BUFFER_UNIT_SIZE;
+                size_t offset = 0;
+                WRITE_DATE_STRING_INTO_BATCH(UInt32, 
DateV2Value<DateV2ValueType>, bufferList[i])
                 SET_NUM_ELEMENTS
                 break;
             }
             case TYPE_DATETIMEV2: {
-                WRITE_DATETIMEV2_STRING_INTO_BATCH(UInt64, 
DateV2Value<DateTimeV2ValueType>)
+                char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+                bufferList[i].data = ptr;
+                bufferList[i].size = BUFFER_UNIT_SIZE;
+                size_t offset = 0;
+                WRITE_DATETIMEV2_STRING_INTO_BATCH(UInt64, 
DateV2Value<DateTimeV2ValueType>,
+                                                   bufferList[i])
                 SET_NUM_ELEMENTS
                 break;
             }
@@ -594,11 +617,9 @@ Status VOrcWriterWrapper::write(const Block& block) {
         return Status::InternalError(e.what());
     }
     root->numElements = sz;
-
     _writer->add(*row_batch);
     _cur_written_rows += sz;
 
-    free(const_cast<char*>(buffer.data));
     return Status::OK();
 }
 
diff --git a/regression-test/suites/export_p2/test_export_big_data.groovy 
b/regression-test/suites/export_p2/test_export_big_data.groovy
index 0547386851..a6841c9ab6 100644
--- a/regression-test/suites/export_p2/test_export_big_data.groovy
+++ b/regression-test/suites/export_p2/test_export_big_data.groovy
@@ -56,27 +56,51 @@ suite("test_export_big_data", "p2") {
     String region = getS3Region()
     String bucket = context.config.otherConfigs.get("s3BucketName");
     
+
+    def delete_files = { dir_path ->
+        File path = new File(dir_path)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
+
+
     def table_export_name = "test_export_big_data"
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
     sql """
-        CREATE TABLE IF NOT EXISTS ${table_export_name} (
-        `id` int(11) NULL,
-        `name` string NULL,
-        `age` largeint(11) NULL,
-        `dt` date NULL,
-        `dt2` datev2 NULL,
-        `dtime` datetime NULL,
-        `dtime2` datetimev2 NULL
-        )
-        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        CREATE TABLE ${table_export_name} (
+        `user_id` largeint(40) NOT NULL COMMENT 'id',
+        `date` date NOT NULL,
+        `datetime` datetime NOT NULL,
+        `city` varchar(20) NULL,
+        `age` int(11) NULL,
+        `sex` int(11) NULL,
+        `bool_col` boolean NULL,
+        `int_col` int(11) NULL,
+        `bigint_col` bigint(20) NULL,
+        `largeint_col` largeint(40) NULL,
+        `float_col` float NULL,
+        `double_col` double NULL,
+        `char_col` char(10) NULL,
+        `decimal_col` DECIMAL NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`, `date`, `datetime`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );  
     """
 
     sql """ INSERT INTO ${table_export_name} select * from s3(
-            "uri" = 
"https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.csv";,
+            "uri" = 
"https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.orc";,
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
-            "format" = "csv");
+            "format" = "orc");
         """
 
     def uuid = UUID.randomUUID().toString()
@@ -87,24 +111,28 @@ suite("test_export_big_data", "p2") {
         EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
         PROPERTIES(
             "label" = "${uuid}",
-            "format" = "orc",
-            "column_separator"=","
+            "format" = "orc"
         );
     """
 
-    while (true) {
-        def res = sql """ show export where label = "${uuid}" """
-        logger.info("export state: " + res[0][2])
-        if (res[0][2] == "FINISHED") {
-            def json = parseJson(res[0][11])
-            assert json instanceof List
-            assertEquals("1", json.fileNumber[0])
-            log.info("outfile_path: ${json.url[0]}")
-            return json.url[0];
-        } else if (res[0][2] == "CANCELLED") {
-            throw new IllegalStateException("""export failed: ${res[0][10]}""")
-        } else {
-            sleep(5000)
+    try {
+        while (true) {
+            def res = sql """ show export where label = "${uuid}" """
+            logger.info("export state: " + res[0][2])
+            if (res[0][2] == "FINISHED") {
+                def json = parseJson(res[0][11])
+                assert json instanceof List
+                assertEquals("1", json.fileNumber[0][0])
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
+            } else if (res[0][2] == "CANCELLED") {
+                throw new IllegalStateException("""export failed: 
${res[0][10]}""")
+            } else {
+                sleep(5000)
+            }
         }
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${table_export_name}")
+        delete_files.call("${outFilePath}")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to