morningman commented on a change in pull request #5938:
URL: https://github.com/apache/incubator-doris/pull/5938#discussion_r645981969



##########
File path: be/src/exec/parquet_writer.cpp
##########
@@ -60,33 +67,397 @@ arrow::Status ParquetOutputStream::Tell(int64_t* position) 
const {
 }
 
 arrow::Status ParquetOutputStream::Close() {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     Status st = _file_writer->close();
     if (!st.ok()) {
+        LOG(WARNING) << "close parquet output stream failed: " << 
st.get_error_msg();
         return arrow::Status::IOError(st.get_error_msg());
     }
     _is_closed = true;
     return arrow::Status::OK();
 }
 
+int64_t ParquetOutputStream::get_current_written_len() {
+    return _written_len;
+}
+
+void ParquetOutputStream::set_current_written_len(int64_t written_len) {
+    _written_len = written_len;
+}
+
 /// ParquetWriterWrapper
 ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer,
-                                           const std::vector<ExprContext*>& 
output_expr_ctxs)
-        : _output_expr_ctxs(output_expr_ctxs) {
-    // TODO(cmy): implement
-    _outstream = new ParquetOutputStream(file_writer);
+                                           const std::vector<ExprContext*>& 
output_expr_ctxs,
+                                           const std::map<std::string, 
std::string>& properties,
+                                           const 
std::vector<std::vector<std::string>>& schema)
+        :_output_expr_ctxs(output_expr_ctxs),
+          _str_schema(schema),
+          _cur_writed_rows(0),
+          _rg_writer(nullptr) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
+    parse_properties(properties);
+    parse_schema(schema);
+    init_parquet_writer();
+}
+
+void ParquetWriterWrapper::parse_properties(const std::map<std::string, 
std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.enable_dictionary();
+            } else {
+                builder.disable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_0);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status ParquetWriterWrapper::parse_schema(const 
std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = 
parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type =="boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, 
parquet_repetition_type,
+                                                              
parquet::LogicalType::None(), parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", 
parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
 }
 
 Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
-    // TODO(cmy): implement
+    int num_rows = row_batch.num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        TupleRow* row = row_batch.get_row(i);
+        RETURN_IF_ERROR(_write_one_row(row));
+        _cur_writed_rows++;
+    }
     return Status::OK();
 }
 
+Status ParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, 
_properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* ParquetWriterWrapper::getRGWriter() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_writed_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_writed_rows = 0;
+    }
+    return _rg_writer;
+}
+
+Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
+    int num_columns = _output_expr_ctxs.size();
+    if (num_columns != _str_schema.size()) {
+        return Status::InternalError("project field size is not equal to 
schema column size");
+    }
+    try {
+        for (int index = 0; index < num_columns; ++index) {
+            void *item = _output_expr_ctxs[index]->get_value(row);
+            switch (_output_expr_ctxs[index]->root()->type().type) {
+                case TYPE_BOOLEAN: {
+                    if (_str_schema[index][1] != "boolean") {
+                        std::stringstream ss;
+                        ss << "project field type is boolean, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    LOG(WARNING) << "wangxixu-write-one-data:";

Review comment:
       ```suggestion
   ```

##########
File path: be/src/exec/parquet_writer.cpp
##########
@@ -60,33 +67,397 @@ arrow::Status ParquetOutputStream::Tell(int64_t* position) 
const {
 }
 
 arrow::Status ParquetOutputStream::Close() {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     Status st = _file_writer->close();
     if (!st.ok()) {
+        LOG(WARNING) << "close parquet output stream failed: " << 
st.get_error_msg();
         return arrow::Status::IOError(st.get_error_msg());
     }
     _is_closed = true;
     return arrow::Status::OK();
 }
 
+int64_t ParquetOutputStream::get_current_written_len() {
+    return _written_len;
+}
+
+void ParquetOutputStream::set_current_written_len(int64_t written_len) {
+    _written_len = written_len;
+}
+
 /// ParquetWriterWrapper
 ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer,
-                                           const std::vector<ExprContext*>& 
output_expr_ctxs)
-        : _output_expr_ctxs(output_expr_ctxs) {
-    // TODO(cmy): implement
-    _outstream = new ParquetOutputStream(file_writer);
+                                           const std::vector<ExprContext*>& 
output_expr_ctxs,
+                                           const std::map<std::string, 
std::string>& properties,
+                                           const 
std::vector<std::vector<std::string>>& schema)
+        :_output_expr_ctxs(output_expr_ctxs),
+          _str_schema(schema),
+          _cur_writed_rows(0),
+          _rg_writer(nullptr) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
+    parse_properties(properties);
+    parse_schema(schema);
+    init_parquet_writer();
+}
+
+void ParquetWriterWrapper::parse_properties(const std::map<std::string, 
std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.enable_dictionary();
+            } else {
+                builder.disable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_0);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status ParquetWriterWrapper::parse_schema(const 
std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = 
parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type =="boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, 
parquet_repetition_type,
+                                                              
parquet::LogicalType::None(), parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", 
parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
 }
 
 Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
-    // TODO(cmy): implement
+    int num_rows = row_batch.num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        TupleRow* row = row_batch.get_row(i);
+        RETURN_IF_ERROR(_write_one_row(row));
+        _cur_writed_rows++;
+    }
     return Status::OK();
 }
 
+Status ParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, 
_properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* ParquetWriterWrapper::getRGWriter() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_writed_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_writed_rows = 0;
+    }
+    return _rg_writer;
+}
+
+Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
+    int num_columns = _output_expr_ctxs.size();
+    if (num_columns != _str_schema.size()) {
+        return Status::InternalError("project field size is not equal to 
schema column size");
+    }
+    try {
+        for (int index = 0; index < num_columns; ++index) {
+            void *item = _output_expr_ctxs[index]->get_value(row);
+            switch (_output_expr_ctxs[index]->root()->type().type) {
+                case TYPE_BOOLEAN: {
+                    if (_str_schema[index][1] != "boolean") {
+                        std::stringstream ss;
+                        ss << "project field type is boolean, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    LOG(WARNING) << "wangxixu-write-one-data:";
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::BoolWriter *col_writer = 
static_cast<parquet::BoolWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<bool*> (item));
+                    } else {
+                        bool default_bool = false;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_bool);
+                    }
+                    LOG(WARNING) << "wangxixu-write-finished0.";
+                    break;
+                }
+                case TYPE_TINYINT:
+                case TYPE_SMALLINT:
+                case TYPE_INT: {
+                    if (_str_schema[index][1] != "int32") {
+                        std::stringstream ss;
+                        ss << "project field type is tiny int/small int/int, 
should use int32, but the definition type of column "
+                                << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int32Writer *col_writer = 
static_cast<parquet::Int32Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<int32_t *>(item));
+                    } else {
+                        int32_t default_int32 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int32);
+                    }
+                    break;
+                }
+                case TYPE_BIGINT: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is big int, should use 
int64, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (int64_t * 
)(item));
+                    } else {
+                        int64_t default_int644 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int644);
+                    }
+                    break;
+                }
+                case TYPE_LARGEINT: {
+                    // TODO: not support int_128
+                    // It is better write a default value, because rg_writer 
need all columns has value before flush to disk.
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    int64_t default_int64 = 0;
+                    col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                    return Status::InvalidArgument("do not support large int 
type.");
+                }
+                case TYPE_FLOAT: {
+                    if (_str_schema[index][1] != "float") {
+                        std::stringstream ss;
+                        ss << "project field type is float, but the definition 
type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::FloatWriter *col_writer = 
static_cast<parquet::FloatWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (float_t 
*) (item));
+                    } else {
+                        float_t default_float = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_float);
+                    }
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    if (_str_schema[index][1] != "double") {
+                        std::stringstream ss;
+                        ss << "project field type is double, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::DoubleWriter *col_writer = 
static_cast<parquet::DoubleWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (double_t 
*) (item));
+                    } else {
+                        double_t default_double = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_double);
+                    }
+                    break;
+                }
+                case TYPE_DATETIME:
+                case TYPE_DATE: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is date/datetime, should use 
int64, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DateTimeValue *time_val = (const DateTimeValue 
*) (item);
+                        int64_t timestamp = time_val->to_olap_datetime();
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&timestamp);
+                    } else {
+                        int64_t default_int64 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                    }
+                    break;
+                }
+                case TYPE_CHAR:
+                case TYPE_VARCHAR: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is char/varchar, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const StringValue *string_val = (const StringValue *) 
(item);
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(string_val->ptr);
+                        value.len = string_val->len;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMAL: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalValue *decimal_val = 
reinterpret_cast<const DecimalValue *>(item);
+                        std::string decimal_str;
+                        int output_scale = 
_output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val->to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val->to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal v2, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalV2Value decimal_val(
+                                reinterpret_cast<const PackedInt128 
*>(item)->value);
+                        std::string decimal_str;
+                        int output_scale = 
_output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val.to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val.to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                default: {
+                    std::stringstream ss;
+                    ss << "unsupported file format: " << 
_output_expr_ctxs[index]->root()->type().type;
+                    return Status::InvalidArgument(ss.str());
+                }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) <<"Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    return Status::OK();
+}
+
+int64_t ParquetWriterWrapper::getWritedLen() {
+    return _outstream->get_current_written_len();
+}
 void ParquetWriterWrapper::close() {
-    // TODO(cmy): implement
+    try {
+        if (_rg_writer != nullptr) {
+            _rg_writer->Close();
+            _rg_writer = nullptr;
+        }
+        _writer->Close();
+        _outstream->Close();
+    } catch (const std::exception& e) {
+        _rg_writer = nullptr;
+        LOG(WARNING) <<"Parquet writer close error: " << e.what();

Review comment:
       ```suggestion
           LOG(WARNING) << "Parquet writer close error: " << e.what();
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
##########
@@ -121,10 +141,19 @@ public BrokerDesc getBrokerDesc() {
     public void analyze(Analyzer analyzer) throws AnalysisException {
         analyzeFilePath();
 
-        if (!format.equals("csv")) {
-            throw new AnalysisException("Only support CSV format");
+        if (Strings.isNullOrEmpty(filePath)) {
+            throw new AnalysisException("Must specify file in OUTFILE clause");
+        }
+        switch (this.format) {
+            case "csv":
+                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+                break;
+            case "parquet":
+                fileFormatType = TFileFormatType.FORMAT_PARQUET;
+                break;
+            default:
+                throw new AnalysisException("format:"+this.format+" not be 
supported.");

Review comment:
       ```suggestion
                   throw new AnalysisException("format:" + this.format + " is not 
supported.");
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
##########
@@ -216,6 +250,43 @@ private void getBrokerProperties(Set<String> 
processedPropKeys) {
         brokerDesc = new BrokerDesc(brokerName, brokerProps);
     }
 
+    private void getParquetProperties(Set<String> processedPropKeys) throws 
AnalysisException {

Review comment:
       Better to add a comment with an example explaining what kind of property 
format this method parses.

##########
File path: be/src/exec/parquet_writer.cpp
##########
@@ -60,33 +67,397 @@ arrow::Status ParquetOutputStream::Tell(int64_t* position) 
const {
 }
 
 arrow::Status ParquetOutputStream::Close() {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     Status st = _file_writer->close();
     if (!st.ok()) {
+        LOG(WARNING) << "close parquet output stream failed: " << 
st.get_error_msg();
         return arrow::Status::IOError(st.get_error_msg());
     }
     _is_closed = true;
     return arrow::Status::OK();
 }
 
+int64_t ParquetOutputStream::get_current_written_len() {
+    return _written_len;
+}
+
+void ParquetOutputStream::set_current_written_len(int64_t written_len) {
+    _written_len = written_len;
+}
+
 /// ParquetWriterWrapper
 ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer,
-                                           const std::vector<ExprContext*>& 
output_expr_ctxs)
-        : _output_expr_ctxs(output_expr_ctxs) {
-    // TODO(cmy): implement
-    _outstream = new ParquetOutputStream(file_writer);
+                                           const std::vector<ExprContext*>& 
output_expr_ctxs,
+                                           const std::map<std::string, 
std::string>& properties,
+                                           const 
std::vector<std::vector<std::string>>& schema)
+        :_output_expr_ctxs(output_expr_ctxs),
+          _str_schema(schema),
+          _cur_writed_rows(0),
+          _rg_writer(nullptr) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
+    parse_properties(properties);
+    parse_schema(schema);
+    init_parquet_writer();
+}
+
+void ParquetWriterWrapper::parse_properties(const std::map<std::string, 
std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.enable_dictionary();
+            } else {
+                builder.disable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_0);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status ParquetWriterWrapper::parse_schema(const 
std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = 
parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type =="boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, 
parquet_repetition_type,
+                                                              
parquet::LogicalType::None(), parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", 
parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
 }
 
 Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
-    // TODO(cmy): implement
+    int num_rows = row_batch.num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        TupleRow* row = row_batch.get_row(i);
+        RETURN_IF_ERROR(_write_one_row(row));
+        _cur_writed_rows++;
+    }
     return Status::OK();
 }
 
+Status ParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, 
_properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* ParquetWriterWrapper::getRGWriter() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_writed_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_writed_rows = 0;
+    }
+    return _rg_writer;
+}
+
+Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
+    int num_columns = _output_expr_ctxs.size();
+    if (num_columns != _str_schema.size()) {
+        return Status::InternalError("project field size is not equal to 
schema column size");
+    }
+    try {
+        for (int index = 0; index < num_columns; ++index) {
+            void *item = _output_expr_ctxs[index]->get_value(row);
+            switch (_output_expr_ctxs[index]->root()->type().type) {
+                case TYPE_BOOLEAN: {
+                    if (_str_schema[index][1] != "boolean") {
+                        std::stringstream ss;
+                        ss << "project field type is boolean, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    LOG(WARNING) << "wangxixu-write-one-data:";
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::BoolWriter *col_writer = 
static_cast<parquet::BoolWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<bool*> (item));
+                    } else {
+                        bool default_bool = false;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_bool);
+                    }
+                    LOG(WARNING) << "wangxixu-write-finished0.";
+                    break;
+                }
+                case TYPE_TINYINT:
+                case TYPE_SMALLINT:
+                case TYPE_INT: {
+                    if (_str_schema[index][1] != "int32") {
+                        std::stringstream ss;
+                        ss << "project field type is tiny int/small int/int, 
should use int32, but the definition type of column "
+                                << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int32Writer *col_writer = 
static_cast<parquet::Int32Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<int32_t *>(item));
+                    } else {
+                        int32_t default_int32 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int32);
+                    }
+                    break;
+                }
+                case TYPE_BIGINT: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is big int, should use 
int64, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (int64_t * 
)(item));
+                    } else {
+                        int64_t default_int644 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int644);
+                    }
+                    break;
+                }
+                case TYPE_LARGEINT: {

Review comment:
       I think this should be checked on the FE side, which should reject any 
request with a LARGEINT column.

##########
File path: be/src/exec/parquet_writer.cpp
##########
@@ -60,33 +67,397 @@ arrow::Status ParquetOutputStream::Tell(int64_t* position) 
const {
 }
 
 arrow::Status ParquetOutputStream::Close() {
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
     Status st = _file_writer->close();
     if (!st.ok()) {
+        LOG(WARNING) << "close parquet output stream failed: " << 
st.get_error_msg();
         return arrow::Status::IOError(st.get_error_msg());
     }
     _is_closed = true;
     return arrow::Status::OK();
 }
 
+int64_t ParquetOutputStream::get_current_written_len() {
+    return _written_len;
+}
+
+void ParquetOutputStream::set_current_written_len(int64_t written_len) {
+    _written_len = written_len;
+}
+
 /// ParquetWriterWrapper
 ParquetWriterWrapper::ParquetWriterWrapper(FileWriter* file_writer,
-                                           const std::vector<ExprContext*>& 
output_expr_ctxs)
-        : _output_expr_ctxs(output_expr_ctxs) {
-    // TODO(cmy): implement
-    _outstream = new ParquetOutputStream(file_writer);
+                                           const std::vector<ExprContext*>& 
output_expr_ctxs,
+                                           const std::map<std::string, 
std::string>& properties,
+                                           const 
std::vector<std::vector<std::string>>& schema)
+        :_output_expr_ctxs(output_expr_ctxs),
+          _str_schema(schema),
+          _cur_writed_rows(0),
+          _rg_writer(nullptr) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
+    parse_properties(properties);
+    parse_schema(schema);
+    init_parquet_writer();
+}
+
+void ParquetWriterWrapper::parse_properties(const std::map<std::string, 
std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.enable_dictionary();
+            } else {
+                builder.disable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_0);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status ParquetWriterWrapper::parse_schema(const 
std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = 
parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type =="boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, 
parquet_repetition_type,
+                                                              
parquet::LogicalType::None(), parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", 
parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
 }
 
 Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
-    // TODO(cmy): implement
+    int num_rows = row_batch.num_rows();
+    for (int i = 0; i < num_rows; ++i) {
+        TupleRow* row = row_batch.get_row(i);
+        RETURN_IF_ERROR(_write_one_row(row));
+        _cur_writed_rows++;
+    }
     return Status::OK();
 }
 
+Status ParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, 
_properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* ParquetWriterWrapper::getRGWriter() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_writed_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_writed_rows = 0;
+    }
+    return _rg_writer;
+}
+
+Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
+    int num_columns = _output_expr_ctxs.size();
+    if (num_columns != _str_schema.size()) {
+        return Status::InternalError("project field size is not equal to 
schema column size");
+    }
+    try {
+        for (int index = 0; index < num_columns; ++index) {
+            void *item = _output_expr_ctxs[index]->get_value(row);
+            switch (_output_expr_ctxs[index]->root()->type().type) {
+                case TYPE_BOOLEAN: {
+                    if (_str_schema[index][1] != "boolean") {
+                        std::stringstream ss;
+                        ss << "project field type is boolean, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    LOG(WARNING) << "wangxixu-write-one-data:";
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::BoolWriter *col_writer = 
static_cast<parquet::BoolWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<bool*> (item));
+                    } else {
+                        bool default_bool = false;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_bool);
+                    }
+                    LOG(WARNING) << "wangxixu-write-finished0.";
+                    break;
+                }
+                case TYPE_TINYINT:
+                case TYPE_SMALLINT:
+                case TYPE_INT: {
+                    if (_str_schema[index][1] != "int32") {
+                        std::stringstream ss;
+                        ss << "project field type is tiny int/small int/int, 
should use int32, but the definition type of column "
+                                << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int32Writer *col_writer = 
static_cast<parquet::Int32Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
static_cast<int32_t *>(item));
+                    } else {
+                        int32_t default_int32 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int32);
+                    }
+                    break;
+                }
+                case TYPE_BIGINT: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is big int, should use 
int64, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (int64_t * 
)(item));
+                    } else {
+                        int64_t default_int644 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int644);
+                    }
+                    break;
+                }
+                case TYPE_LARGEINT: {
+                    // TODO: not support int_128
+                    // It is better write a default value, because rg_writer 
need all columns has value before flush to disk.
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    int64_t default_int64 = 0;
+                    col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                    return Status::InvalidArgument("do not support large int 
type.");
+                }
+                case TYPE_FLOAT: {
+                    if (_str_schema[index][1] != "float") {
+                        std::stringstream ss;
+                        ss << "project field type is float, but the definition 
type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::FloatWriter *col_writer = 
static_cast<parquet::FloatWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (float_t 
*) (item));
+                    } else {
+                        float_t default_float = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_float);
+                    }
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    if (_str_schema[index][1] != "double") {
+                        std::stringstream ss;
+                        ss << "project field type is double, but the 
definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::DoubleWriter *col_writer = 
static_cast<parquet::DoubleWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        col_writer->WriteBatch(1, nullptr, nullptr, (double_t 
*) (item));
+                    } else {
+                        double_t default_double = 0.0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_double);
+                    }
+                    break;
+                }
+                case TYPE_DATETIME:
+                case TYPE_DATE: {
+                    if (_str_schema[index][1] != "int64") {
+                        std::stringstream ss;
+                        ss << "project field type is date/datetime, should use 
int64, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::Int64Writer *col_writer = 
static_cast<parquet::Int64Writer *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DateTimeValue *time_val = (const DateTimeValue 
*) (item);
+                        int64_t timestamp = time_val->to_olap_datetime();
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&timestamp);
+                    } else {
+                        int64_t default_int64 = 0;
+                        col_writer->WriteBatch(1, nullptr, nullptr, 
&default_int64);
+                    }
+                    break;
+                }
+                case TYPE_CHAR:
+                case TYPE_VARCHAR: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is char/varchar, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const StringValue *string_val = (const StringValue *) 
(item);
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(string_val->ptr);
+                        value.len = string_val->len;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMAL: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalValue *decimal_val = 
reinterpret_cast<const DecimalValue *>(item);
+                        std::string decimal_str;
+                        int output_scale = 
_output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val->to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val->to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    if (_str_schema[index][1] != "byte_array") {
+                        std::stringstream ss;
+                        ss << "project field type is decimal v2, should use 
byte_array, but the definition type of column "
+                            << _str_schema[index][2] << " is " << 
_str_schema[index][1];
+                        return Status::InvalidArgument(ss.str());
+                    }
+                    parquet::RowGroupWriter* rgWriter = getRGWriter();
+                    parquet::ByteArrayWriter *col_writer = 
static_cast<parquet::ByteArrayWriter *>(rgWriter->column(index));
+                    if (item != nullptr) {
+                        const DecimalV2Value decimal_val(
+                                reinterpret_cast<const PackedInt128 
*>(item)->value);
+                        std::string decimal_str;
+                        int output_scale = 
_output_expr_ctxs[index]->root()->output_scale();
+                        if (output_scale > 0 && output_scale <= 30) {
+                            decimal_str = decimal_val.to_string(output_scale);
+                        } else {
+                            decimal_str = decimal_val.to_string();
+                        }
+                        parquet::ByteArray value;
+                        value.ptr = reinterpret_cast<const uint8_t 
*>(&decimal_str);
+                        value.len = decimal_str.length();
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    } else {
+                        parquet::ByteArray value;
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                    break;
+                }
+                default: {
+                    std::stringstream ss;
+                    ss << "unsupported file format: " << 
_output_expr_ctxs[index]->root()->type().type;
+                    return Status::InvalidArgument(ss.str());
+                }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) <<"Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    return Status::OK();
+}
+
+int64_t ParquetWriterWrapper::getWritedLen() {

Review comment:
       ```suggestion
   int64_t ParquetWriterWrapper::get_written_len() {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to