This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new 22ee53d1df [fix](parquet) fix write error data as parquet format.
(#13004)
22ee53d1df is described below
commit 22ee53d1df4a06ef1671b672d1233ba629c8888a
Author: luozenglin <[email protected]>
AuthorDate: Tue Sep 27 13:28:53 2022 +0800
[fix](parquet) fix write error data as parquet format. (#13004)
Fix incorrect data conversion when writing TINYINT and SMALLINT values
to Parquet files in the non-vectorized engine.
---
be/src/exec/parquet_writer.cpp | 46 +++++++++++++++++++++++++++++++-----------
be/src/exec/parquet_writer.h | 3 +++
2 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index bf8be94097..8bbe5d326b 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -223,6 +223,17 @@ parquet::RowGroupWriter*
ParquetWriterWrapper::get_rg_writer() {
return _rg_writer;
}
+template <typename T>
+void ParquetWriterWrapper::write_int32_column(int index, T* item) {
+ parquet::RowGroupWriter* rgWriter = get_rg_writer();
+ parquet::Int32Writer* col_writer =
static_cast<parquet::Int32Writer*>(rgWriter->column(index));
+ int32_t value = 0;
+ if (item != nullptr) {
+ value = *item;
+ }
+ col_writer->WriteBatch(1, nullptr, nullptr, &value);
+}
+
Status ParquetWriterWrapper::_write_one_row(TupleRow* row) {
int num_columns = _output_expr_ctxs.size();
if (num_columns != _str_schema.size()) {
@@ -250,8 +261,28 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row)
{
}
break;
}
- case TYPE_TINYINT:
- case TYPE_SMALLINT:
+ case TYPE_TINYINT: {
+ if (_str_schema[index][1] != "int32") {
+ std::stringstream ss;
+ ss << "project field type is tiny int, should use int32,
but the "
+ "definition type of column "
+ << _str_schema[index][2] << " is " <<
_str_schema[index][1];
+ return Status::InvalidArgument(ss.str());
+ }
+ write_int32_column(index, static_cast<int8_t*>(item));
+ break;
+ }
+ case TYPE_SMALLINT: {
+ if (_str_schema[index][1] != "int32") {
+ std::stringstream ss;
+ ss << "project field type is small int, should use int32,
but the "
+ "definition type of column "
+ << _str_schema[index][2] << " is " <<
_str_schema[index][1];
+ return Status::InvalidArgument(ss.str());
+ }
+ write_int32_column(index, static_cast<int16_t*>(item));
+ break;
+ }
case TYPE_INT: {
if (_str_schema[index][1] != "int32") {
std::stringstream ss;
@@ -260,16 +291,7 @@ Status ParquetWriterWrapper::_write_one_row(TupleRow* row)
{
<< _str_schema[index][2] << " is " <<
_str_schema[index][1];
return Status::InvalidArgument(ss.str());
}
-
- parquet::RowGroupWriter* rgWriter = get_rg_writer();
- parquet::Int32Writer* col_writer =
-
static_cast<parquet::Int32Writer*>(rgWriter->column(index));
- if (item != nullptr) {
- col_writer->WriteBatch(1, nullptr, nullptr,
static_cast<int32_t*>(item));
- } else {
- int32_t default_int32 = 0;
- col_writer->WriteBatch(1, nullptr, nullptr,
&default_int32);
- }
+ write_int32_column(index, static_cast<int32_t*>(item));
break;
}
case TYPE_BIGINT: {
diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h
index c076aed941..effaf22757 100644
--- a/be/src/exec/parquet_writer.h
+++ b/be/src/exec/parquet_writer.h
@@ -93,6 +93,9 @@ public:
int64_t written_len();
private:
+ template <typename T>
+ void write_int32_column(int index, T* item);
+
std::shared_ptr<ParquetOutputStream> _outstream;
std::shared_ptr<parquet::WriterProperties> _properties;
std::shared_ptr<parquet::schema::GroupNode> _schema;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]