This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20634ab7e3 [feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)
20634ab7e3 is described below
commit 20634ab7e3d90de41395543e1f4cdb5569bca6f2
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Nov 16 08:43:11 2022 +0800
[feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)

PR https://github.com/apache/doris/pull/13917 added lazy read for
non-predicate columns in ParquetReader, but lazy read could not be triggered
when the predicate columns are partition or missing columns.
This PR supports such cases and fills the partition and missing columns in
`FileReader`.
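
The flow, in outline (a minimal standalone sketch of lazy read with
partition-column filling; all names below are invented for illustration and
are not the actual Doris classes):

    // lazy_read_sketch.cpp -- illustrative only, not Doris code.
    // Build with: g++ -std=c++17 lazy_read_sketch.cpp
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using Column = std::vector<int64_t>;

    int main() {
        const size_t batch_size = 8;
        auto read_column = [](size_t n) {
            Column c(n);
            for (size_t i = 0; i < n; ++i) c[i] = static_cast<int64_t>(i);
            return c;
        };

        // 1. Read only the predicate column for the whole batch.
        Column a = read_column(batch_size);

        // 2. A partition column has one constant value per split; fill it for
        //    every pre-read row so a predicate on it can take part in lazy read.
        const int64_t partition_value = 42;
        Column part(batch_size, partition_value);

        // 3. Evaluate the pushed-down conjunct to build a row filter.
        std::vector<uint8_t> filter(batch_size);
        for (size_t i = 0; i < batch_size; ++i) {
            filter[i] = (a[i] % 2 == 0 && part[i] == partition_value) ? 1 : 0;
        }

        // 4. Materialize the remaining (lazy) column only for surviving rows.
        Column b = read_column(batch_size); // a real reader can skip whole pages
        Column a_out, part_out, b_out;
        for (size_t i = 0; i < batch_size; ++i) {
            if (!filter[i]) continue;
            a_out.push_back(a[i]);
            part_out.push_back(part[i]);
            b_out.push_back(b[i]);
        }
        std::cout << "kept " << a_out.size() << " of " << batch_size << " rows\n";
        return 0;
    }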
---
be/src/exec/text_converter.cpp | 164 ++++++++++++++++++++-
be/src/exec/text_converter.h | 5 +
be/src/vec/exec/format/generic_reader.h | 19 +++
.../exec/format/parquet/vparquet_group_reader.cpp | 135 ++++++++++++-----
.../exec/format/parquet/vparquet_group_reader.h | 35 +++--
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 59 ++++++--
be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 66 +++++++--
be/src/vec/exec/scan/vfile_scanner.h | 1 +
be/test/vec/exec/parquet/parquet_reader_test.cpp | 4 +
10 files changed, 425 insertions(+), 69 deletions(-)
diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp
index 5fac00569c..5888eefc43 100644
--- a/be/src/exec/text_converter.cpp
+++ b/be/src/exec/text_converter.cpp
@@ -17,15 +17,177 @@
#include "text_converter.h"
-#include <boost/algorithm/string.hpp>
+#include <sql.h>
+#include "runtime/decimalv2_value.h"
+#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "runtime/string_value.h"
+#include "runtime/tuple.h"
+#include "util/string_parser.hpp"
+#include "util/types.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/runtime/vdatetime_value.h"
namespace doris {
TextConverter::TextConverter(char escape_char) : _escape_char(escape_char) {}
+bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
+                                     vectorized::IColumn* nullable_col_ptr, const char* data,
+                                     size_t len, bool copy_string, bool need_escape, size_t rows) {
+ vectorized::IColumn* col_ptr = nullable_col_ptr;
+ // \N means it's NULL
+ if (slot_desc->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+        if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
+ nullable_column->insert_many_defaults(rows);
+ return true;
+ } else {
+ auto& null_map = nullable_column->get_null_map_data();
+ null_map.resize_fill(null_map.size() + rows, 0);
+ col_ptr = &nullable_column->get_nested_column();
+ }
+ }
+
+ StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+ size_t origin_size = col_ptr->size();
+ // Parse the raw-text data. Translate the text string to internal format.
+ switch (slot_desc->type().type) {
+ case TYPE_HLL: {
+ HyperLogLog hyper_log_log(Slice(data, len));
+        auto& hyper_data = reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data();
+ for (size_t i = 0; i < rows; ++i) {
+ hyper_data.emplace_back(hyper_log_log);
+ }
+ break;
+ }
+ case TYPE_STRING:
+ case TYPE_VARCHAR:
+ case TYPE_CHAR: {
+ if (need_escape) {
+ unescape_string_on_spot(data, &len);
+ }
+
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_many_data(data, len, rows);
+ break;
+ }
+
+ case TYPE_BOOLEAN: {
+ bool num = StringParser::string_to_bool(data, len, &parse_result);
+ reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, (uint8_t)num);
+ break;
+ }
+ case TYPE_TINYINT: {
+        int8_t num = StringParser::string_to_int<int8_t>(data, len, &parse_result);
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_SMALLINT: {
+        int16_t num = StringParser::string_to_int<int16_t>(data, len, &parse_result);
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_INT: {
+        int32_t num = StringParser::string_to_int<int32_t>(data, len, &parse_result);
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_BIGINT: {
+        int64_t num = StringParser::string_to_int<int64_t>(data, len, &parse_result);
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_LARGEINT: {
+        __int128 num = StringParser::string_to_int<__int128>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+
+ case TYPE_FLOAT: {
+        float num = StringParser::string_to_float<float>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_DOUBLE: {
+        double num = StringParser::string_to_float<double>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, num);
+ break;
+ }
+ case TYPE_DATE: {
+ vectorized::VecDateTimeValue ts_slot;
+ if (!ts_slot.from_date_str(data, len)) {
+ parse_result = StringParser::PARSE_FAILURE;
+ break;
+ }
+ ts_slot.cast_to_date();
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+ ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+ break;
+ }
+
+ case TYPE_DATETIME: {
+ vectorized::VecDateTimeValue ts_slot;
+ if (!ts_slot.from_date_str(data, len)) {
+ parse_result = StringParser::PARSE_FAILURE;
+ break;
+ }
+ ts_slot.to_datetime();
+ reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+ ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+ break;
+ }
+
+ case TYPE_DECIMALV2: {
+ DecimalV2Value decimal_slot;
+ if (decimal_slot.parse_from_str(data, len)) {
+ parse_result = StringParser::PARSE_FAILURE;
+ break;
+ }
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+ ->get_data()
+ .resize_fill(origin_size + rows, decimal_slot.value());
+ break;
+ }
+
+ default:
+ DCHECK(false) << "bad slot type: " << slot_desc->type();
+ break;
+ }
+
+ if (UNLIKELY(parse_result == StringParser::PARSE_FAILURE)) {
+ if (true == slot_desc->is_nullable()) {
+            auto* nullable_column =
+                    reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+            size_t size = nullable_column->get_null_map_data().size();
+            doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data();
+ for (int i = 1; i <= rows; ++i) {
+ null_map_data[size - i] = 1;
+ }
+ nullable_column->get_nested_column().insert_many_defaults(rows);
+ }
+ return false;
+ }
+ return true;
+}
+
void TextConverter::unescape_string(StringValue* value, MemPool* pool) {
char* new_data = reinterpret_cast<char*>(pool->allocate(value->len));
unescape_string(value->ptr, new_data, &value->len);
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index a3d3fe5ff6..79deac95ff 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -56,6 +56,11 @@ public:
    bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
                          const char* data, size_t len, bool copy_string, bool need_escape);
+ /// Write consecutive rows of the same data.
+    bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
+                          const char* data, size_t len, bool copy_string, bool need_escape,
+                          size_t rows);
+
    // Removes escape characters from len characters of the null-terminated string src,
    // and copies the unescaped string into dest, changing *len to the unescaped length.
// No null-terminator is added to dest.
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index e098557b82..dd2bdd249c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -19,6 +19,7 @@
#include "common/status.h"
#include "runtime/types.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
@@ -43,6 +44,24 @@ public:
        return Status::NotSupported("get_parser_schema is not implemented for this reader.");
    }
virtual ~GenericReader() = default;
+
+    /// If the underlying FileReader has filled the partition and missing columns,
+    /// the FileScanner does not need to fill them again.
+    bool fill_all_columns() const { return _fill_all_columns; }
+
+    /// Tell the underlying FileReader about the partition and missing columns,
+    /// and let the FileReader decide whether to fill them itself.
+    /// A reader that fills the columns should set _fill_all_columns = true.
+    virtual Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+        return Status::OK();
+    }
+
+protected:
+    /// Whether the underlying FileReader has filled the partition and missing columns
+    bool _fill_all_columns = false;
};
} // namespace doris::vectorized
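
To make the contract above concrete, here is a minimal sketch of the
scanner/reader handshake (the classes are stand-ins invented for this
example, not the real GenericReader or VFileScanner):

    // fill_contract_sketch.cpp -- illustrative only, not Doris code.
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Stand-in for GenericReader: the reader reports via fill_all_columns()
    // whether it already filled the partition/missing columns.
    struct Reader {
        virtual ~Reader() = default;
        bool fill_all_columns() const { return _fill_all_columns; }
        virtual void set_fill_columns(
                const std::unordered_map<std::string, std::string>& partition_columns) {}

    protected:
        bool _fill_all_columns = false;
    };

    // Stand-in for ParquetReader: it keeps the column values and fills them
    // itself, so it sets _fill_all_columns = true.
    struct ParquetLikeReader : Reader {
        void set_fill_columns(
                const std::unordered_map<std::string, std::string>& partition_columns) override {
            // ...split the columns into predicate/lazy sets, remember the values...
            _fill_all_columns = true;
        }
    };

    int main() {
        ParquetLikeReader reader;
        reader.set_fill_columns({{"dt", "2022-11-16"}});
        // Stand-in for the check in VFileScanner::_get_block_impl:
        if (!reader.fill_all_columns()) {
            std::cout << "scanner fills partition/missing columns itself\n";
        } else {
            std::cout << "reader filled them; scanner skips that step\n";
        }
        return 0;
    }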
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0f71990b2b..e607cd510d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -33,13 +33,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
_row_group_meta(row_group),
_remaining_rows(row_group.num_rows),
_ctz(ctz),
- _vconjunct_ctx(lazy_read_ctx.vconjunct_ctx),
- _can_lazy_read(lazy_read_ctx.can_lazy_read),
- _resize_first_column(lazy_read_ctx.resize_first_column),
- _all_read_columns(lazy_read_ctx.all_read_columns),
- _predicate_columns(lazy_read_ctx.predicate_columns),
- _predicate_col_ids(lazy_read_ctx.predicate_col_ids),
- _lazy_read_columns(lazy_read_ctx.lazy_read_columns) {}
+ _lazy_read_ctx(lazy_read_ctx) {}
RowGroupReader::~RowGroupReader() {
_column_readers.clear();
@@ -54,7 +48,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
    size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
-    std::set<std::string> predicate_columns(_predicate_columns.begin(), _predicate_columns.end());
for (auto& read_col : _read_columns) {
        auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
std::unique_ptr<ParquetColumnReader> reader;
@@ -70,13 +63,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
return Status::Corruption("Init row group reader failed");
}
_column_readers[read_col._file_slot_name] = std::move(reader);
- PrimitiveType column_type = field->type.type;
-        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
- _can_lazy_read = false;
- }
- }
- if (_vconjunct_ctx == nullptr) {
- _can_lazy_read = false;
}
return Status::OK();
}
@@ -85,16 +71,29 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
                                  bool* _batch_eof) {
// Process external table query task that select columns are all from path.
if (_read_columns.empty()) {
- return _read_empty_batch(batch_size, read_rows, _batch_eof);
+ RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+        *read_rows = block->rows();
+        return st;
    }
- if (_can_lazy_read) {
+ if (_lazy_read_ctx.can_lazy_read) {
// call _do_lazy_read recursively when current batch is skipped
return _do_lazy_read(block, batch_size, read_rows, _batch_eof);
} else {
ColumnSelectVector run_length_vector;
-        RETURN_IF_ERROR(_read_column_data(block, _all_read_columns, batch_size, read_rows,
-                                          _batch_eof, run_length_vector));
-        Status st = VExprContext::filter_block(_vconjunct_ctx, block, block->columns());
+        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
+                                          read_rows, _batch_eof, run_length_vector));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
*read_rows = block->rows();
return st;
}
@@ -132,7 +131,6 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
col_idx++;
}
*read_rows = batch_read_rows;
- _read_rows += batch_read_rows;
*_batch_eof = has_eof;
return Status::OK();
}
@@ -143,19 +141,23 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
size_t pre_read_rows;
bool pre_eof;
ColumnSelectVector run_length_vector;
-    RETURN_IF_ERROR(_read_column_data(block, _predicate_columns, batch_size, &pre_read_rows,
-                                      &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
+                                      &pre_read_rows, &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
+                                            _lazy_read_ctx.predicate_partition_columns));
+    RETURN_IF_ERROR(
+            _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns));
// generate filter vector
- if (_resize_first_column) {
+ if (_lazy_read_ctx.resize_first_column) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
}
size_t origin_column_num = block->columns();
int filter_column_id = -1;
- RETURN_IF_ERROR(_vconjunct_ctx->execute(block, &filter_column_id));
+    RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
ColumnPtr& sv = block->get_by_position(filter_column_id).column;
- if (_resize_first_column) {
+ if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
block->get_by_position(0).column->assume_mutable()->clear();
}
@@ -167,10 +169,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
if (select_vector.filter_all() && !pre_eof) {
        // If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
- for (auto& col : _predicate_columns) {
+ for (auto& col : _lazy_read_ctx.predicate_columns) {
// clean block to read predicate columns
block->get_by_name(col).column->assume_mutable()->clear();
}
+ for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+ block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
+ for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+ block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
Block::erase_useless_column(block, origin_column_num);
return _do_lazy_read(block, batch_size, read_rows, batch_eof);
}
@@ -184,8 +192,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// lazy read columns
size_t lazy_read_rows;
bool lazy_eof;
-    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_columns, pre_read_rows, &lazy_read_rows,
-                                      &lazy_eof, select_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
+                                      &lazy_read_rows, &lazy_eof, select_vector));
if (pre_read_rows != lazy_read_rows) {
        return Status::Corruption("Can't read the same number of rows when doing lazy read");
}
@@ -194,24 +202,29 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// filter data in predicate columns, and remove filter column
if (select_vector.has_filter()) {
-        Block::filter_block(block, _predicate_col_ids, filter_column_id, origin_column_num);
+        Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
+                            origin_column_num);
} else {
Block::erase_useless_column(block, origin_column_num);
}
+
size_t column_num = block->columns();
- size_t column_size = -1;
+ size_t column_size = 0;
for (int i = 0; i < column_num; ++i) {
size_t cz = block->get_by_position(i).column->size();
- if (column_size != -1) {
+ if (column_size != 0 && cz != 0) {
DCHECK_EQ(column_size, cz);
}
- column_size = cz;
+ if (cz != 0) {
+ column_size = cz;
+ }
}
_lazy_read_filtered_rows += pre_read_rows - column_size;
*read_rows = column_size;
*batch_eof = pre_eof;
- return Status::OK();
+    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
+    return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
}
const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
@@ -271,6 +284,60 @@ void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
select_vector.build(map, total_rows, false);
}
+Status RowGroupReader::_fill_partition_columns(
+ Block* block, size_t rows,
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+ partition_columns) {
+ for (auto& kv : partition_columns) {
+ auto doris_column = block->get_by_name(kv.first).column;
+ IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+ auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
+ }
+ }
+ return Status::OK();
+}
+
+Status RowGroupReader::_fill_missing_columns(
+ Block* block, size_t rows,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+ for (auto& kv : missing_columns) {
+ if (kv.second == nullptr) {
+ // no default column, fill with null
+            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                    (*std::move(block->get_by_name(kv.first).column)).mutate().get());
+ nullable_column->insert_many_defaults(rows);
+ } else {
+ // fill with default value
+ auto* ctx = kv.second;
+ auto origin_column_num = block->columns();
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
+ bool is_origin_column = result_column_id < origin_column_num;
+ if (!is_origin_column) {
+                // Call resize because the first column of _src_block_ptr may not be filled
+                // by the reader, so _src_block_ptr->rows() may return a wrong result, and the
+                // column created by `ctx->execute()` has only one row.
+                std::move(*block->get_by_position(result_column_id).column).mutate()->resize(rows);
+                auto result_column_ptr = block->get_by_position(result_column_id).column;
+                // result_column_ptr may be a ColumnConst; convert it to a normal column
+                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
+ auto origin_column_type = block->get_by_name(kv.first).type;
+ bool is_nullable = origin_column_type->is_nullable();
+ block->replace_by_position(
+ block->get_position_by_name(kv.first),
+                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
+ block->erase(result_column_id);
+ }
+ }
+ }
+ return Status::OK();
+}
+
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
if (batch_size < _remaining_rows) {
*read_rows = batch_size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index d1e0315c68..661d248f12 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -17,6 +17,7 @@
#pragma once
#include <common/status.h>
+#include "exec/text_converter.h"
#include "io/file_reader.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
@@ -29,11 +30,22 @@ public:
struct LazyReadContext {
VExprContext* vconjunct_ctx = nullptr;
bool can_lazy_read = false;
+ // block->rows() returns the number of rows of the first column,
+ // so we should check and resize the first column
bool resize_first_column = true;
std::vector<std::string> all_read_columns;
+ // include predicate_partition_columns & predicate_missing_columns
+ std::vector<uint32_t> all_predicate_col_ids;
std::vector<std::string> predicate_columns;
- std::vector<uint32_t> predicate_col_ids;
std::vector<std::string> lazy_read_columns;
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                predicate_partition_columns;
+ // lazy read partition columns or all partition columns
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, VExprContext*> predicate_missing_columns;
+ // lazy read missing columns or all missing columns
+ std::unordered_map<std::string, VExprContext*> missing_columns;
};
RowGroupReader(doris::FileReader* file_reader,
@@ -45,7 +57,7 @@ public:
    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
-    int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; }
+    int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
ParquetColumnReader::Statistics statistics();
@@ -58,6 +70,13 @@ private:
    const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
    void _rebuild_select_vector(ColumnSelectVector& select_vector,
                                std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows);
+ Status _fill_partition_columns(
+ Block* block, size_t rows,
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns);
+ Status _fill_missing_columns(
+ Block* block, size_t rows,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns);
doris::FileReader* _file_reader;
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -65,20 +84,12 @@ private:
const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
int64_t _remaining_rows;
- int64_t _read_rows = 0;
cctz::time_zone* _ctz;
- VExprContext* _vconjunct_ctx;
- bool _can_lazy_read;
- // block->rows() returns the number of rows of the first column,
- // so we should check and resize the first column
- const bool _resize_first_column;
- const std::vector<std::string>& _all_read_columns;
- const std::vector<std::string>& _predicate_columns;
- const std::vector<uint32_t>& _predicate_col_ids;
- const std::vector<std::string>& _lazy_read_columns;
+ const LazyReadContext& _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
    // If continuous batches are skipped, we can cache them to skip a whole page
size_t _cached_filtered_rows = 0;
+ std::unique_ptr<TextConverter> _text_converter = nullptr;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 2ab2a1dfdc..63efce2882 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -160,13 +160,15 @@ Status ParquetReader::init_reader(
RETURN_IF_ERROR(_init_read_columns());
// build column predicates for column lazy read
_lazy_read_ctx.vconjunct_ctx = vconjunct_ctx;
- _init_lazy_read();
- RETURN_IF_ERROR(_init_row_group_readers());
- return Status::OK();
+ return _init_row_group_readers();
}
-void ParquetReader::_init_lazy_read() {
+Status ParquetReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+ SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
std::unordered_map<std::string, uint32_t> predicate_columns;
std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
@@ -200,27 +202,62 @@ void ParquetReader::_init_lazy_read() {
if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
visit_slot(_lazy_read_ctx.vconjunct_ctx->root());
}
+
+ bool has_complex_type = false;
+ const FieldDescriptor& schema = _file_metadata->schema();
for (auto& read_col : _read_columns) {
_lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name);
+        PrimitiveType column_type = schema.get_column(read_col._file_slot_name)->type.type;
+        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
+ has_complex_type = true;
+ }
if (predicate_columns.size() > 0) {
auto iter = predicate_columns.find(read_col._file_slot_name);
if (iter == predicate_columns.end()) {
                _lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
} else {
_lazy_read_ctx.predicate_columns.emplace_back(iter->first);
- _lazy_read_ctx.predicate_col_ids.emplace_back(iter->second);
+            _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
}
}
}
- if (_lazy_read_ctx.predicate_columns.size() > 0 &&
+
+ for (auto& kv : partition_columns) {
+ auto iter = predicate_columns.find(kv.first);
+ if (iter == predicate_columns.end()) {
+ _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+ } else {
+            _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
+ _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+ }
+ }
+
+ for (auto& kv : missing_columns) {
+ auto iter = predicate_columns.find(kv.first);
+ if (iter == predicate_columns.end()) {
+ _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
+ } else {
+            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
+ _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+ }
+ }
+
+ if (!has_complex_type && _lazy_read_ctx.predicate_columns.size() > 0 &&
_lazy_read_ctx.lazy_read_columns.size() > 0) {
-        if (predicate_columns.size() == _lazy_read_ctx.predicate_columns.size()) {
-            // TODO: support partition columns
-            // _vconjunct_ctx has partition columns, and will push down to row group reader.
- // However, row group reader can't get partition column values now.
- _lazy_read_ctx.can_lazy_read = true;
+ _lazy_read_ctx.can_lazy_read = true;
+ }
+
+ if (!_lazy_read_ctx.can_lazy_read) {
+ for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+ _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+ }
+ for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
+ _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
}
}
+
+ _fill_all_columns = true;
+ return Status::OK();
}
Status ParquetReader::_init_read_columns() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9f602e6901..b726a401ad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -82,6 +82,11 @@ public:
Statistics& statistics() { return _statistics; }
+ Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
private:
struct ParquetProfile {
RuntimeProfile::Counter* filtered_row_groups;
@@ -108,7 +113,6 @@ private:
void _init_profile();
bool _next_row_group_reader();
- void _init_lazy_read();
Status _init_read_columns();
Status _init_row_group_readers();
// Page Index Filter
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a2400b1dad..8e6aabbd47 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -151,10 +151,13 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
if (read_rows > 0) {
// Convert the src block columns type to string in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
-            // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
-            RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
-            // Fill columns not exist in file with null or default value
-            RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+            // The FileReader can fill the partition and missing columns itself
+            if (!_cur_reader->fill_all_columns()) {
+                // Fill rows in src block with partition columns from path (e.g. Hive partition columns)
+                RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
+                // Fill columns that do not exist in the file with null or default values
+                RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+ }
// Apply _pre_conjunct_ctx_ptr to filter src block.
RETURN_IF_ERROR(_pre_filter_src_block());
            // Convert src block to output block (dest block), string to dest data type and apply filters.
@@ -261,10 +264,11 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
        auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
- for (size_t j = 0; j < rows; ++j) {
-            _text_converter->write_vec_column(slot_desc, col_ptr,
-                                              const_cast<char*>(column_from_path.c_str()),
-                                              column_from_path.size(), true, false);
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr,
+                                               const_cast<char*>(column_from_path.c_str()),
+                                               column_from_path.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), column_from_path);
}
}
}
@@ -473,8 +477,7 @@ Status VFileScanner::_get_next_reader() {
_cur_reader.reset(new ParquetReader(
                    _profile, _params, range, _file_col_names, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj())));
-            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr &&
-                _partition_slot_descs.empty()) { // TODO: support partition columns
+            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
+                RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
_discard_conjuncts();
}
@@ -521,6 +524,7 @@ Status VFileScanner::_get_next_reader() {
_name_to_col_type.clear();
_missing_cols.clear();
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+ RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
@@ -535,6 +539,48 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}
+Status VFileScanner::_generate_fill_columns() {
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+ partition_columns;
+ std::unordered_map<std::string, VExprContext*> missing_columns;
+
+ const TFileRangeDesc& range = _ranges.at(_next_range - 1);
+ if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
+ for (const auto& slot_desc : _partition_slot_descs) {
+ if (slot_desc) {
+ auto it = _partition_slot_index_map.find(slot_desc->id());
+ if (it == std::end(_partition_slot_index_map)) {
+                return Status::InternalError("Unknown source slot descriptor, slot_id={}",
+                                             slot_desc->id());
+            }
+            const std::string& column_from_path = range.columns_from_path[it->second];
+            partition_columns.emplace(slot_desc->col_name(),
+                                      std::make_tuple(column_from_path, slot_desc));
+ }
+ }
+ }
+
+ if (!_missing_cols.empty()) {
+ for (auto slot_desc : _real_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+            if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
+ continue;
+ }
+
+ auto it = _col_default_value_ctx.find(slot_desc->col_name());
+ if (it == _col_default_value_ctx.end()) {
+                return Status::InternalError("failed to find default value expr for slot: {}",
+ slot_desc->col_name());
+ }
+ missing_columns.emplace(slot_desc->col_name(), it->second);
+ }
+ }
+
+ return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
Status VFileScanner::_init_expr_ctxes() {
DCHECK(!_ranges.empty());
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 3edd75a5ac..cfe26d9753 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -135,6 +135,7 @@ private:
Status _fill_missing_columns(size_t rows);
Status _pre_filter_src_block();
Status _convert_to_output_block(Block* block);
+ Status _generate_fill_columns();
void _reset_counter() {
_counter.num_rows_unselected = 0;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 4b99ccae0b..743b2291cf 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -113,6 +113,10 @@ TEST_F(ParquetReaderTest, normal) {
    std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
p_reader->init_reader(&colname_to_value_range, nullptr);
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+ partition_columns;
+ std::unordered_map<std::string, VExprContext*> missing_columns;
+ p_reader->set_fill_columns(partition_columns, missing_columns);
Block* block = new Block();
for (const auto& slot_desc : tuple_desc->slots()) {
auto data_type =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]