This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f4044eb9652366a35fa75d847b552c2bdf518e3b
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Aug 30 19:01:44 2023 +0800

    [fix](Outfile) fix core dump when export data to orc file format using `outfile` (#23586)

    * fix

    * add test
---
 be/src/vec/runtime/vorc_writer.cpp                 | 141 ++++++++++++---
 .../suites/export_p2/test_export_big_data.groovy   |  82 ++++++++----
 2 files changed, 136 insertions(+), 87 deletions(-)

diff --git a/be/src/vec/runtime/vorc_writer.cpp b/be/src/vec/runtime/vorc_writer.cpp
index 34f274cc06..df9615d668 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_writer.cpp
@@ -171,7 +171,7 @@ Status VOrcWriterWrapper::close() {
         RETURN_WRONG_TYPE \
     }
 
-#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN) \
+#define WRITE_LARGEINT_STRING_INTO_BATCH(VECTOR_BATCH, COLUMN, BUFFER) \
     VECTOR_BATCH* cur_batch = dynamic_cast<VECTOR_BATCH*>(root->fields[i]); \
     const size_t begin_off = offset; \
     if (null_map != nullptr) { \
@@ -185,14 +185,14 @@ Status VOrcWriterWrapper::close() {
             auto value = assert_cast<const COLUMN&>(*col).get_data()[row_id]; \
             std::string value_str = fmt::format("{}", value); \
             size_t len = value_str.size(); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
-            strcpy(const_cast<char*>(buffer.data) + offset, value_str.c_str()); \
+            strcpy(const_cast<char*>(BUFFER.data) + offset, value_str.c_str()); \
             offset += len; \
             cur_batch->length[row_id] = len; \
         } \
@@ -202,7 +202,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) { \
                 cur_batch->notNull[row_id] = 0; \
             } else { \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
                 data_off += cur_batch->length[row_id]; \
             } \
         } \
@@ -211,27 +211,27 @@ Status VOrcWriterWrapper::close() {
             auto value = not_null_column->get_data()[row_id]; \
             std::string value_str = fmt::format("{}", value); \
             size_t len = value_str.size(); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
-            strcpy(const_cast<char*>(buffer.data) + offset, value_str.c_str()); \
+            strcpy(const_cast<char*>(BUFFER.data) + offset, value_str.c_str()); \
             offset += len; \
             cur_batch->length[row_id] = len; \
         } \
         size_t data_off = 0; \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
             data_off += cur_batch->length[row_id]; \
         } \
     } else { \
        RETURN_WRONG_TYPE \
    }
 
-#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO) \
+#define WRITE_DATE_STRING_INTO_BATCH(FROM, TO, BUFFER) \
     orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
     const size_t begin_off = offset; \
     if (null_map != nullptr) { \
@@ -244,13 +244,13 @@ Status VOrcWriterWrapper::close() {
             cur_batch->notNull[row_id] = 1; \
             int len = binary_cast<FROM, TO>( \
                               assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
-                              .to_buffer(const_cast<char*>(buffer.data) + offset); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                              .to_buffer(const_cast<char*>(BUFFER.data) + offset); \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
             cur_batch->length[row_id] = len; \
             offset += len; \
@@ -261,7 +261,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) { \
                 cur_batch->notNull[row_id] = 0; \
             } else { \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
                 data_off += cur_batch->length[row_id]; \
             } \
         } \
@@ -269,27 +269,27 @@ Status VOrcWriterWrapper::close() {
                        check_and_get_column<const ColumnVector<FROM>>(col)) { \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
             int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
-                              .to_buffer(const_cast<char*>(buffer.data) + offset); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                              .to_buffer(const_cast<char*>(BUFFER.data) + offset); \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
             cur_batch->length[row_id] = len; \
             offset += len; \
         } \
         size_t data_off = 0; \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
             data_off += cur_batch->length[row_id]; \
         } \
     } else { \
         RETURN_WRONG_TYPE \
     }
 
-#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO) \
+#define WRITE_DATETIMEV2_STRING_INTO_BATCH(FROM, TO, BUFFER) \
     orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(root->fields[i]); \
     const size_t begin_off = offset; \
     if (null_map != nullptr) { \
@@ -304,13 +304,13 @@ Status VOrcWriterWrapper::close() {
             int len = \
                     binary_cast<FROM, TO>( \
                             assert_cast<const ColumnVector<FROM>&>(*col).get_data()[row_id]) \
-                            .to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                            .to_buffer(const_cast<char*>(BUFFER.data) + offset, output_scale); \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
             cur_batch->length[row_id] = len; \
             offset += len; \
@@ -321,7 +321,7 @@ Status VOrcWriterWrapper::close() {
             if (null_data[row_id] != 0) { \
                 cur_batch->notNull[row_id] = 0; \
             } else { \
-                cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+                cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
                 data_off += cur_batch->length[row_id]; \
             } \
         } \
@@ -330,20 +330,20 @@ Status VOrcWriterWrapper::close() {
         for (size_t row_id = 0; row_id < sz; row_id++) { \
             int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; \
             int len = binary_cast<FROM, TO>(not_null_column->get_data()[row_id]) \
-                              .to_buffer(const_cast<char*>(buffer.data) + offset, output_scale); \
-            while (buffer.size - BUFFER_RESERVED_SIZE < offset + len) { \
-                char* new_ptr = (char*)malloc(buffer.size + BUFFER_UNIT_SIZE); \
-                memcpy(new_ptr, buffer.data, buffer.size); \
-                free(const_cast<char*>(buffer.data)); \
-                buffer.data = new_ptr; \
-                buffer.size = buffer.size + BUFFER_UNIT_SIZE; \
+                              .to_buffer(const_cast<char*>(BUFFER.data) + offset, output_scale); \
+            while (BUFFER.size - BUFFER_RESERVED_SIZE < offset + len) { \
+                char* new_ptr = (char*)malloc(BUFFER.size + BUFFER_UNIT_SIZE); \
+                memcpy(new_ptr, BUFFER.data, BUFFER.size); \
+                free(const_cast<char*>(BUFFER.data)); \
+                BUFFER.data = new_ptr; \
+                BUFFER.size = BUFFER.size + BUFFER_UNIT_SIZE; \
             } \
             cur_batch->length[row_id] = len; \
             offset += len; \
         } \
         size_t data_off = 0; \
         for (size_t row_id = 0; row_id < sz; row_id++) { \
-            cur_batch->data[row_id] = const_cast<char*>(buffer.data) + begin_off + data_off; \
+            cur_batch->data[row_id] = const_cast<char*>(BUFFER.data) + begin_off + data_off; \
             data_off += cur_batch->length[row_id]; \
         } \
     } else { \
@@ -404,9 +404,14 @@ Status VOrcWriterWrapper::write(const Block& block) {
     }
 
     // Buffer used by date/datetime/datev2/datetimev2/largeint type
-    char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
-    StringRef buffer(ptr, BUFFER_UNIT_SIZE);
-    size_t offset = 0;
+    std::vector<StringRef> bufferList(block.columns());
+    Defer defer {[&]() {
+        for (auto& bufferRef : bufferList) {
+            if (bufferRef.data) {
+                free(const_cast<char*>(bufferRef.data));
+            }
+        }
+    }};
 
     size_t sz = block.rows();
     auto row_batch = _create_row_batch(sz);
@@ -455,7 +460,12 @@ Status VOrcWriterWrapper::write(const Block& block) {
             break;
         }
         case TYPE_LARGEINT: {
-            WRITE_LARGEINT_STRING_INTO_BATCH(orc::StringVectorBatch, ColumnVector<Int128>)
+            char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+            bufferList[i].data = ptr;
+            bufferList[i].size = BUFFER_UNIT_SIZE;
+            size_t offset = 0;
+            WRITE_LARGEINT_STRING_INTO_BATCH(orc::StringVectorBatch, ColumnVector<Int128>,
+                                             bufferList[i])
             SET_NUM_ELEMENTS;
             break;
         }
@@ -472,17 +482,30 @@ Status VOrcWriterWrapper::write(const Block& block) {
         }
         case TYPE_DATETIME:
         case TYPE_DATE: {
-            WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue)
+            char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+            bufferList[i].data = ptr;
+            bufferList[i].size = BUFFER_UNIT_SIZE;
+            size_t offset = 0;
+            WRITE_DATE_STRING_INTO_BATCH(Int64, VecDateTimeValue, bufferList[i])
             SET_NUM_ELEMENTS
             break;
         }
         case TYPE_DATEV2: {
-            WRITE_DATE_STRING_INTO_BATCH(UInt32, DateV2Value<DateV2ValueType>)
+            char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+            bufferList[i].data = ptr;
+            bufferList[i].size = BUFFER_UNIT_SIZE;
+            size_t offset = 0;
+            WRITE_DATE_STRING_INTO_BATCH(UInt32, DateV2Value<DateV2ValueType>, bufferList[i])
             SET_NUM_ELEMENTS
             break;
         }
        case TYPE_DATETIMEV2: {
-            WRITE_DATETIMEV2_STRING_INTO_BATCH(UInt64, DateV2Value<DateTimeV2ValueType>)
+            char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
+            bufferList[i].data = ptr;
+            bufferList[i].size = BUFFER_UNIT_SIZE;
+            size_t offset = 0;
+            WRITE_DATETIMEV2_STRING_INTO_BATCH(UInt64, DateV2Value<DateTimeV2ValueType>,
+                                               bufferList[i])
             SET_NUM_ELEMENTS
             break;
         }
@@ -594,11 +617,9 @@ Status VOrcWriterWrapper::write(const Block& block) {
         return Status::InternalError(e.what());
     }
     root->numElements = sz;
-
     _writer->add(*row_batch);
     _cur_written_rows += sz;
 
-    free(const_cast<char*>(buffer.data));
 
     return Status::OK();
 }
diff --git a/regression-test/suites/export_p2/test_export_big_data.groovy b/regression-test/suites/export_p2/test_export_big_data.groovy
index 0547386851..a6841c9ab6 100644
--- a/regression-test/suites/export_p2/test_export_big_data.groovy
+++ b/regression-test/suites/export_p2/test_export_big_data.groovy
@@ -56,27 +56,51 @@ suite("test_export_big_data", "p2") {
     String region = getS3Region()
     String bucket = context.config.otherConfigs.get("s3BucketName");
 
+    def delete_files = { dir_path ->
+        File path = new File(dir_path)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
+
     def table_export_name = "test_export_big_data"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
     sql """
-        CREATE TABLE IF NOT EXISTS ${table_export_name} (
-            `id` int(11) NULL,
-            `name` string NULL,
-            `age` largeint(11) NULL,
-            `dt` date NULL,
-            `dt2` datev2 NULL,
-            `dtime` datetime NULL,
-            `dtime2` datetimev2 NULL
-        )
-        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        CREATE TABLE ${table_export_name} (
+            `user_id` largeint(40) NOT NULL COMMENT 'id',
+            `date` date NOT NULL,
+            `datetime` datetime NOT NULL,
+            `city` varchar(20) NULL,
+            `age` int(11) NULL,
+            `sex` int(11) NULL,
+            `bool_col` boolean NULL,
+            `int_col` int(11) NULL,
+            `bigint_col` bigint(20) NULL,
+            `largeint_col` largeint(40) NULL,
+            `float_col` float NULL,
+            `double_col` double NULL,
+            `char_col` char(10) NULL,
+            `decimal_col` DECIMAL NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`, `date`, `datetime`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
     """
     sql """ INSERT INTO ${table_export_name} select * from s3(
-            "uri" = "https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.csv",
+            "uri" = "https://${bucket}.${s3_endpoint}/regression/export_p2/export_orc/test_export_big_data_dataset.orc",
             "s3.access_key"= "${ak}",
             "s3.secret_key" = "${sk}",
-            "format" = "csv");
+            "format" = "orc");
     """
 
     def uuid = UUID.randomUUID().toString()
@@ -87,24 +111,28 @@ suite("test_export_big_data", "p2") {
         EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
         PROPERTIES(
            "label" = "${uuid}",
-            "format" = "orc",
-            "column_separator"=","
+            "format" = "orc"
        );
    """
 
-    while (true) {
-        def res = sql """ show export where label = "${uuid}" """
""" show export where label = "${uuid}" """ - logger.info("export state: " + res[0][2]) - if (res[0][2] == "FINISHED") { - def json = parseJson(res[0][11]) - assert json instanceof List - assertEquals("1", json.fileNumber[0]) - log.info("outfile_path: ${json.url[0]}") - return json.url[0]; - } else if (res[0][2] == "CANCELLED") { - throw new IllegalStateException("""export failed: ${res[0][10]}""") - } else { - sleep(5000) + try { + while (true) { + def res = sql """ show export where label = "${uuid}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } } + } finally { + try_sql("DROP TABLE IF EXISTS ${table_export_name}") + delete_files.call("${outFilePath}") } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
