This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new d86a9dc481 [feature](outfile) support parquet writer (#12492) (#12873)
d86a9dc481 is described below
commit d86a9dc481b9f9ac5903641b84d44670463d4f2d
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 23 08:52:17 2022 +0800
[feature](outfile) support parquet writer (#12492) (#12873)
---
be/src/vec/CMakeLists.txt | 3 +-
be/src/vec/runtime/vfile_result_writer.cpp | 43 +-
be/src/vec/runtime/vfile_result_writer.h | 8 +-
be/src/vec/runtime/vparquet_writer.cpp | 654 +++++++++++++++++++++++++++++
be/src/vec/runtime/vparquet_writer.h | 84 ++++
5 files changed, 773 insertions(+), 19 deletions(-)
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56cfdfcc14..a8355d8a0d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -182,7 +182,8 @@ set(VEC_FILES
runtime/vdata_stream_mgr.cpp
runtime/vfile_result_writer.cpp
runtime/vpartition_info.cpp
- runtime/vsorted_run_merger.cpp)
+ runtime/vsorted_run_merger.cpp
+ runtime/vparquet_writer.cpp)
add_library(Vec STATIC
${VEC_FILES}
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp
b/be/src/vec/runtime/vfile_result_writer.cpp
index 6d4ecb8db1..7bc56607b8 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -56,7 +56,8 @@ VFileResultWriter::VFileResultWriter(
_parent_profile(parent_profile),
_sinker(sinker),
_output_block(output_block),
- _output_row_descriptor(output_row_descriptor) {
+ _output_row_descriptor(output_row_descriptor),
+ _vparquet_writer(nullptr) {
_output_object_data = output_object_data;
}
@@ -129,7 +130,10 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
// just use file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
- return Status::NotSupported("Parquet Writer is not supported yet!");
+ _vparquet_writer.reset(new VParquetWriterWrapper(
+ _file_writer_impl.get(), _output_vexpr_ctxs,
_file_opts->file_properties,
+ _file_opts->schema, _output_object_data));
+ RETURN_IF_ERROR(_vparquet_writer->init());
break;
default:
return Status::InternalError(
@@ -195,18 +199,18 @@ Status VFileResultWriter::append_block(Block& block) {
return Status::OK();
}
SCOPED_TIMER(_append_row_batch_timer);
- if (_parquet_writer != nullptr) {
- return Status::NotSupported("Parquet Writer is not supported yet!");
+ Status status = Status::OK();
+ // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+ // failed, just return the error status
+ auto output_block =
+
VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, block,
status);
+ auto num_rows = output_block.rows();
+ if (UNLIKELY(num_rows == 0)) {
+ return status;
+ }
+ if (_vparquet_writer) {
+ RETURN_IF_ERROR(_write_parquet_file(output_block));
} else {
- Status status = Status::OK();
- // Exec vectorized expr here to speed up, block.rows() == 0 means expr
exec
- // failed, just return the error status
- auto output_block =
VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
-
block, status);
- auto num_rows = output_block.rows();
- if (UNLIKELY(num_rows == 0)) {
- return status;
- }
RETURN_IF_ERROR(_write_csv_file(output_block));
}
@@ -214,6 +218,12 @@ Status VFileResultWriter::append_block(Block& block) {
return Status::OK();
}
+// Appends `block` to the current parquet file through _vparquet_writer, then runs
+// _create_new_file_if_exceed_size() ("split file if exceed limit"). A failed write is
+// returned immediately; otherwise the result of the size check is returned.
+Status VFileResultWriter::_write_parquet_file(const Block& block) {
+ RETURN_IF_ERROR(_vparquet_writer->write(block));
+ // split file if exceed limit
+ return _create_new_file_if_exceed_size();
+}
+
Status VFileResultWriter::_write_csv_file(const Block& block) {
for (size_t i = 0; i < block.rows(); i++) {
for (size_t col_id = 0; col_id < block.columns(); col_id++) {
@@ -348,8 +358,11 @@ Status
VFileResultWriter::_create_new_file_if_exceed_size() {
}
Status VFileResultWriter::_close_file_writer(bool done) {
- if (_parquet_writer != nullptr) {
- return Status::NotSupported("Parquet Writer is not supported yet!");
+ if (_vparquet_writer) {
+ _vparquet_writer->close();
+ _current_written_bytes = _vparquet_writer->written_len();
+ COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
+ _vparquet_writer.reset(nullptr);
} else if (_file_writer_impl) {
_file_writer_impl->close();
}
diff --git a/be/src/vec/runtime/vfile_result_writer.h
b/be/src/vec/runtime/vfile_result_writer.h
index 5f0bb7971e..a3a4e59a17 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -20,6 +20,7 @@
#include "exec/file_writer.h"
#include "runtime/file_result_writer.h"
#include "vec/sink/result_sink.h"
+#include "vec/runtime/vparquet_writer.h"
namespace doris {
@@ -48,6 +49,7 @@ public:
int64_t get_written_rows() const override { return 1; }
private:
+ Status _write_parquet_file(const Block& block);
Status _write_csv_file(const Block& block);
// if buffer exceed the limit, write the data buffered in
_plain_text_outstream via file_writer
@@ -81,9 +83,7 @@ private:
// If the result file format is plain text, like CSV, this _file_writer is
owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by
_parquet_writer.
- std::unique_ptr<FileWriter> _file_writer_impl;
- // parquet file writer
- ParquetWriterWrapper* _parquet_writer = nullptr;
+ std::unique_ptr<doris::FileWriter> _file_writer_impl;
// Used to buffer the export data of plain text
// TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid
calling
// file writer's write() for every single row.
@@ -119,6 +119,8 @@ private:
bool _is_result_sent = false;
bool _header_sent = false;
RowDescriptor _output_row_descriptor;
+ // parquet file writer
+ std::unique_ptr<VParquetWriterWrapper> _vparquet_writer;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/runtime/vparquet_writer.cpp
b/be/src/vec/runtime/vparquet_writer.cpp
new file mode 100644
index 0000000000..8e376a8149
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vparquet_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <exception>
+#include <vector>
+
+#include "io/file_writer.h"
+#include "util/mysql_global.h"
+#include "util/types.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/functions/function_helpers.h"
+
+namespace doris::vectorized {
+
+// Wraps `file_writer` in a ParquetOutputStream and parses the writer `properties` right away.
+// The parquet schema and the parquet file writer are only created later, by init().
+// `output_vexpr_ctxs` is stored by reference, so it must outlive this object.
+VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
+                                             const std::vector<VExprContext*>& output_vexpr_ctxs,
+                                             const std::map<std::string, std::string>& properties,
+                                             const std::vector<std::vector<std::string>>& schema,
+                                             bool output_object_data)
+        : _outstream(std::make_shared<ParquetOutputStream>(file_writer)),
+          _output_vexpr_ctxs(output_vexpr_ctxs),
+          _str_schema(schema),
+          _cur_written_rows(0),
+          _rg_writer(nullptr),
+          _output_object_data(output_object_data) {
+    parse_properties(properties);
+}
+
+// Translates the user supplied writer options into `_properties`. Recognised keys:
+//   "compression":        "snappy", "gzip", "brotli", "zstd", "lz4", "lzo" or "bz2";
+//                         any other value selects UNCOMPRESSED.
+//   "disable_dictionary": "true" turns dictionary encoding off; any other value turns it on.
+//   "version":            "v1" selects PARQUET_1_0; any other value PARQUET_2_LATEST.
+// Unknown keys are ignored.
+void VParquetWriterWrapper::parse_properties(
+        const std::map<std::string, std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (const auto& [property_name, property_value] : propertie_map) {
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            // The option is phrased negatively: "true" must *disable* dictionary encoding.
+            if (property_value == "true") {
+                builder.disable_dictionary();
+            } else {
+                builder.enable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+// Builds the root parquet GroupNode `_schema` from textual column definitions. Each entry of
+// `schema` must contain at least {repetition, physical type, column name}.
+// - Repetition is matched by substring: "required", "repeated" or "optional"; anything else
+//   maps to UNDEFINED.
+// - The type is "boolean" (exact match) or a substring match on int32 / int64 / int96 /
+//   float / double / fixed_len_byte_array / byte_array; anything else maps to UNDEFINED.
+// Returns InvalidArgument for a short entry or when parquet rejects a column definition.
+Status VParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    fields.reserve(schema.size());
+    for (const auto& column : schema) {
+        // Fields [0], [1] and [2] are read below; reject short entries instead of reading past
+        // the end of the vector.
+        if (column.size() < 3) {
+            return Status::InvalidArgument(
+                    "invalid parquet schema entry, expected <repetition>,<type>,<name> but got "
+                    "{} field(s)",
+                    column.size());
+        }
+
+        const std::string& repetition_type = column[0];
+        parquet::Repetition::type parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        }
+
+        const std::string& data_type = column[1];
+        parquet::Type::type parquet_data_type = parquet::Type::UNDEFINED;
+        if (data_type == "boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("fixed_len_byte_array") != std::string::npos) {
+            // Must be tested before "byte_array", which is a substring of this name.
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        }
+
+        const std::string& column_name = column[2];
+        try {
+            fields.push_back(parquet::schema::PrimitiveNode::Make(
+                    column_name, parquet_repetition_type, parquet::LogicalType::None(),
+                    parquet_data_type));
+        } catch (const std::exception& e) {
+            return Status::InvalidArgument("invalid parquet schema for column {}: {}",
+                                           column_name, e.what());
+        }
+    }
+    // Build the root group once, after every column node is known.
+    _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+            parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
+    return Status::OK();
+}
+
+// Prepares the wrapper for writing. In order, it:
+//   1. builds the parquet schema from `_str_schema`,
+//   2. opens the parquet file writer, and
+//   3. validates the output expression types against the declared schema.
+// Returns the status of the first step that fails, or OK.
+Status VParquetWriterWrapper::init() {
+    RETURN_IF_ERROR(parse_schema(_str_schema));
+    RETURN_IF_ERROR(init_parquet_writer());
+    return validate_schema();
+}
+
+// Checks that output expression i can be written into the parquet column declared by
+// `_str_schema[i]`:
+// - The number of expressions and schema entries must be equal.
+// - Each expression type must match the declared physical type: boolean; int32 for
+//   TINYINT/SMALLINT/INT; int64 for BIGINT and the date/datetime types; float; double;
+//   byte_array for strings and decimals.
+// - HLL/BITMAP are accepted as byte_array only when `_output_object_data` is set.
+// - LARGEINT and any other type are rejected.
+Status VParquetWriterWrapper::validate_schema() {
+    // write() sends expression i to parquet column i. A count mismatch would either index past
+    // `_str_schema` below or leave declared parquet columns without any rows.
+    if (_str_schema.size() != _output_vexpr_ctxs.size()) {
+        return Status::InvalidArgument(
+                "the number of output expressions is {}, but the parquet schema has {} columns",
+                _output_vexpr_ctxs.size(), _str_schema.size());
+    }
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        // Entries are read up to index 2 below.
+        if (_str_schema[i].size() < 3) {
+            return Status::InvalidArgument("invalid parquet schema definition for column {}", i);
+        }
+        switch (_output_vexpr_ctxs[i]->root()->type().type) {
+        case TYPE_BOOLEAN: {
+            if (_str_schema[i][1] != "boolean") {
+                return Status::InvalidArgument(
+                        "project field type is boolean, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_TINYINT:
+        case TYPE_SMALLINT:
+        case TYPE_INT: {
+            if (_str_schema[i][1] != "int32") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int32,"
+                        " but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_LARGEINT: {
+            return Status::InvalidArgument("do not support large int type.");
+        }
+        case TYPE_FLOAT: {
+            if (_str_schema[i][1] != "float") {
+                return Status::InvalidArgument(
+                        "project field type is float, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_DOUBLE: {
+            if (_str_schema[i][1] != "double") {
+                return Status::InvalidArgument(
+                        "project field type is double, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_BIGINT:
+        case TYPE_DATETIME:
+        case TYPE_DATE:
+        case TYPE_DATEV2:
+        case TYPE_DATETIMEV2: {
+            if (_str_schema[i][1] != "int64") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int64, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_HLL:
+        case TYPE_OBJECT: {
+            if (!_output_object_data) {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            [[fallthrough]];
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128: {
+            if (_str_schema[i][1] != "byte_array") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use byte_array, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        default: {
+            return Status::InvalidArgument("Invalid expression type: {}",
+                                           _output_vexpr_ctxs[i]->root()->type().debug_string());
+        }
+        }
+    }
+    return Status::OK();
+}
+
+// Returns the InvalidArgument status used when a block column does not have the concrete
+// column class expected for its output expression type. Requires `raw_column` in scope.
+#define RETURN_WRONG_TYPE \
+    return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name());
+
+namespace {
+// Definition/repetition levels for the flat, top-level columns written by this file.
+// parquet only reads definition levels when a column's max definition level is > 0
+// (OPTIONAL/REPEATED), and repetition levels when its max repetition level is > 0 (REPEATED).
+// Passing them is therefore harmless for REQUIRED columns and mandatory for the others; a
+// null level pointer would be dereferenced for those.
+constexpr int16_t kValueDefLevel = 1;     // the row holds a value
+constexpr int16_t kNullDefLevel = 0;      // the row is NULL
+constexpr int16_t kNewRecordRepLevel = 0; // each value starts a new record
+
+// Writes the `sz` rows of `col` to `writer` as the int64 values produced by `to_int64`.
+// Rows flagged in `null_map` (which may be null) are written with kNullDefLevel and a 0
+// placeholder; only REQUIRED columns consume the placeholder.
+// Returns false, writing nothing, when `col` is not a ColumnType.
+template <typename ColumnType, typename ToInt64>
+bool write_int64_rows(parquet::Int64Writer* writer, const IColumn* col, const NullMap* null_map,
+                      size_t sz, const int16_t* defined_levels, const int16_t* new_record_levels,
+                      ToInt64 to_int64) {
+    const auto* typed_column = check_and_get_column<const ColumnType>(col);
+    if (typed_column == nullptr) {
+        return false;
+    }
+    const auto& data = typed_column->get_data();
+    if (null_map != nullptr) {
+        const int64_t default_int64 = 0;
+        for (size_t row_id = 0; row_id < sz; row_id++) {
+            if ((*null_map)[row_id] != 0) {
+                writer->WriteBatch(1, &kNullDefLevel, &kNewRecordRepLevel, &default_int64);
+            } else {
+                const int64_t value = to_int64(data[row_id]);
+                writer->WriteBatch(1, &kValueDefLevel, &kNewRecordRepLevel, &value);
+            }
+        }
+    } else {
+        std::vector<int64_t> values(sz);
+        for (size_t row_id = 0; row_id < sz; row_id++) {
+            values[row_id] = to_int64(data[row_id]);
+        }
+        writer->WriteBatch(sz, defined_levels, new_record_levels, values.data());
+    }
+    return true;
+}
+} // namespace
+
+// Writes a fixed-width numeric column. When the column has NULLs, rows are written one at a
+// time, with a zeroed placeholder for NULL rows. NULL-free columns are written in a single
+// batch straight from the column data.
+#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE)                        \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i));            \
+    __int128 default_value = 0;                                                                  \
+    if (null_map != nullptr) {                                                                   \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            const bool is_null = (*null_map)[row_id] != 0;                                       \
+            col_writer->WriteBatch(                                                              \
+                    1, is_null ? &kNullDefLevel : &kValueDefLevel, &kNewRecordRepLevel,          \
+                    is_null ? reinterpret_cast<const NATIVE_TYPE*>(&default_value)               \
+                            : reinterpret_cast<const NATIVE_TYPE*>(                              \
+                                      assert_cast<const COLUMN_TYPE&>(*col)                      \
+                                              .get_data_at(row_id)                               \
+                                              .data));                                           \
+        }                                                                                        \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
+        col_writer->WriteBatch(                                                                  \
+                sz, defined_levels.data(), new_record_levels.data(),                             \
+                reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data()));   \
+    } else {                                                                                     \
+        RETURN_WRONG_TYPE                                                                        \
+    }
+
+// Writes a DECIMAL32/64/128 column as the text produced by the decimal data type.
+// NULL rows get a fresh empty value, so parquet never receives a pointer into a string that
+// has already been destroyed.
+#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE)                                            \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::ByteArrayWriter* col_writer =                                                       \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                         \
+    auto decimal_type =                                                                          \
+            check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
+    DCHECK(decimal_type);                                                                        \
+    for (size_t row_id = 0; row_id < sz; row_id++) {                                             \
+        if (null_map != nullptr && (*null_map)[row_id] != 0) {                                   \
+            const parquet::ByteArray empty_value {};                                             \
+            col_writer->WriteBatch(1, &kNullDefLevel, &kNewRecordRepLevel, &empty_value);        \
+        } else {                                                                                 \
+            auto s = decimal_type->to_string(*col, row_id);                                      \
+            parquet::ByteArray value;                                                            \
+            value.ptr = reinterpret_cast<const uint8_t*>(s.data());                              \
+            value.len = s.size();                                                                \
+            col_writer->WriteBatch(1, &kValueDefLevel, &kNewRecordRepLevel, &value);             \
+        }                                                                                        \
+    }
+
+// Writes a column whose rows are read through get_data_at() (strings, HLL, bitmap) as
+// BYTE_ARRAY values. NULL rows are written as an empty value.
+#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE)                                             \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::ByteArrayWriter* col_writer =                                                       \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                         \
+    if (null_map != nullptr) {                                                                   \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            if ((*null_map)[row_id] != 0) {                                                      \
+                const parquet::ByteArray empty_value {};                                         \
+                col_writer->WriteBatch(1, &kNullDefLevel, &kNewRecordRepLevel, &empty_value);    \
+            } else {                                                                             \
+                const auto& tmp = col->get_data_at(row_id);                                      \
+                parquet::ByteArray value;                                                        \
+                value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                          \
+                value.len = tmp.size;                                                            \
+                col_writer->WriteBatch(1, &kValueDefLevel, &kNewRecordRepLevel, &value);         \
+            }                                                                                    \
+        }                                                                                        \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            const auto& tmp = not_nullable_column->get_data_at(row_id);                          \
+            parquet::ByteArray value;                                                            \
+            value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                              \
+            value.len = tmp.size;                                                                \
+            col_writer->WriteBatch(1, &kValueDefLevel, &kNewRecordRepLevel, &value);             \
+        }                                                                                        \
+    } else {                                                                                     \
+        RETURN_WRONG_TYPE                                                                        \
+    }
+
+// Appends every row of `block` to the current buffered row group. Block column i is written
+// to parquet column i according to the type of output expression i.
+// - NULL rows are written with definition level 0, so OPTIONAL/REPEATED columns record a real
+//   NULL.
+// - REQUIRED columns ignore definition levels, so they receive a placeholder (0 or empty)
+//   instead of the NULL.
+// Returns InvalidArgument for unsupported or mismatching column types, and InternalError when
+// parquet throws.
+Status VParquetWriterWrapper::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    const size_t sz = block.rows();
+    // Levels for whole-column batch writes: every row holds a value and starts a new record.
+    const std::vector<int16_t> defined_levels(sz, kValueDefLevel);
+    const std::vector<int16_t> new_record_levels(sz, kNewRecordRepLevel);
+    try {
+        for (size_t i = 0; i < block.columns(); i++) {
+            const auto& raw_column = block.get_by_position(i).column;
+            const auto& type = block.get_by_position(i).type;
+            // `col` holds the values (the nested column of a nullable column). `null_map` is
+            // only set when the column really contains a NULL, so NULL-free nullable columns
+            // take the same batch paths as non-nullable ones.
+            const IColumn* col = raw_column.get();
+            const NullMap* null_map = nullptr;
+            if (raw_column->is_nullable()) {
+                const auto& nullable_column = assert_cast<const ColumnNullable&>(*raw_column);
+                col = nullable_column.get_nested_column_ptr().get();
+                const NullMap& null_data = nullable_column.get_null_map_data();
+                if (std::any_of(null_data.begin(), null_data.end(),
+                                [](UInt8 flag) { return flag != 0; })) {
+                    null_map = &null_data;
+                }
+            }
+            switch (_output_vexpr_ctxs[i]->root()->type().type) {
+            case TYPE_BOOLEAN: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
+                break;
+            }
+            case TYPE_BIGINT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(Int64Writer, ColumnVector<Int64>, int64_t)
+                break;
+            }
+            case TYPE_LARGEINT: {
+                return Status::InvalidArgument("do not support large int type.");
+            }
+            case TYPE_FLOAT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float)
+                break;
+            }
+            case TYPE_DOUBLE: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(DoubleWriter, ColumnVector<Float64>, double)
+                break;
+            }
+            case TYPE_TINYINT:
+            case TYPE_SMALLINT:
+            case TYPE_INT: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int32Writer* col_writer =
+                        static_cast<parquet::Int32Writer*>(rgWriter->column(i));
+                const auto* int32_column = check_and_get_column<const ColumnVector<Int32>>(col);
+                const auto* int16_column = check_and_get_column<const ColumnVector<Int16>>(col);
+                const auto* int8_column = check_and_get_column<const ColumnVector<Int8>>(col);
+                if (int32_column == nullptr && int16_column == nullptr && int8_column == nullptr) {
+                    RETURN_WRONG_TYPE
+                }
+                if (null_map == nullptr && int32_column != nullptr) {
+                    // Fast path: already int32 and no NULLs, write the column data directly.
+                    col_writer->WriteBatch(
+                            sz, defined_levels.data(), new_record_levels.data(),
+                            reinterpret_cast<const int32_t*>(int32_column->get_data().data()));
+                    break;
+                }
+                // Otherwise widen to int32 one row at a time.
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    const bool is_null = null_map != nullptr && (*null_map)[row_id] != 0;
+                    int32_t value = 0; // placeholder consumed only by REQUIRED columns
+                    if (!is_null) {
+                        if (int32_column != nullptr) {
+                            value = int32_column->get_data()[row_id];
+                        } else if (int16_column != nullptr) {
+                            value = int16_column->get_data()[row_id];
+                        } else {
+                            value = int8_column->get_data()[row_id];
+                        }
+                    }
+                    col_writer->WriteBatch(1, is_null ? &kNullDefLevel : &kValueDefLevel,
+                                           &kNewRecordRepLevel, &value);
+                }
+                break;
+            }
+            case TYPE_DATETIME:
+            case TYPE_DATE: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                auto* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                const bool written = write_int64_rows<ColumnVector<Int64>>(
+                        col_writer, col, null_map, sz, defined_levels.data(),
+                        new_record_levels.data(), [](Int64 packed) {
+                            return static_cast<int64_t>(
+                                    binary_cast<Int64, VecDateTimeValue>(packed)
+                                            .to_olap_datetime());
+                        });
+                if (!written) {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                auto* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                const bool written = write_int64_rows<ColumnVector<UInt32>>(
+                        col_writer, col, null_map, sz, defined_levels.data(),
+                        new_record_levels.data(), [](UInt32 packed) {
+                            return static_cast<int64_t>(
+                                    binary_cast<UInt32, DateV2Value<DateV2ValueType>>(packed)
+                                            .to_olap_datetime());
+                        });
+                if (!written) {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATETIMEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                auto* col_writer = static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                const bool written = write_int64_rows<ColumnVector<UInt64>>(
+                        col_writer, col, null_map, sz, defined_levels.data(),
+                        new_record_levels.data(), [](UInt64 packed) {
+                            return static_cast<int64_t>(
+                                    binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(packed)
+                                            .to_olap_datetime());
+                        });
+                if (!written) {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_OBJECT: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnBitmap)
+                break;
+            }
+            case TYPE_HLL: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnHLL)
+                break;
+            }
+            case TYPE_CHAR:
+            case TYPE_VARCHAR:
+            case TYPE_STRING: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnString)
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::ByteArrayWriter* col_writer =
+                        static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+                const int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
+                // Formats one DecimalV2 row into a local buffer and writes it as a value; the
+                // buffer lives until WriteBatch returns.
+                auto write_decimal_row = [&](const IColumn& source, size_t row_id) {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(source.get_data_at(row_id).data)
+                                    ->value);
+                    char decimal_buffer[MAX_DECIMAL_WIDTH];
+                    parquet::ByteArray value;
+                    value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
+                    value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
+                    col_writer->WriteBatch(1, &kValueDefLevel, &kNewRecordRepLevel, &value);
+                };
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            const parquet::ByteArray empty_value {};
+                            col_writer->WriteBatch(1, &kNullDefLevel, &kNewRecordRepLevel,
+                                                   &empty_value);
+                        } else {
+                            write_decimal_row(*col, row_id);
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnDecimal128>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        write_decimal_row(*not_nullable_column, row_id);
+                    }
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DECIMAL32: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+                break;
+            }
+            case TYPE_DECIMAL64: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+                break;
+            }
+            case TYPE_DECIMAL128: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128)
+                break;
+            }
+            default: {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    _cur_written_rows += sz;
+    return Status::OK();
+}
+
+#undef DISPATCH_PARQUET_COMPLEX_WRITER
+#undef DISPATCH_PARQUET_DECIMAL_WRITER
+#undef DISPATCH_PARQUET_NUMERIC_WRITER
+#undef RETURN_WRONG_TYPE
+
+// Opens the parquet file writer on `_outstream` using `_schema` and `_properties`.
+// parquet::ParquetFileWriter::Open reports failures by throwing. The exception is converted
+// into an InternalError status rather than being allowed to escape init().
+Status VParquetWriterWrapper::init_parquet_writer() {
+    // parse_schema() must have run first; Open() would dereference a null schema.
+    if (_schema == nullptr) {
+        return Status::InternalError("parquet schema is not initialized");
+    }
+    try {
+        _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to create parquet file writer: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+// Returns the buffered row group that the current block is written into.
+// - The first call lazily appends a row group.
+// - When the recorded row count exceeds `_max_row_per_group`, the current group is closed, a
+//   new one is appended and the counter restarts from zero.
+parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
+    if (!_rg_writer) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    const bool group_is_full = _cur_written_rows > _max_row_per_group;
+    if (group_is_full) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_written_rows = 0;
+    }
+    return _rg_writer;
+}
+
+// Reports how many bytes the underlying output stream says it has written so far.
+int64_t VParquetWriterWrapper::written_len() {
+    const auto bytes_written = _outstream->get_written_len();
+    return bytes_written;
+}
+
+// Finishes the parquet file in three steps: close the open row group, close the parquet file
+// writer, then close the output stream. Errors are logged, not returned.
+// The output stream is closed even when finishing the parquet file threw, and even when init()
+// failed before a file writer was opened. NOTE(review): this assumes
+// ParquetOutputStream::Close() closes the wrapped doris::FileWriter; confirm.
+// VFileResultWriter::_close_file_writer only closes _file_writer_impl itself when no parquet
+// writer is set.
+void VParquetWriterWrapper::close() {
+    try {
+        if (_rg_writer != nullptr) {
+            _rg_writer->Close();
+        }
+        // _writer is null when init() failed before the parquet file writer was opened.
+        if (_writer != nullptr) {
+            _writer->Close();
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet writer close error: " << e.what();
+    }
+    _rg_writer = nullptr;
+    try {
+        arrow::Status st = _outstream->Close();
+        if (!st.ok()) {
+            LOG(WARNING) << "close parquet file error: " << st.ToString();
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet writer close error: " << e.what();
+    }
+}
+
+// The destructor does not call close(); every member releases its own resources.
+VParquetWriterWrapper::~VParquetWriterWrapper() = default;
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.h
b/be/src/vec/runtime/vparquet_writer.h
new file mode 100644
index 0000000000..111ad6d689
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/io/file.h>
+#include <arrow/io/interfaces.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/exception.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "exec/parquet_writer.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+// NOTE(review): this forward declaration introduces vectorized::FileWriter, which hides
+// doris::FileWriter for unqualified lookups inside this namespace. Code here therefore spells
+// out doris::FileWriter.
+class FileWriter;
+
+// Writes vectorized Blocks into a parquet file through a ParquetOutputStream.
+// Expected call sequence: construct, init(), write() for each block, close().
+class VParquetWriterWrapper {
+public:
+    // file_writer: destination of the encoded bytes; it is wrapped in a ParquetOutputStream.
+    // output_vexpr_ctxs: output expressions, kept by reference, so they must outlive this
+    //     object. Expression i determines how block column i is written.
+    // properties: writer options ("compression", "disable_dictionary", "version").
+    // schema: one {repetition, physical type, column name} entry per output column.
+    // output_object_data: whether HLL/BITMAP columns may be exported as byte_array.
+    VParquetWriterWrapper(doris::FileWriter* file_writer,
+                          const std::vector<VExprContext*>& output_vexpr_ctxs,
+                          const std::map<std::string, std::string>& properties,
+                          const std::vector<std::vector<std::string>>& schema,
+                          bool output_object_data);
+    virtual ~VParquetWriterWrapper();
+
+    // Builds the parquet schema, opens the parquet file writer and validates the schema.
+    Status init();
+
+    // Checks the declared schema against the types of the output expressions.
+    Status validate_schema();
+
+    // Appends every row of `block` to the current row group.
+    Status write(const Block& block);
+
+    // Opens the parquet file writer on the output stream.
+    Status init_parquet_writer();
+
+    // Closes the open row group, finishes the parquet file and closes the output stream.
+    void close();
+
+    // Translates writer options into parquet::WriterProperties.
+    void parse_properties(const std::map<std::string, std::string>& propertie_map);
+
+    // Builds the parquet schema from the textual column definitions.
+    Status parse_schema(const std::vector<std::vector<std::string>>& schema);
+
+    // Returns the buffered row group to write into, starting a new one when needed.
+    parquet::RowGroupWriter* get_rg_writer();
+
+    // Bytes written to the output stream so far.
+    int64_t written_len();
+
+private:
+    std::shared_ptr<ParquetOutputStream> _outstream;
+    std::shared_ptr<parquet::WriterProperties> _properties;
+    std::shared_ptr<parquet::schema::GroupNode> _schema;
+    std::unique_ptr<parquet::ParquetFileWriter> _writer;
+    // Not owned: refers to the caller's vector of output expressions.
+    const std::vector<VExprContext*>& _output_vexpr_ctxs;
+    std::vector<std::vector<std::string>> _str_schema;
+    // Rows written into the current row group; advanced once per written block.
+    int64_t _cur_written_rows = 0;
+    // Current buffered row group, as returned by AppendBufferedRowGroup (not owned here).
+    // It is null until the first get_rg_writer() call, and again after close().
+    parquet::RowGroupWriter* _rg_writer = nullptr;
+    // A new row group is started once _cur_written_rows exceeds this value. Because the counter
+    // only advances per block, every block following one with more than 10 rows starts a new
+    // row group. NOTE(review): probably far too small for real exports; confirm intent.
+    const int64_t _max_row_per_group = 10;
+    bool _output_object_data = false;
+};
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]