This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.1-tmp by this push:
new 0ef6eabc74c [Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495)
0ef6eabc74c is described below
commit 0ef6eabc74cb5d93c3e8d62feaeb91bcca37204a
Author: Qi Chen <[email protected]>
AuthorDate: Wed Apr 10 20:37:22 2024 +0800
[Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495)
---
be/src/vec/runtime/vorc_transformer.cpp | 104 +++++++++++++++++--
be/src/vec/runtime/vorc_transformer.h | 10 +-
be/src/vec/runtime/vparquet_transformer.cpp | 32 +++++-
be/src/vec/runtime/vparquet_transformer.h | 10 +-
be/src/vec/sink/writer/vhive_partition_writer.cpp | 118 ++++------------------
be/src/vec/sink/writer/vhive_partition_writer.h | 10 +-
be/src/vec/sink/writer/vhive_table_writer.cpp | 22 +++-
be/src/vec/sink/writer/vhive_table_writer.h | 3 +
8 files changed, 184 insertions(+), 125 deletions(-)
diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index 764a97ae5bc..a07f734acac 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -99,35 +99,45 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
_write_options(new orc::WriterOptions()),
- _schema_str(&schema),
- _schema(nullptr) {
+ _schema_str(&schema) {
_write_options->setTimezoneName(_state->timezone());
_write_options->setUseTightNumericVector(true);
}
VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
- std::unique_ptr<orc::Type> schema, bool output_object_data,
+ std::vector<std::string> column_names, bool output_object_data,
orc::CompressionKind compression)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
+ _column_names(std::move(column_names)),
_write_options(new orc::WriterOptions()),
- _schema_str(nullptr),
- _schema(std::move(schema)) {
+ _schema_str(nullptr) {
_write_options->setTimezoneName(_state->timezone());
_write_options->setUseTightNumericVector(true);
_write_options->setCompression(compression);
}
Status VOrcTransformer::open() {
- try {
- if (_schema == nullptr && _schema_str != nullptr) {
+ if (_schema_str != nullptr) {
+ try {
_schema = orc::Type::buildTypeFromString(*_schema_str);
+ } catch (const std::exception& e) {
+ return Status::InternalError("Orc build schema from \"{}\" failed:
{}", *_schema_str,
+ e.what());
+ }
+ } else {
+ _schema = orc::createStructType();
+ for (int i = 0; i < _output_vexpr_ctxs.size(); i++) {
+ VExprSPtr column_expr = _output_vexpr_ctxs[i]->root();
+ try {
+ _schema->addStructField(_column_names[i], _build_orc_type(column_expr->type()));
+ } catch (doris::Exception& e) {
+ return e.to_status();
+ }
}
- } catch (const std::exception& e) {
- return Status::InternalError("Orc build schema from \"{}\" failed:
{}", *_schema_str,
- e.what());
}
+
_output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
_writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
if (_writer == nullptr) {
@@ -136,6 +146,80 @@ Status VOrcTransformer::open() {
return Status::OK();
}
+std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(const TypeDescriptor& type_descriptor) {
+ std::pair<Status, std::unique_ptr<orc::Type>> result;
+ switch (type_descriptor.type) {
+ case TYPE_BOOLEAN: {
+ return orc::createPrimitiveType(orc::BOOLEAN);
+ }
+ case TYPE_TINYINT: {
+ return orc::createPrimitiveType(orc::BYTE);
+ }
+ case TYPE_SMALLINT: {
+ return orc::createPrimitiveType(orc::SHORT);
+ }
+ case TYPE_INT: {
+ return orc::createPrimitiveType(orc::INT);
+ }
+ case TYPE_BIGINT: {
+ return orc::createPrimitiveType(orc::LONG);
+ }
+ case TYPE_FLOAT: {
+ return orc::createPrimitiveType(orc::FLOAT);
+ }
+ case TYPE_DOUBLE: {
+ return orc::createPrimitiveType(orc::DOUBLE);
+ }
+ case TYPE_CHAR: {
+ return orc::createCharType(orc::CHAR, type_descriptor.len);
+ }
+ case TYPE_VARCHAR: {
+ return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+ }
+ case TYPE_STRING: {
+ return orc::createPrimitiveType(orc::STRING);
+ }
+ case TYPE_BINARY: {
+ return orc::createPrimitiveType(orc::STRING);
+ }
+ case TYPE_DATEV2: {
+ return orc::createPrimitiveType(orc::DATE);
+ }
+ case TYPE_DATETIMEV2: {
+ return orc::createPrimitiveType(orc::TIMESTAMP);
+ }
+ case TYPE_DECIMAL32: {
+ return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+ }
+ case TYPE_DECIMAL64: {
+ return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+ }
+ case TYPE_DECIMAL128I: {
+ return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+ }
+ case TYPE_STRUCT: {
+ std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+ for (int j = 0; j < type_descriptor.children.size(); ++j) {
+ struct_type->addStructField(type_descriptor.field_names[j],
+ _build_orc_type(type_descriptor.children[j]));
+ }
+ return struct_type;
+ }
+ case TYPE_ARRAY: {
+ return orc::createListType(_build_orc_type(type_descriptor.children[0]));
+ }
+ case TYPE_MAP: {
+ return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+ _build_orc_type(type_descriptor.children[1]));
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} to build orc type",
+ type_descriptor.debug_string());
+ }
+ }
+}
+
std::unique_ptr<orc::ColumnVectorBatch> VOrcTransformer::_create_row_batch(size_t sz) {
return _writer->createRowBatch(sz);
}
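
For context on the ORC C++ API used above: when no schema string is supplied, VOrcTransformer::open() now assembles the root struct type itself from the column names and the output expression types via _build_orc_type. The following is a minimal, self-contained sketch of that assembly; the column names and types are invented for illustration and are not part of the patch:

    #include <iostream>
    #include <memory>
    #include <orc/Type.hh>

    int main() {
        // Build a struct root type field by field, the way the new
        // name-driven branch of VOrcTransformer::open() does.
        std::unique_ptr<orc::Type> schema = orc::createStructType();
        schema->addStructField("id", orc::createPrimitiveType(orc::LONG));
        schema->addStructField("name", orc::createPrimitiveType(orc::STRING));
        schema->addStructField("price", orc::createDecimalType(18, 2));
        // Prints: struct<id:bigint,name:string,price:decimal(18,2)>
        std::cout << schema->toString() << std::endl;
        return 0;
    }
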
diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h
index 8cfc956c0cd..1f78e549a1b 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -79,8 +79,9 @@ public:
bool output_object_data);
VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
- const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr<orc::Type> schema,
- bool output_object_data, orc::CompressionKind compression);
+ const VExprContextSPtrs& output_vexpr_ctxs,
+ std::vector<std::string> column_names, bool output_object_data,
+ orc::CompressionKind compression);
~VOrcTransformer() = default;
@@ -93,6 +94,8 @@ public:
int64_t written_len() override;
private:
+ std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
+
std::unique_ptr<orc::ColumnVectorBatch> _create_row_batch(size_t sz);
// The size of subtypes of a complex type may be different from
// the size of the complex type itself,
@@ -101,6 +104,7 @@ private:
orc::ColumnVectorBatch* orc_col_batch);
doris::io::FileWriter* _file_writer = nullptr;
+ std::vector<std::string> _column_names;
std::unique_ptr<orc::OutputStream> _output_stream;
std::unique_ptr<orc::WriterOptions> _write_options;
const std::string* _schema_str;
@@ -117,4 +121,4 @@ private:
static constexpr size_t BUFFER_RESERVED_SIZE = 40;
};
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index b831b1e2dc5..0e5800750b0 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -197,6 +197,22 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
}
}
+VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs,
+ std::vector<std::string> column_names,
+ TParquetCompressionType::type compression_type,
+ bool parquet_disable_dictionary,
+ TParquetVersion::type parquet_version,
+ bool output_object_data)
+ : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+ _column_names(std::move(column_names)),
+ _parquet_schemas(nullptr),
+ _compression_type(compression_type),
+ _parquet_disable_dictionary(parquet_disable_dictionary),
+ _parquet_version(parquet_version) {
+ _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
+}
+
VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
const std::vector<TParquetSchema>& parquet_schemas,
@@ -205,7 +221,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri
TParquetVersion::type parquet_version,
bool output_object_data)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
- _parquet_schemas(parquet_schemas),
+ _parquet_schemas(&parquet_schemas),
_compression_type(compression_type),
_parquet_disable_dictionary(parquet_disable_dictionary),
_parquet_version(parquet_version) {
@@ -238,10 +254,16 @@ Status VParquetTransformer::_parse_schema() {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> type;
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
- std::shared_ptr<arrow::Field> field =
- arrow::field(_parquet_schemas[i].schema_column_name, type,
- _output_vexpr_ctxs[i]->root()->is_nullable());
- fields.emplace_back(field);
+ if (_parquet_schemas != nullptr) {
+ std::shared_ptr<arrow::Field> field =
+ arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
+ _output_vexpr_ctxs[i]->root()->is_nullable());
+ fields.emplace_back(field);
+ } else {
+ std::shared_ptr<arrow::Field> field = arrow::field(
+ _column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
+ fields.emplace_back(field);
+ }
}
_arrow_schema = arrow::schema(std::move(fields));
return Status::OK();
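
The Parquet path goes through Arrow, so the counterpart of the ORC schema construction is an arrow::Schema built from the same column-name list. A small sketch of the new fallback branch in _parse_schema() (names from _column_names when no TParquetSchema list is given); the two columns here are invented for illustration:

    #include <arrow/api.h>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    int main() {
        // When _parquet_schemas is nullptr, field names come from _column_names
        // and the Arrow types are derived from the output expressions.
        std::vector<std::string> column_names = {"id", "name"};
        std::vector<std::shared_ptr<arrow::DataType>> types = {arrow::int64(), arrow::utf8()};

        std::vector<std::shared_ptr<arrow::Field>> fields;
        for (size_t i = 0; i < column_names.size(); ++i) {
            fields.emplace_back(arrow::field(column_names[i], types[i], /*nullable=*/true));
        }
        std::shared_ptr<arrow::Schema> arrow_schema = arrow::schema(std::move(fields));
        std::cout << arrow_schema->ToString() << std::endl;
        return 0;
    }
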
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index c9d2de59a51..78be0959363 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -89,6 +89,13 @@ public:
// a wrapper of parquet output stream
class VParquetTransformer final : public VFileFormatTransformer {
public:
+ VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs,
+ std::vector<std::string> column_names,
+ TParquetCompressionType::type compression_type,
+ bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
+ bool output_object_data);
+
VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
const std::vector<TParquetSchema>& parquet_schemas,
@@ -117,7 +124,8 @@ private:
std::unique_ptr<parquet::arrow::FileWriter> _writer;
std::shared_ptr<arrow::Schema> _arrow_schema;
- const std::vector<TParquetSchema>& _parquet_schemas;
+ std::vector<std::string> _column_names;
+ const std::vector<TParquetSchema>* _parquet_schemas = nullptr;
const TParquetCompressionType::type _compression_type;
const bool _parquet_disable_dictionary;
const TParquetVersion::type _parquet_version;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index ca6e76862ba..7f9a0dd1b1e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -20,6 +20,7 @@
#include "io/file_factory.h"
#include "io/fs/file_system.h"
#include "runtime/runtime_state.h"
+#include "vec/columns/column_map.h"
#include "vec/core/materialize_block.h"
#include "vec/runtime/vorc_transformer.h"
#include "vec/runtime/vparquet_transformer.h"
@@ -28,14 +29,17 @@ namespace doris {
namespace vectorized {
VHivePartitionWriter::VHivePartitionWriter(
- const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode,
- const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns,
- WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type,
+ const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
+ const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
+ const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
+ WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type,
TFileCompressType::type hive_compress_type,
const std::map<std::string, std::string>& hadoop_conf)
: _partition_name(std::move(partition_name)),
_update_mode(update_mode),
_vec_output_expr_ctxs(output_expr_ctxs),
+ _write_output_expr_ctxs(write_output_expr_ctxs),
+ _non_write_columns_indices(non_write_columns_indices),
_columns(columns),
_write_info(std::move(write_info)),
_file_name(std::move(file_name)),
@@ -53,6 +57,12 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
_write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer));
+ std::vector<std::string> column_names;
+ column_names.reserve(_columns.size());
+ for (int i = 0; i < _columns.size(); i++) {
+ column_names.push_back(_columns[i].name);
+ }
+
switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
bool parquet_disable_dictionary = false;
@@ -75,16 +85,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
to_string(_hive_compress_type));
}
}
- std::vector<TParquetSchema> parquet_schemas;
- parquet_schemas.reserve(_columns.size());
- for (int i = 0; i < _columns.size(); i++) {
- VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
- TParquetSchema parquet_schema;
- parquet_schema.schema_column_name = _columns[i].name;
- parquet_schemas.emplace_back(std::move(parquet_schema));
- }
_file_format_transformer.reset(new VParquetTransformer(
- state, _file_writer.get(), _vec_output_expr_ctxs, parquet_schemas,
+ state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names),
parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
false));
return _file_format_transformer->open();
@@ -112,21 +114,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
return Status::InternalError("Unsupported type {} with orc",
_hive_compress_type);
}
}
- orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-
- std::unique_ptr<orc::Type> root_schema = orc::createStructType();
- for (int i = 0; i < _columns.size(); i++) {
- VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
- try {
- root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type()));
- } catch (doris::Exception& e) {
- return e.to_status();
- }
- }
_file_format_transformer.reset(
- new VOrcTransformer(state, _file_writer.get(), _vec_output_expr_ctxs,
- std::move(root_schema), false, orc_compression_type));
+ new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
+ std::move(column_names), false, orc_compression_type));
return _file_format_transformer->open();
}
default: {
@@ -164,81 +155,6 @@ Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn
return Status::OK();
}
-std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
- const TypeDescriptor& type_descriptor) {
- std::pair<Status, std::unique_ptr<orc::Type>> result;
- switch (type_descriptor.type) {
- case TYPE_BOOLEAN: {
- return orc::createPrimitiveType(orc::BOOLEAN);
- }
- case TYPE_TINYINT: {
- return orc::createPrimitiveType(orc::BYTE);
- }
- case TYPE_SMALLINT: {
- return orc::createPrimitiveType(orc::SHORT);
- }
- case TYPE_INT: {
- return orc::createPrimitiveType(orc::INT);
- }
- case TYPE_BIGINT: {
- return orc::createPrimitiveType(orc::LONG);
- }
- case TYPE_FLOAT: {
- return orc::createPrimitiveType(orc::FLOAT);
- }
- case TYPE_DOUBLE: {
- return orc::createPrimitiveType(orc::DOUBLE);
- }
- case TYPE_CHAR: {
- return orc::createCharType(orc::CHAR, type_descriptor.len);
- }
- case TYPE_VARCHAR: {
- return orc::createCharType(orc::VARCHAR, type_descriptor.len);
- }
- case TYPE_STRING: {
- return orc::createPrimitiveType(orc::STRING);
- }
- case TYPE_BINARY: {
- return orc::createPrimitiveType(orc::STRING);
- }
- case TYPE_DATEV2: {
- return orc::createPrimitiveType(orc::DATE);
- }
- case TYPE_DATETIMEV2: {
- return orc::createPrimitiveType(orc::TIMESTAMP);
- }
- case TYPE_DECIMAL32: {
- return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
- }
- case TYPE_DECIMAL64: {
- return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
- }
- case TYPE_DECIMAL128I: {
- return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
- }
- case TYPE_STRUCT: {
- std::unique_ptr<orc::Type> struct_type = orc::createStructType();
- for (int j = 0; j < type_descriptor.children.size(); ++j) {
- struct_type->addStructField(type_descriptor.field_names[j],
- _build_orc_type(type_descriptor.children[j]));
- }
- return struct_type;
- }
- case TYPE_ARRAY: {
- return orc::createListType(_build_orc_type(type_descriptor.children[0]));
- }
- case TYPE_MAP: {
- return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
-
_build_orc_type(type_descriptor.children[1]));
- }
- default: {
- throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
- "Unsupported type {} to build orc type",
- type_descriptor.debug_string());
- }
- }
-}
-
Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block) {
@@ -263,6 +179,8 @@ Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Blo
Block::filter_block_internal(output_block, columns_to_filter, *filter);
+ output_block->erase(_non_write_columns_indices);
+
return status;
}
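
The key behavioral change in this file is the new output_block->erase(_non_write_columns_indices) call: after projection and filtering, partition-key and synthesized columns are removed from the block, so the file transformer only ever sees regular columns. A library-free sketch of erasing a set of indices; the string vector stands in for Doris's Block, which this sketch does not depend on:

    #include <cstddef>
    #include <set>
    #include <string>
    #include <vector>

    // Keep only the columns whose index is NOT in `indices`,
    // preserving the order of the remaining columns.
    std::vector<std::string> erase_indices(std::vector<std::string> columns,
                                           const std::set<size_t>& indices) {
        std::vector<std::string> kept;
        kept.reserve(columns.size());
        for (size_t i = 0; i < columns.size(); ++i) {
            if (indices.count(i) == 0) {
                kept.push_back(std::move(columns[i]));
            }
        }
        return kept;
    }

    // Example: with column 1 ("dt") as the partition key,
    // erase_indices({"id", "dt", "name"}, {1}) yields {"id", "name"}.
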
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index d3c52800bea..f6d2caec94d 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -43,10 +43,12 @@ public:
TFileType::type file_type;
};
- VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
+ VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+ const VExprContextSPtrs& write_output_expr_ctxs,
+ const std::set<size_t>& non_write_columns_indices,
const std::vector<THiveColumn>& columns, WriteInfo write_info,
- const std::string file_name, TFileFormatType::type file_format_type,
+ std::string file_name, TFileFormatType::type file_format_type,
TFileCompressType::type hive_compress_type,
const std::map<std::string, std::string>& hadoop_conf);
@@ -61,8 +63,6 @@ public:
inline size_t written_len() { return _file_format_transformer->written_len(); }
private:
- std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
-
Status _projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block);
@@ -79,6 +79,8 @@ private:
size_t _input_size_in_bytes = 0;
const VExprContextSPtrs& _vec_output_expr_ctxs;
+ const VExprContextSPtrs& _write_output_expr_ctxs;
+ const std::set<size_t>& _non_write_columns_indices;
const std::vector<THiveColumn>& _columns;
WriteInfo _write_info;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 4ea5fcbf4ed..e56090773b5 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -43,8 +43,25 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
_profile = profile;
for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
- if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+ switch (_t_sink.hive_table_sink.columns[i].column_type) {
+ case THiveColumnType::PARTITION_KEY: {
_partition_columns_input_index.emplace_back(i);
+ _non_write_columns_indices.insert(i);
+ break;
+ }
+ case THiveColumnType::REGULAR: {
+ _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]);
+ break;
+ }
+ case THiveColumnType::SYNTHESIZED: {
+ _non_write_columns_indices.insert(i);
+ break;
+ }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Illegal hive column type {}, it should not
be here.",
+
to_string(_t_sink.hive_table_sink.columns[i].column_type));
+ }
}
}
return Status::OK();
@@ -232,7 +249,8 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
return std::make_shared<VHivePartitionWriter>(
_t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
- hive_table_sink.columns, std::move(write_info),
+ _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
+ std::move(write_info),
fmt::format("{}{}", _compute_file_name(),
_get_file_extension(file_format_type, write_compress_type)),
file_format_type, write_compress_type, hive_table_sink.hadoop_config);
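
To summarize the classification that drives all of the above: VHiveTableWriter::open() now sorts each sink column into "written to the file" (REGULAR) or "excluded" (PARTITION_KEY, which is encoded in the partition path, and SYNTHESIZED). A condensed standalone sketch of that split, with a simplified enum standing in for the Thrift-generated THiveColumnType:

    #include <cstddef>
    #include <set>
    #include <vector>

    enum class HiveColumnType { PARTITION_KEY, REGULAR, SYNTHESIZED };

    struct ColumnSplit {
        std::vector<size_t> write_column_indices;   // fed to the file transformer
        std::set<size_t> non_write_columns_indices; // erased from each block
    };

    // Mirrors the switch added in VHiveTableWriter::open(): only REGULAR
    // columns are written; everything else is recorded for removal.
    ColumnSplit classify(const std::vector<HiveColumnType>& columns) {
        ColumnSplit split;
        for (size_t i = 0; i < columns.size(); ++i) {
            switch (columns[i]) {
            case HiveColumnType::REGULAR:
                split.write_column_indices.push_back(i);
                break;
            case HiveColumnType::PARTITION_KEY:
            case HiveColumnType::SYNTHESIZED:
                split.non_write_columns_indices.insert(i);
                break;
            }
        }
        return split;
    }
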
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index a4681b32e3f..9f48f6afde1 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -68,7 +68,10 @@ private:
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
std::vector<int> _partition_columns_input_index;
+ std::set<size_t> _non_write_columns_indices;
std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
+
+ VExprContextSPtrs _write_output_vexpr_ctxs;
};
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]