This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a36c387a2b [Refactor](transformer) convert to file format writer to
transformer (#23888)
a36c387a2b is described below
commit a36c387a2b14f843ac56ef4a0332d00bbc7cc5f7
Author: HappenLee <[email protected]>
AuthorDate: Tue Sep 5 10:50:10 2023 +0800
[Refactor](transformer) convert to file format writer to transformer
(#23888)
---
be/src/vec/runtime/vcsv_transformer.cpp | 262 +++++++++++++++++++++
be/src/vec/runtime/vcsv_transformer.h | 75 ++++++
be/src/vec/runtime/vfile_writer_wrapper.h | 8 +-
.../{vorc_writer.cpp => vorc_transformer.cpp} | 20 +-
.../runtime/{vorc_writer.h => vorc_transformer.h} | 13 +-
...parquet_writer.cpp => vparquet_transformer.cpp} | 32 +--
.../{vparquet_writer.h => vparquet_transformer.h} | 18 +-
be/src/vec/sink/writer/vfile_result_writer.cpp | 243 ++-----------------
be/src/vec/sink/writer/vfile_result_writer.h | 16 +-
9 files changed, 402 insertions(+), 285 deletions(-)
diff --git a/be/src/vec/runtime/vcsv_transformer.cpp
b/be/src/vec/runtime/vcsv_transformer.cpp
new file mode 100644
index 0000000000..da5a697460
--- /dev/null
+++ b/be/src/vec/runtime/vcsv_transformer.cpp
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vcsv_transformer.h"
+
+#include <glog/logging.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <exception>
+#include <ostream>
+
+#include "gutil/strings/numbers.h"
+#include "io/fs/file_writer.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/large_int_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/types.h"
+#include "util/binary_cast.hpp"
+#include "util/mysql_global.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/columns/columns_number.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/pod_array.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/runtime/vdatetime_value.h"
+
+namespace doris::vectorized {
+
+// Sets up a CSV transformer that writes through `file_writer`.
+//
+// `column_separator` and `line_delimiter` are stored as views, and `file_writer`
+// as a raw pointer; nothing is copied or owned here. When `header` is non-empty
+// it becomes the CSV header, and for CSV_WITH_NAMES_AND_TYPES the generated
+// type line is appended to it. The header is only stored here, not written.
+VCSVTransformer::VCSVTransformer(doris::io::FileWriter* file_writer,
+                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 bool output_object_data, std::string_view header_type,
+                                 std::string_view header, std::string_view column_separator,
+                                 std::string_view line_delimiter)
+        : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
+          _column_separator(column_separator),
+          _line_delimiter(line_delimiter),
+          _file_writer(file_writer) {
+    // _csv_header is default-constructed empty, so "no header" needs no assignment.
+    if (header.empty()) {
+        return;
+    }
+    _csv_header = header;
+    if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+        _csv_header += _gen_csv_header_types();
+    }
+}
+
+// Writes the stored CSV header to the file writer.
+// Returns OK without writing anything when no header was configured; otherwise
+// returns the status of the append.
+Status VCSVTransformer::open() {
+    if (_csv_header.empty()) {
+        return Status::OK();
+    }
+    return _file_writer->append(Slice(_csv_header.data(), _csv_header.size()));
+}
+
+// Returns the byte count reported by the underlying file writer's bytes_appended().
+int64_t VCSVTransformer::written_len() {
+ return _file_writer->bytes_appended();
+}
+
+// Closes the underlying file writer and returns its status.
+// No buffered text is flushed here: write() flushes its buffer before returning.
+Status VCSVTransformer::close() {
+ return _file_writer->close();
+}
+
+// Appends every row of `block` to the output as one CSV record.
+//
+// Each cell is rendered into the in-memory text buffer according to the
+// primitive type of the output expression at the same position. Cells are joined
+// by the column separator and each row is terminated by the line delimiter.
+// NULL cells, OBJECT/HLL cells when object output is disabled, and types without
+// a case below are written as NULL_IN_CSV. The buffered text is handed to the
+// file writer once, at the end of the call.
+//
+// Returns the status of flushing the buffer to the file writer.
+Status VCSVTransformer::write(const Block& block) {
+    using doris::operator<<;
+    // The block is const, so its dimensions are loop-invariant.
+    const size_t num_rows = block.rows();
+    const size_t num_columns = block.columns();
+    for (size_t i = 0; i < num_rows; i++) {
+        for (size_t col_id = 0; col_id < num_columns; col_id++) {
+            // Bind by reference: taking the entry by value would copy it once per cell.
+            const auto& col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    // Widen to int so the value is streamed as a number, not a character.
+                    _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATEV2: {
+                    char buf[64];
+                    const DateV2Value<DateV2ValueType>* time_val =
+                            (const DateV2Value<DateV2ValueType>*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_DATETIMEV2: {
+                    char buf[64];
+                    const DateV2Value<DateTimeV2ValueType>* time_val =
+                            (const DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i)
+                                                                              .data);
+                    // The expression's scale is passed through to the formatter.
+                    time_val->to_string(buf, _output_vexpr_ctxs[col_id]->root()->type().scale);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const VecDateTimeValue*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL: {
+                    if (!_output_object_data) {
+                        _plain_text_outstream << NULL_IN_CSV;
+                        break;
+                    }
+                    // Object output enabled: emit the raw bytes like a string column.
+                    [[fallthrough]];
+                }
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    _plain_text_outstream << decimal_val.to_string();
+                    break;
+                }
+                // These types are all rendered through their data type's to_string().
+                case TYPE_DECIMAL32:
+                case TYPE_DECIMAL64:
+                case TYPE_DECIMAL128I:
+                case TYPE_ARRAY:
+                case TYPE_MAP:
+                case TYPE_STRUCT: {
+                    _plain_text_outstream << col.type->to_string(*col.column, i);
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                    break;
+                }
+                }
+            }
+            // Separator goes between columns only, never after the last one.
+            if (col_id + 1 < num_columns) {
+                _plain_text_outstream << _column_separator;
+            }
+        }
+        _plain_text_outstream << _line_delimiter;
+    }
+
+    return _flush_plain_text_outstream();
+}
+
+// Hands any buffered CSV text to the file writer, then resets the buffer.
+// Nothing is written when the buffer is empty. If the append fails, its status
+// is returned and the buffer is left untouched.
+Status VCSVTransformer::_flush_plain_text_outstream() {
+    const size_t buffered_len = _plain_text_outstream.tellp();
+    if (buffered_len != 0) {
+        const std::string& text = _plain_text_outstream.str();
+        RETURN_IF_ERROR(_file_writer->append(text));
+
+        // Drop the flushed text and reset the stream's state flags.
+        _plain_text_outstream.str("");
+        _plain_text_outstream.clear();
+    }
+    return Status::OK();
+}
+
+// Builds the type line used by CSV_WITH_NAMES_AND_TYPES headers: the type name
+// of every output expression, joined by the column separator and terminated by
+// the line delimiter.
+std::string VCSVTransformer::_gen_csv_header_types() {
+    const int column_count = _output_vexpr_ctxs.size();
+    std::string type_line;
+    for (int idx = 0; idx < column_count; ++idx) {
+        // Separator precedes every entry except the first.
+        if (idx > 0) {
+            type_line += _column_separator;
+        }
+        type_line += type_to_string(_output_vexpr_ctxs[idx]->root()->type().type);
+    }
+    type_line += _line_delimiter;
+    return type_line;
+}
+
+// Text written in place of a value: a backslash followed by 'N'.
+const std::string VCSVTransformer::NULL_IN_CSV = "\\N";
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vcsv_transformer.h
b/be/src/vec/runtime/vcsv_transformer.h
new file mode 100644
index 0000000000..fb3232ac93
--- /dev/null
+++ b/be/src/vec/runtime/vcsv_transformer.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <gen_cpp/DataSinks_types.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+#include <parquet/types.h>
+#include <stdint.h>
+
+#include "vfile_writer_wrapper.h"
+
+namespace doris {
+namespace io {
+class FileWriter;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Transformer that renders blocks as plain-text CSV and appends the text to a
+// file writer.
+//
+// The column separator and line delimiter are kept as std::string_view and the
+// file writer as a raw pointer, so the objects they refer to must outlive this
+// transformer.
+class VCSVTransformer final : public VFileFormatTransformer {
+public:
+    VCSVTransformer(doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs,
+                    bool output_object_data, std::string_view header_type, std::string_view header,
+                    std::string_view column_separator, std::string_view line_delimiter);
+
+    ~VCSVTransformer() override = default;
+
+    // Writes the CSV header, if one was configured.
+    Status open() override;
+
+    // Renders `block` as CSV text and appends it to the file writer.
+    Status write(const Block& block) override;
+
+    // Closes the underlying file writer.
+    Status close() override;
+
+    // Number of bytes the file writer reports as appended.
+    int64_t written_len() override;
+
+private:
+    // Hands the buffered text to the file writer and resets the buffer.
+    Status _flush_plain_text_outstream();
+    // Builds the type-name line appended to the header for CSV_WITH_NAMES_AND_TYPES.
+    std::string _gen_csv_header_types();
+
+    // Text written in place of NULL (and non-exportable) values.
+    static const std::string NULL_IN_CSV;
+    // Header text written by open(); empty when no header was requested.
+    std::string _csv_header;
+    // Views over strings owned by the caller; not copied.
+    std::string_view _column_separator;
+    std::string_view _line_delimiter;
+
+    // Not owned.
+    doris::io::FileWriter* _file_writer;
+    // Used to buffer the exported plain-text data.
+    // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
+    // file writer's write() for every single row.
+    // But this cannot solve the problem of a row of data that is too large.
+    // For example: bitmap_to_string() may return large volume of data.
+    // And the speed is relatively low, in my test, is about 6.5MB/s.
+    std::stringstream _plain_text_outstream;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_writer_wrapper.h
b/be/src/vec/runtime/vfile_writer_wrapper.h
index e418a14ffa..e94f6b0ead 100644
--- a/be/src/vec/runtime/vfile_writer_wrapper.h
+++ b/be/src/vec/runtime/vfile_writer_wrapper.h
@@ -26,16 +26,16 @@
namespace doris::vectorized {
-class VFileWriterWrapper {
+class VFileFormatTransformer {
public:
- VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool
output_object_data)
+ VFileFormatTransformer(const VExprContextSPtrs& output_vexpr_ctxs, bool
output_object_data)
: _output_vexpr_ctxs(output_vexpr_ctxs),
_cur_written_rows(0),
_output_object_data(output_object_data) {}
- virtual ~VFileWriterWrapper() = default;
+ virtual ~VFileFormatTransformer() = default;
- virtual Status prepare() = 0;
+ virtual Status open() = 0;
virtual Status write(const Block& block) = 0;
diff --git a/be/src/vec/runtime/vorc_writer.cpp
b/be/src/vec/runtime/vorc_transformer.cpp
similarity index 98%
rename from be/src/vec/runtime/vorc_writer.cpp
rename to be/src/vec/runtime/vorc_transformer.cpp
index df9615d668..fa83e83e3a 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/runtime/vorc_writer.h"
+#include "vec/runtime/vorc_transformer.h"
#include <glog/logging.h>
#include <stdlib.h>
@@ -85,15 +85,15 @@ void VOrcOutputStream::set_written_len(int64_t written_len)
{
_written_len = written_len;
}
-VOrcWriterWrapper::VOrcWriterWrapper(doris::io::FileWriter* file_writer,
- const VExprContextSPtrs&
output_vexpr_ctxs,
- const std::string& schema, bool
output_object_data)
- : VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
+VOrcTransformer::VOrcTransformer(doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs,
+ const std::string& schema, bool
output_object_data)
+ : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
_write_options(new orc::WriterOptions()),
_schema_str(schema) {}
-Status VOrcWriterWrapper::prepare() {
+Status VOrcTransformer::open() {
try {
_schema = orc::Type::buildTypeFromString(_schema_str);
} catch (const std::exception& e) {
@@ -108,15 +108,15 @@ Status VOrcWriterWrapper::prepare() {
return Status::OK();
}
-std::unique_ptr<orc::ColumnVectorBatch>
VOrcWriterWrapper::_create_row_batch(size_t sz) {
+std::unique_ptr<orc::ColumnVectorBatch>
VOrcTransformer::_create_row_batch(size_t sz) {
return _writer->createRowBatch(sz);
}
-int64_t VOrcWriterWrapper::written_len() {
+int64_t VOrcTransformer::written_len() {
return _output_stream->getLength();
}
-Status VOrcWriterWrapper::close() {
+Status VOrcTransformer::close() {
if (_writer != nullptr) {
try {
_writer->close();
@@ -398,7 +398,7 @@ Status VOrcWriterWrapper::close() {
#define SET_NUM_ELEMENTS cur_batch->numElements = sz;
-Status VOrcWriterWrapper::write(const Block& block) {
+Status VOrcTransformer::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
diff --git a/be/src/vec/runtime/vorc_writer.h
b/be/src/vec/runtime/vorc_transformer.h
similarity index 88%
rename from be/src/vec/runtime/vorc_writer.h
rename to be/src/vec/runtime/vorc_transformer.h
index 9afed17e20..06a42361fb 100644
--- a/be/src/vec/runtime/vorc_writer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -29,7 +29,7 @@
#include "orc/Type.hh"
#include "orc/Writer.hh"
#include "vec/core/block.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vparquet_transformer.h"
namespace doris {
namespace io {
@@ -72,15 +72,14 @@ private:
};
// a wrapper of parquet output stream
-class VOrcWriterWrapper final : public VFileWriterWrapper {
+class VOrcTransformer final : public VFileFormatTransformer {
public:
- VOrcWriterWrapper(doris::io::FileWriter* file_writer,
- const VExprContextSPtrs& output_vexpr_ctxs, const
std::string& schema,
- bool output_object_data);
+ VOrcTransformer(doris::io::FileWriter* file_writer, const
VExprContextSPtrs& output_vexpr_ctxs,
+ const std::string& schema, bool output_object_data);
- ~VOrcWriterWrapper() = default;
+ ~VOrcTransformer() = default;
- Status prepare() override;
+ Status open() override;
Status write(const Block& block) override;
diff --git a/be/src/vec/runtime/vparquet_writer.cpp
b/be/src/vec/runtime/vparquet_transformer.cpp
similarity index 97%
rename from be/src/vec/runtime/vparquet_writer.cpp
rename to be/src/vec/runtime/vparquet_transformer.cpp
index 0a6392319b..7d1ceed404 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vparquet_transformer.h"
#include <arrow/io/type_fwd.h>
#include <glog/logging.h>
@@ -278,14 +278,14 @@ void
ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
}
}
-VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter*
file_writer,
- const VExprContextSPtrs&
output_vexpr_ctxs,
- const
std::vector<TParquetSchema>& parquet_schemas,
- const
TParquetCompressionType::type& compression_type,
- const bool&
parquet_disable_dictionary,
- const TParquetVersion::type&
parquet_version,
- bool output_object_data)
- : VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
+VParquetTransformer::VParquetTransformer(doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs&
output_vexpr_ctxs,
+ const std::vector<TParquetSchema>&
parquet_schemas,
+ const TParquetCompressionType::type&
compression_type,
+ const bool&
parquet_disable_dictionary,
+ const TParquetVersion::type&
parquet_version,
+ bool output_object_data)
+ : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
_rg_writer(nullptr),
_parquet_schemas(parquet_schemas),
_compression_type(compression_type),
@@ -294,7 +294,7 @@
VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer,
_outstream = std::shared_ptr<ParquetOutputStream>(new
ParquetOutputStream(file_writer));
}
-Status VParquetWriterWrapper::parse_properties() {
+Status VParquetTransformer::parse_properties() {
try {
parquet::WriterProperties::Builder builder;
ParquetBuildHelper::build_compression_type(builder, _compression_type);
@@ -311,7 +311,7 @@ Status VParquetWriterWrapper::parse_properties() {
return Status::OK();
}
-Status VParquetWriterWrapper::parse_schema() {
+Status VParquetTransformer::parse_schema() {
parquet::schema::NodeVector fields;
parquet::Repetition::type parquet_repetition_type;
parquet::Type::type parquet_physical_type;
@@ -394,7 +394,7 @@ Status VParquetWriterWrapper::parse_schema() {
RETURN_WRONG_TYPE
\
}
-Status VParquetWriterWrapper::write(const Block& block) {
+Status VParquetTransformer::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
@@ -906,7 +906,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
return Status::OK();
}
-Status VParquetWriterWrapper::prepare() {
+Status VParquetTransformer::open() {
RETURN_IF_ERROR(parse_properties());
RETURN_IF_ERROR(parse_schema());
try {
@@ -921,7 +921,7 @@ Status VParquetWriterWrapper::prepare() {
return Status::OK();
}
-parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
+parquet::RowGroupWriter* VParquetTransformer::get_rg_writer() {
if (_rg_writer == nullptr) {
_rg_writer = _writer->AppendBufferedRowGroup();
}
@@ -933,11 +933,11 @@ parquet::RowGroupWriter*
VParquetWriterWrapper::get_rg_writer() {
return _rg_writer;
}
-int64_t VParquetWriterWrapper::written_len() {
+int64_t VParquetTransformer::written_len() {
return _outstream->get_written_len();
}
-Status VParquetWriterWrapper::close() {
+Status VParquetTransformer::close() {
try {
if (_rg_writer != nullptr) {
_rg_writer->Close();
diff --git a/be/src/vec/runtime/vparquet_writer.h
b/be/src/vec/runtime/vparquet_transformer.h
similarity index 87%
rename from be/src/vec/runtime/vparquet_writer.h
rename to be/src/vec/runtime/vparquet_transformer.h
index 36514c9fe8..1fedc3f44f 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -86,18 +86,18 @@ public:
};
// a wrapper of parquet output stream
-class VParquetWriterWrapper final : public VFileWriterWrapper {
+class VParquetTransformer final : public VFileFormatTransformer {
public:
- VParquetWriterWrapper(doris::io::FileWriter* file_writer,
- const VExprContextSPtrs& output_vexpr_ctxs,
- const std::vector<TParquetSchema>& parquet_schemas,
- const TParquetCompressionType::type&
compression_type,
- const bool& parquet_disable_dictionary,
- const TParquetVersion::type& parquet_version, bool
output_object_data);
+ VParquetTransformer(doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs,
+ const std::vector<TParquetSchema>& parquet_schemas,
+ const TParquetCompressionType::type& compression_type,
+ const bool& parquet_disable_dictionary,
+ const TParquetVersion::type& parquet_version, bool
output_object_data);
- ~VParquetWriterWrapper() = default;
+ ~VParquetTransformer() = default;
- Status prepare() override;
+ Status open() override;
Status write(const Block& block) override;
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index e54c426f74..0523f0ac08 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -30,25 +30,19 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/consts.h"
#include "common/status.h"
-#include "gutil/strings/numbers.h"
#include "io/file_factory.h"
#include "io/fs/broker_file_system.h"
-#include "io/fs/file_writer.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/s3_file_system.h"
#include "io/hdfs_builder.h"
#include "runtime/buffer_control_block.h"
-#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
-#include "runtime/types.h"
#include "service/backend_options.h"
-#include "util/metrics.h"
-#include "util/mysql_global.h"
#include "util/mysql_row_buffer.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -57,20 +51,15 @@
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
-#include "vec/common/string_ref.h"
#include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/vdatetime_value.h"
-#include "vec/runtime/vorc_writer.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
#include "vec/sink/vresult_sink.h"
namespace doris::vectorized {
-const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
-using doris::operator<<;
VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs)
: AsyncResultWriter(output_exprs) {}
@@ -88,8 +77,7 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions*
file_opts,
_fragment_instance_id(fragment_instance_id),
_sinker(sinker),
_output_block(output_block),
- _output_row_descriptor(output_row_descriptor),
- _vfile_writer(nullptr) {
+ _output_row_descriptor(output_row_descriptor) {
_output_object_data = output_object_data;
}
@@ -158,19 +146,19 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
_file_writer_impl));
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
- // just use file writer is enough
+ _vfile_writer.reset(new VCSVTransformer(
+ _file_writer_impl.get(), _vec_output_expr_ctxs,
_output_object_data, _header_type,
+ _header, _file_opts->column_separator,
_file_opts->line_delimiter));
break;
case TFileFormatType::FORMAT_PARQUET:
- _vfile_writer.reset(new VParquetWriterWrapper(
+ _vfile_writer.reset(new VParquetTransformer(
_file_writer_impl.get(), _vec_output_expr_ctxs,
_file_opts->parquet_schemas,
_file_opts->parquet_commpression_type,
_file_opts->parquert_disable_dictionary,
_file_opts->parquet_version, _output_object_data));
- RETURN_IF_ERROR(_vfile_writer->prepare());
break;
case TFileFormatType::FORMAT_ORC:
- _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(),
_vec_output_expr_ctxs,
- _file_opts->orc_schema,
_output_object_data));
- RETURN_IF_ERROR(_vfile_writer->prepare());
+ _vfile_writer.reset(new VOrcTransformer(_file_writer_impl.get(),
_vec_output_expr_ctxs,
+ _file_opts->orc_schema,
_output_object_data));
break;
default:
return Status::InternalError("unsupported file format: {}",
_file_opts->file_format);
@@ -178,7 +166,8 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
LOG(INFO) << "create file for exporting query result. file name: " <<
file_name
<< ". query id: " << print_id(_state->query_id())
<< " format:" << _file_opts->file_format;
- return Status::OK();
+
+ return _vfile_writer->open();
}
// file name format as: my_prefix_{fragment_instance_id}_0.csv
@@ -187,7 +176,6 @@ Status VFileResultWriter::_get_next_file_name(std::string*
file_name) {
ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" <<
(_file_idx++) << "."
<< _file_format_to_name();
*file_name = ss.str();
- _header_sent = false;
if (_storage_type == TStorageBackendType::LOCAL) {
// For local file writer, the file_path is a local dir.
// Here we do a simple security verification by checking whether the
file exists.
@@ -237,215 +225,22 @@ Status VFileResultWriter::append_block(Block& block) {
if (block.rows() == 0) {
return Status::OK();
}
- RETURN_IF_ERROR(write_csv_header());
SCOPED_TIMER(_append_row_batch_timer);
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
-
- if (_vfile_writer) {
- RETURN_IF_ERROR(_write_file(output_block));
- } else {
- RETURN_IF_ERROR(_write_csv_file(output_block));
- }
+ RETURN_IF_ERROR(_write_file(output_block));
_written_rows += block.rows();
return Status::OK();
}
Status VFileResultWriter::_write_file(const Block& block) {
- RETURN_IF_ERROR(_vfile_writer->write(block));
- // split file if exceed limit
- _current_written_bytes = _vfile_writer->written_len();
- return _create_new_file_if_exceed_size();
-}
-
-Status VFileResultWriter::_write_csv_file(const Block& block) {
- for (size_t i = 0; i < block.rows(); i++) {
- for (size_t col_id = 0; col_id < block.columns(); col_id++) {
- auto col = block.get_by_position(col_id);
- if (col.column->is_null_at(i)) {
- _plain_text_outstream << NULL_IN_CSV;
- } else {
- switch (_vec_output_expr_ctxs[col_id]->root()->type().type) {
- case TYPE_BOOLEAN:
- case TYPE_TINYINT:
- _plain_text_outstream << (int)*reinterpret_cast<const
int8_t*>(
- col.column->get_data_at(i).data);
- break;
- case TYPE_SMALLINT:
- _plain_text_outstream
- << *reinterpret_cast<const
int16_t*>(col.column->get_data_at(i).data);
- break;
- case TYPE_INT:
- _plain_text_outstream
- << *reinterpret_cast<const
int32_t*>(col.column->get_data_at(i).data);
- break;
- case TYPE_BIGINT:
- _plain_text_outstream
- << *reinterpret_cast<const
int64_t*>(col.column->get_data_at(i).data);
- break;
- case TYPE_LARGEINT:
- _plain_text_outstream
- << *reinterpret_cast<const
__int128*>(col.column->get_data_at(i).data);
- break;
- case TYPE_FLOAT: {
- char buffer[MAX_FLOAT_STR_LENGTH + 2];
- float float_value =
- *reinterpret_cast<const
float*>(col.column->get_data_at(i).data);
- buffer[0] = '\0';
- int length = FloatToBuffer(float_value,
MAX_FLOAT_STR_LENGTH, buffer);
- DCHECK(length >= 0) << "gcvt float failed, float value="
<< float_value;
- _plain_text_outstream << buffer;
- break;
- }
- case TYPE_DOUBLE: {
- // To prevent loss of precision on float and double types,
- // they are converted to strings before output.
- // For example: For a double value 27361919854.929001,
- // the direct output of using std::stringstream is
2.73619e+10,
- // and after conversion to a string, it outputs
27361919854.929001
- char buffer[MAX_DOUBLE_STR_LENGTH + 2];
- double double_value =
- *reinterpret_cast<const
double*>(col.column->get_data_at(i).data);
- buffer[0] = '\0';
- int length = DoubleToBuffer(double_value,
MAX_DOUBLE_STR_LENGTH, buffer);
- DCHECK(length >= 0) << "gcvt double failed, double value="
<< double_value;
- _plain_text_outstream << buffer;
- break;
- }
- case TYPE_DATEV2: {
- char buf[64];
- const DateV2Value<DateV2ValueType>* time_val =
- (const
DateV2Value<DateV2ValueType>*)(col.column->get_data_at(i).data);
- time_val->to_string(buf);
- _plain_text_outstream << buf;
- break;
- }
- case TYPE_DATETIMEV2: {
- char buf[64];
- const DateV2Value<DateTimeV2ValueType>* time_val =
- (const
DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i)
-
.data);
- time_val->to_string(buf,
_vec_output_expr_ctxs[col_id]->root()->type().scale);
- _plain_text_outstream << buf;
- break;
- }
- case TYPE_DATE:
- case TYPE_DATETIME: {
- char buf[64];
- const VecDateTimeValue* time_val =
- (const
VecDateTimeValue*)(col.column->get_data_at(i).data);
- time_val->to_string(buf);
- _plain_text_outstream << buf;
- break;
- }
- case TYPE_OBJECT:
- case TYPE_HLL: {
- if (!_output_object_data) {
- _plain_text_outstream << NULL_IN_CSV;
- break;
- }
- [[fallthrough]];
- }
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- auto value = col.column->get_data_at(i);
- _plain_text_outstream << value;
- break;
- }
- case TYPE_DECIMALV2: {
- const DecimalV2Value decimal_val(
- reinterpret_cast<const
PackedInt128*>(col.column->get_data_at(i).data)
- ->value);
- std::string decimal_str;
- decimal_str = decimal_val.to_string();
- _plain_text_outstream << decimal_str;
- break;
- }
- case TYPE_DECIMAL32: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- case TYPE_DECIMAL64: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- case TYPE_DECIMAL128I: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- case TYPE_ARRAY: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- case TYPE_MAP: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- case TYPE_STRUCT: {
- _plain_text_outstream << col.type->to_string(*col.column,
i);
- break;
- }
- default: {
- // not supported type, like BITMAP, just export null
- _plain_text_outstream << NULL_IN_CSV;
- }
- }
- }
- if (col_id < block.columns() - 1) {
- _plain_text_outstream << _file_opts->column_separator;
- }
- }
- _plain_text_outstream << _file_opts->line_delimiter;
- }
-
- return _flush_plain_text_outstream(true);
-}
-
-std::string VFileResultWriter::gen_types() {
- std::string types;
- int num_columns = _vec_output_expr_ctxs.size();
- for (int i = 0; i < num_columns; ++i) {
- types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type);
- if (i < num_columns - 1) {
- types += _file_opts->column_separator;
- }
- }
- types += _file_opts->line_delimiter;
- return types;
-}
-
-Status VFileResultWriter::write_csv_header() {
- if (!_header_sent && _header.size() > 0) {
- std::string tmp_header(_header);
- if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
- tmp_header += gen_types();
- }
- RETURN_IF_ERROR(_file_writer_impl->append(tmp_header));
- _header_sent = true;
- }
- return Status::OK();
-}
-
-Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
- SCOPED_TIMER(_file_write_timer);
- size_t pos = _plain_text_outstream.tellp();
- if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
- return Status::OK();
+ {
+ SCOPED_TIMER(_file_write_timer);
+ RETURN_IF_ERROR(_vfile_writer->write(block));
}
-
- const std::string& buf = _plain_text_outstream.str();
- size_t written_len = buf.size();
- RETURN_IF_ERROR(_file_writer_impl->append(buf));
- COUNTER_UPDATE(_written_data_bytes, written_len);
- _current_written_bytes += written_len;
-
- // clear the stream
- _plain_text_outstream.str("");
- _plain_text_outstream.clear();
-
// split file if exceed limit
+ _current_written_bytes = _vfile_writer->written_len();
return _create_new_file_if_exceed_size();
}
@@ -516,7 +311,7 @@ Status VFileResultWriter::_send_result() {
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
- std::map<std::string, string> attach_infos;
+ std::map<std::string, std::string> attach_infos;
attach_infos.insert(std::make_pair("FileNumber",
std::to_string(_file_idx)));
attach_infos.insert(
std::make_pair("TotalRows",
std::to_string(_written_rows_counter->value())));
@@ -634,6 +429,4 @@ Status VFileResultWriter::close() {
return _close_file_writer(true);
}
-const string VFileResultWriter::NULL_IN_CSV = "\\N";
-
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index b56e41c377..69a26714dc 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -69,9 +69,6 @@ public:
// file result writer always return statistic result in one row
int64_t get_written_rows() const override { return 1; }
- std::string gen_types();
- Status write_csv_header();
-
void set_header_info(const std::string& header_type, const std::string&
header) {
_header_type = header_type;
_header = header;
@@ -80,11 +77,7 @@ public:
private:
Status _init(RuntimeState* state, RuntimeProfile*);
Status _write_file(const Block& block);
- Status _write_csv_file(const Block& block);
- // if buffer exceed the limit, write the data buffered in
_plain_text_outstream via file_writer
- // if eos, write the data even if buffer is not full.
- Status _flush_plain_text_outstream(bool eos);
void _init_profile(RuntimeProfile*);
Status _create_file_writer(const std::string& file_name);
@@ -106,8 +99,6 @@ private:
// delete the dir of file_path
Status _delete_dir();
- static const std::string NULL_IN_CSV;
-
RuntimeState* _state; // not owned, set when init
const ResultFileOptions* _file_opts;
TStorageBackendType::type _storage_type;
@@ -123,7 +114,6 @@ private:
// For example: bitmap_to_string() may return large volume of data.
// And the speed is relative low, in my test, is about 6.5MB/s.
std::stringstream _plain_text_outstream;
- static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
// current written bytes, used for split data
int64_t _current_written_bytes = 0;
@@ -148,13 +138,11 @@ private:
Block* _output_block = nullptr;
// set to true if the final statistic result is sent
bool _is_result_sent = false;
- bool _header_sent = false;
RowDescriptor _output_row_descriptor;
- // parquet/orc file writer
- std::unique_ptr<VFileWriterWrapper> _vfile_writer;
+ // convert block to parquet/orc/csv format
+ std::unique_ptr<VFileFormatTransformer> _vfile_writer;
std::string_view _header_type;
std::string_view _header;
- std::unique_ptr<VFileResultWriter> _writer;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]