This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/2.1-tmp by this push:
     new 0ef6eabc74c [Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495)
0ef6eabc74c is described below

commit 0ef6eabc74cb5d93c3e8d62feaeb91bcca37204a
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Wed Apr 10 20:37:22 2024 +0800

    [Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495)
---
 be/src/vec/runtime/vorc_transformer.cpp           | 104 +++++++++++++++++--
 be/src/vec/runtime/vorc_transformer.h             |  10 +-
 be/src/vec/runtime/vparquet_transformer.cpp       |  32 +++++-
 be/src/vec/runtime/vparquet_transformer.h         |  10 +-
 be/src/vec/sink/writer/vhive_partition_writer.cpp | 118 ++++------------------
 be/src/vec/sink/writer/vhive_partition_writer.h   |  10 +-
 be/src/vec/sink/writer/vhive_table_writer.cpp     |  22 +++-
 be/src/vec/sink/writer/vhive_table_writer.h       |   3 +
 8 files changed, 184 insertions(+), 125 deletions(-)

diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index 764a97ae5bc..a07f734acac 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -99,35 +99,45 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _file_writer(file_writer),
           _write_options(new orc::WriterOptions()),
-          _schema_str(&schema),
-          _schema(nullptr) {
+          _schema_str(&schema) {
     _write_options->setTimezoneName(_state->timezone());
     _write_options->setUseTightNumericVector(true);
 }
 
 VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                  const VExprContextSPtrs& output_vexpr_ctxs,
-                                 std::unique_ptr<orc::Type> schema, bool output_object_data,
+                                 std::vector<std::string> column_names, bool output_object_data,
                                  orc::CompressionKind compression)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _file_writer(file_writer),
+          _column_names(std::move(column_names)),
           _write_options(new orc::WriterOptions()),
-          _schema_str(nullptr),
-          _schema(std::move(schema)) {
+          _schema_str(nullptr) {
     _write_options->setTimezoneName(_state->timezone());
     _write_options->setUseTightNumericVector(true);
     _write_options->setCompression(compression);
 }
 
 Status VOrcTransformer::open() {
-    try {
-        if (_schema == nullptr && _schema_str != nullptr) {
+    if (_schema_str != nullptr) {
+        try {
             _schema = orc::Type::buildTypeFromString(*_schema_str);
+        } catch (const std::exception& e) {
+            return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str,
+                                         e.what());
+        }
+    } else {
+        _schema = orc::createStructType();
+        for (int i = 0; i < _output_vexpr_ctxs.size(); i++) {
+            VExprSPtr column_expr = _output_vexpr_ctxs[i]->root();
+            try {
+                _schema->addStructField(_column_names[i], _build_orc_type(column_expr->type()));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
         }
-    } catch (const std::exception& e) {
-        return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str,
-                                     e.what());
     }
+
     _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
     _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
     if (_writer == nullptr) {
@@ -136,6 +146,80 @@ Status VOrcTransformer::open() {
     return Status::OK();
 }
 
+std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(const TypeDescriptor& type_descriptor) {
+    std::pair<Status, std::unique_ptr<orc::Type>> result;
+    switch (type_descriptor.type) {
+    case TYPE_BOOLEAN: {
+        return orc::createPrimitiveType(orc::BOOLEAN);
+    }
+    case TYPE_TINYINT: {
+        return orc::createPrimitiveType(orc::BYTE);
+    }
+    case TYPE_SMALLINT: {
+        return orc::createPrimitiveType(orc::SHORT);
+    }
+    case TYPE_INT: {
+        return orc::createPrimitiveType(orc::INT);
+    }
+    case TYPE_BIGINT: {
+        return orc::createPrimitiveType(orc::LONG);
+    }
+    case TYPE_FLOAT: {
+        return orc::createPrimitiveType(orc::FLOAT);
+    }
+    case TYPE_DOUBLE: {
+        return orc::createPrimitiveType(orc::DOUBLE);
+    }
+    case TYPE_CHAR: {
+        return orc::createCharType(orc::CHAR, type_descriptor.len);
+    }
+    case TYPE_VARCHAR: {
+        return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+    }
+    case TYPE_STRING: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_BINARY: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_DATEV2: {
+        return orc::createPrimitiveType(orc::DATE);
+    }
+    case TYPE_DATETIMEV2: {
+        return orc::createPrimitiveType(orc::TIMESTAMP);
+    }
+    case TYPE_DECIMAL32: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL64: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_STRUCT: {
+        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+        for (int j = 0; j < type_descriptor.children.size(); ++j) {
+            struct_type->addStructField(type_descriptor.field_names[j],
+                                        _build_orc_type(type_descriptor.children[j]));
+        }
+        return struct_type;
+    }
+    case TYPE_ARRAY: {
+        return orc::createListType(_build_orc_type(type_descriptor.children[0]));
+    }
+    case TYPE_MAP: {
+        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+                                  _build_orc_type(type_descriptor.children[1]));
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type {} to build orc type",
+                               type_descriptor.debug_string());
+    }
+    }
+}
+
 std::unique_ptr<orc::ColumnVectorBatch> VOrcTransformer::_create_row_batch(size_t sz) {
     return _writer->createRowBatch(sz);
 }
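
For context, the ORC writer path above now builds its schema directly from column names and Doris types instead of receiving a prebuilt orc::Type. A minimal standalone sketch (not part of this commit) of that pattern against the ORC C++ API, with hypothetical column names and types:

#include <orc/OrcFile.hh>

#include <iostream>
#include <memory>

int main() {
    // Mirrors the new open() path: start from an empty struct type and add one
    // field per output column, as _schema->addStructField(_column_names[i], ...) does.
    std::unique_ptr<orc::Type> root = orc::createStructType();
    root->addStructField("id", orc::createPrimitiveType(orc::LONG));
    root->addStructField("name", orc::createCharType(orc::VARCHAR, 64));
    root->addStructField("price", orc::createDecimalType(18, 4));
    // Prints: struct<id:bigint,name:varchar(64),price:decimal(18,4)>
    std::cout << root->toString() << std::endl;
    return 0;
}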
diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h
index 8cfc956c0cd..1f78e549a1b 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -79,8 +79,9 @@ public:
                     bool output_object_data);
 
     VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-                    const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr<orc::Type> schema,
-                    bool output_object_data, orc::CompressionKind compression);
+                    const VExprContextSPtrs& output_vexpr_ctxs,
+                    std::vector<std::string> column_names, bool output_object_data,
+                    orc::CompressionKind compression);
 
     ~VOrcTransformer() = default;
 
@@ -93,6 +94,8 @@ public:
     int64_t written_len() override;
 
 private:
+    std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
+
     std::unique_ptr<orc::ColumnVectorBatch> _create_row_batch(size_t sz);
     // The size of subtypes of a complex type may be different from
     // the size of the complex type itself,
@@ -101,6 +104,7 @@ private:
                              orc::ColumnVectorBatch* orc_col_batch);
 
     doris::io::FileWriter* _file_writer = nullptr;
+    std::vector<std::string> _column_names;
     std::unique_ptr<orc::OutputStream> _output_stream;
     std::unique_ptr<orc::WriterOptions> _write_options;
     const std::string* _schema_str;
@@ -117,4 +121,4 @@ private:
     static constexpr size_t BUFFER_RESERVED_SIZE = 40;
 };
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp
index b831b1e2dc5..0e5800750b0 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -197,6 +197,22 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
     }
 }
 
+VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                                         const VExprContextSPtrs& output_vexpr_ctxs,
+                                         std::vector<std::string> column_names,
+                                         TParquetCompressionType::type compression_type,
+                                         bool parquet_disable_dictionary,
+                                         TParquetVersion::type parquet_version,
+                                         bool output_object_data)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _column_names(std::move(column_names)),
+          _parquet_schemas(nullptr),
+          _compression_type(compression_type),
+          _parquet_disable_dictionary(parquet_disable_dictionary),
+          _parquet_version(parquet_version) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
+}
+
 VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                          const VExprContextSPtrs& output_vexpr_ctxs,
                                          const std::vector<TParquetSchema>& parquet_schemas,
@@ -205,7 +221,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri
                                          TParquetVersion::type parquet_version,
                                          bool output_object_data)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
-          _parquet_schemas(parquet_schemas),
+          _parquet_schemas(&parquet_schemas),
           _compression_type(compression_type),
           _parquet_disable_dictionary(parquet_disable_dictionary),
           _parquet_version(parquet_version) {
@@ -238,10 +254,16 @@ Status VParquetTransformer::_parse_schema() {
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         std::shared_ptr<arrow::DataType> type;
         RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
-        std::shared_ptr<arrow::Field> field =
-                arrow::field(_parquet_schemas[i].schema_column_name, type,
-                             _output_vexpr_ctxs[i]->root()->is_nullable());
-        fields.emplace_back(field);
+        if (_parquet_schemas != nullptr) {
+            std::shared_ptr<arrow::Field> field =
+                    arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
+                                 _output_vexpr_ctxs[i]->root()->is_nullable());
+            fields.emplace_back(field);
+        } else {
+            std::shared_ptr<arrow::Field> field = arrow::field(
+                    _column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
+            fields.emplace_back(field);
+        }
     }
     _arrow_schema = arrow::schema(std::move(fields));
     return Status::OK();
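
For context, the new constructor above lets _parse_schema() build the Arrow schema from a plain column-name vector when no TParquetSchema list is supplied. A minimal standalone sketch (not part of this commit) of that branch, with hypothetical names and types:

#include <arrow/api.h>

#include <iostream>
#include <memory>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> column_names = {"id", "name"};
    std::vector<std::shared_ptr<arrow::DataType>> types = {arrow::int64(), arrow::utf8()};

    std::vector<std::shared_ptr<arrow::Field>> fields;
    for (size_t i = 0; i < column_names.size(); ++i) {
        // Mirrors arrow::field(_column_names[i], type, is_nullable) in the else branch.
        fields.emplace_back(arrow::field(column_names[i], types[i], /*nullable=*/true));
    }
    std::shared_ptr<arrow::Schema> schema = arrow::schema(std::move(fields));
    std::cout << schema->ToString() << std::endl;
    return 0;
}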
diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h
index c9d2de59a51..78be0959363 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -89,6 +89,13 @@ public:
 // a wrapper of parquet output stream
 class VParquetTransformer final : public VFileFormatTransformer {
 public:
+    VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                        const VExprContextSPtrs& output_vexpr_ctxs,
+                        std::vector<std::string> column_names,
+                        TParquetCompressionType::type compression_type,
+                        bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
+                        bool output_object_data);
+
     VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                         const VExprContextSPtrs& output_vexpr_ctxs,
                         const std::vector<TParquetSchema>& parquet_schemas,
@@ -117,7 +124,8 @@ private:
     std::unique_ptr<parquet::arrow::FileWriter> _writer;
     std::shared_ptr<arrow::Schema> _arrow_schema;
 
-    const std::vector<TParquetSchema>& _parquet_schemas;
+    std::vector<std::string> _column_names;
+    const std::vector<TParquetSchema>* _parquet_schemas = nullptr;
     const TParquetCompressionType::type _compression_type;
     const bool _parquet_disable_dictionary;
     const TParquetVersion::type _parquet_version;
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index ca6e76862ba..7f9a0dd1b1e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -20,6 +20,7 @@
 #include "io/file_factory.h"
 #include "io/fs/file_system.h"
 #include "runtime/runtime_state.h"
+#include "vec/columns/column_map.h"
 #include "vec/core/materialize_block.h"
 #include "vec/runtime/vorc_transformer.h"
 #include "vec/runtime/vparquet_transformer.h"
@@ -28,14 +29,17 @@ namespace doris {
 namespace vectorized {
 
 VHivePartitionWriter::VHivePartitionWriter(
-        const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode,
-        const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns,
-        WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type,
+        const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
+        const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
+        WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type,
         TFileCompressType::type hive_compress_type,
         const std::map<std::string, std::string>& hadoop_conf)
         : _partition_name(std::move(partition_name)),
           _update_mode(update_mode),
           _vec_output_expr_ctxs(output_expr_ctxs),
+          _write_output_expr_ctxs(write_output_expr_ctxs),
+          _non_write_columns_indices(non_write_columns_indices),
           _columns(columns),
           _write_info(std::move(write_info)),
           _file_name(std::move(file_name)),
@@ -53,6 +57,12 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
             fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer));
 
+    std::vector<std::string> column_names;
+    column_names.reserve(_columns.size());
+    for (int i = 0; i < _columns.size(); i++) {
+        column_names.push_back(_columns[i].name);
+    }
+
     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
         bool parquet_disable_dictionary = false;
@@ -75,16 +85,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
                                          to_string(_hive_compress_type));
         }
         }
-        std::vector<TParquetSchema> parquet_schemas;
-        parquet_schemas.reserve(_columns.size());
-        for (int i = 0; i < _columns.size(); i++) {
-            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
-            TParquetSchema parquet_schema;
-            parquet_schema.schema_column_name = _columns[i].name;
-            parquet_schemas.emplace_back(std::move(parquet_schema));
-        }
         _file_format_transformer.reset(new VParquetTransformer(
-                state, _file_writer.get(), _vec_output_expr_ctxs, parquet_schemas,
+                state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names),
                 parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
                 false));
         return _file_format_transformer->open();
@@ -112,21 +114,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             return Status::InternalError("Unsupported type {} with orc", _hive_compress_type);
         }
         }
-        orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-
-        std::unique_ptr<orc::Type> root_schema = orc::createStructType();
-        for (int i = 0; i < _columns.size(); i++) {
-            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
-            try {
-                root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type()));
-            } catch (doris::Exception& e) {
-                return e.to_status();
-            }
-        }
 
         _file_format_transformer.reset(
-                new VOrcTransformer(state, _file_writer.get(), _vec_output_expr_ctxs,
-                                    std::move(root_schema), false, orc_compression_type));
+                new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
+                                    std::move(column_names), false, orc_compression_type));
         return _file_format_transformer->open();
     }
     default: {
@@ -164,81 +155,6 @@ Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn
     return Status::OK();
 }
 
-std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
-        const TypeDescriptor& type_descriptor) {
-    std::pair<Status, std::unique_ptr<orc::Type>> result;
-    switch (type_descriptor.type) {
-    case TYPE_BOOLEAN: {
-        return orc::createPrimitiveType(orc::BOOLEAN);
-    }
-    case TYPE_TINYINT: {
-        return orc::createPrimitiveType(orc::BYTE);
-    }
-    case TYPE_SMALLINT: {
-        return orc::createPrimitiveType(orc::SHORT);
-    }
-    case TYPE_INT: {
-        return orc::createPrimitiveType(orc::INT);
-    }
-    case TYPE_BIGINT: {
-        return orc::createPrimitiveType(orc::LONG);
-    }
-    case TYPE_FLOAT: {
-        return orc::createPrimitiveType(orc::FLOAT);
-    }
-    case TYPE_DOUBLE: {
-        return orc::createPrimitiveType(orc::DOUBLE);
-    }
-    case TYPE_CHAR: {
-        return orc::createCharType(orc::CHAR, type_descriptor.len);
-    }
-    case TYPE_VARCHAR: {
-        return orc::createCharType(orc::VARCHAR, type_descriptor.len);
-    }
-    case TYPE_STRING: {
-        return orc::createPrimitiveType(orc::STRING);
-    }
-    case TYPE_BINARY: {
-        return orc::createPrimitiveType(orc::STRING);
-    }
-    case TYPE_DATEV2: {
-        return orc::createPrimitiveType(orc::DATE);
-    }
-    case TYPE_DATETIMEV2: {
-        return orc::createPrimitiveType(orc::TIMESTAMP);
-    }
-    case TYPE_DECIMAL32: {
-        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
-    }
-    case TYPE_DECIMAL64: {
-        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
-    }
-    case TYPE_DECIMAL128I: {
-        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
-    }
-    case TYPE_STRUCT: {
-        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
-        for (int j = 0; j < type_descriptor.children.size(); ++j) {
-            struct_type->addStructField(type_descriptor.field_names[j],
-                                        _build_orc_type(type_descriptor.children[j]));
-        }
-        return struct_type;
-    }
-    case TYPE_ARRAY: {
-        return orc::createListType(_build_orc_type(type_descriptor.children[0]));
-    }
-    case TYPE_MAP: {
-        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
-                                  _build_orc_type(type_descriptor.children[1]));
-    }
-    default: {
-        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                               "Unsupported type {} to build orc type",
-                               type_descriptor.debug_string());
-    }
-    }
-}
-
 Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
                                                           const vectorized::IColumn::Filter* filter,
                                                           doris::vectorized::Block* output_block) {
@@ -263,6 +179,8 @@ Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Blo
 
     Block::filter_block_internal(output_block, columns_to_filter, *filter);
 
+    output_block->erase(_non_write_columns_indices);
+
     return status;
 }
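
For context, the single added call above, output_block->erase(_non_write_columns_indices), is what drops partition-key and synthesized columns after projection and filtering, so only regular columns reach the file writer. A minimal standalone sketch (not part of this commit) of the same set-based erase on a plain vector, with hypothetical column names:

#include <cstddef>
#include <iostream>
#include <set>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> columns = {"id", "name", "dt", "synthesized"};
    std::set<size_t> non_write_columns_indices = {2, 3};

    // Erase from the highest index down so earlier positions stay valid.
    for (auto it = non_write_columns_indices.rbegin(); it != non_write_columns_indices.rend(); ++it) {
        columns.erase(columns.begin() + static_cast<std::ptrdiff_t>(*it));
    }
    for (const auto& c : columns) {
        std::cout << c << std::endl; // prints: id, name
    }
    return 0;
}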
 
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index d3c52800bea..f6d2caec94d 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -43,10 +43,12 @@ public:
         TFileType::type file_type;
     };
 
-    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
+    VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
                          TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+                         const VExprContextSPtrs& write_output_expr_ctxs,
+                         const std::set<size_t>& non_write_columns_indices,
                          const std::vector<THiveColumn>& columns, WriteInfo write_info,
-                         const std::string file_name, TFileFormatType::type file_format_type,
+                         std::string file_name, TFileFormatType::type file_format_type,
                          TFileCompressType::type hive_compress_type,
                          const std::map<std::string, std::string>& hadoop_conf);
 
@@ -61,8 +63,6 @@ public:
     inline size_t written_len() { return _file_format_transformer->written_len(); }
 
 private:
-    std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
-
     Status _projection_and_filter_block(doris::vectorized::Block& input_block,
                                         const vectorized::IColumn::Filter* filter,
                                         doris::vectorized::Block* output_block);
@@ -79,6 +79,8 @@ private:
     size_t _input_size_in_bytes = 0;
 
     const VExprContextSPtrs& _vec_output_expr_ctxs;
+    const VExprContextSPtrs& _write_output_expr_ctxs;
+    const std::set<size_t>& _non_write_columns_indices;
 
     const std::vector<THiveColumn>& _columns;
     WriteInfo _write_info;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 4ea5fcbf4ed..e56090773b5 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -43,8 +43,25 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     _profile = profile;
 
     for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
-        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+        switch (_t_sink.hive_table_sink.columns[i].column_type) {
+        case THiveColumnType::PARTITION_KEY: {
             _partition_columns_input_index.emplace_back(i);
+            _non_write_columns_indices.insert(i);
+            break;
+        }
+        case THiveColumnType::REGULAR: {
+            _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]);
+            break;
+        }
+        case THiveColumnType::SYNTHESIZED: {
+            _non_write_columns_indices.insert(i);
+            break;
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Illegal hive column type {}, it should not be here.",
+                                   to_string(_t_sink.hive_table_sink.columns[i].column_type));
+        }
         }
     }
     return Status::OK();
@@ -232,7 +249,8 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
 
     return std::make_shared<VHivePartitionWriter>(
             _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
-            hive_table_sink.columns, std::move(write_info),
+            _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
+            std::move(write_info),
             fmt::format("{}{}", _compute_file_name(),
                         _get_file_extension(file_format_type, write_compress_type)),
             file_format_type, write_compress_type, hive_table_sink.hadoop_config);
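
For context, the classification loop added to VHiveTableWriter::open() above decides which columns reach the file: partition keys and synthesized columns are recorded in _non_write_columns_indices, while only REGULAR columns keep their output expressions in _write_output_vexpr_ctxs. A minimal standalone sketch (not part of this commit) of that bookkeeping, with a hypothetical enum standing in for the Thrift THiveColumnType:

#include <cstddef>
#include <iostream>
#include <set>
#include <vector>

enum class HiveColumnType { PARTITION_KEY, REGULAR, SYNTHESIZED };

int main() {
    std::vector<HiveColumnType> columns = {HiveColumnType::REGULAR, HiveColumnType::PARTITION_KEY,
                                           HiveColumnType::REGULAR, HiveColumnType::SYNTHESIZED};
    std::vector<int> partition_columns_input_index;
    std::set<size_t> non_write_columns_indices;
    std::vector<size_t> write_output_columns; // stands in for _write_output_vexpr_ctxs

    for (size_t i = 0; i < columns.size(); ++i) {
        switch (columns[i]) {
        case HiveColumnType::PARTITION_KEY:
            partition_columns_input_index.push_back(static_cast<int>(i));
            non_write_columns_indices.insert(i);
            break;
        case HiveColumnType::REGULAR:
            write_output_columns.push_back(i);
            break;
        case HiveColumnType::SYNTHESIZED:
            non_write_columns_indices.insert(i);
            break;
        }
    }
    std::cout << "columns written to file: " << write_output_columns.size() << std::endl; // 2
    return 0;
}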
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index a4681b32e3f..9f48f6afde1 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -68,7 +68,10 @@ private:
     RuntimeState* _state = nullptr;
     RuntimeProfile* _profile = nullptr;
     std::vector<int> _partition_columns_input_index;
+    std::set<size_t> _non_write_columns_indices;
     std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
+
+    VExprContextSPtrs _write_output_vexpr_ctxs;
 };
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
