This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1f0c45204bc [fix](iceberg) read the primary key columns if having
equality delete (#34884)
1f0c45204bc is described below
commit 1f0c45204bcbd8a2b83d53c1f6a519ca6e54991c
Author: Ashin Gau <[email protected]>
AuthorDate: Wed May 15 11:37:25 2024 +0800
[fix](iceberg) read the primary key columns if having equality delete
(#34884)
backport: #34835
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 40 ++++++++++++++++++++--
be/src/vec/exec/format/table/iceberg_reader.h | 7 ++++
be/src/vec/exec/scan/vfile_scanner.cpp | 4 ---
.../iceberg/iceberg_equality_delete.out | 24 +++++++++++++
.../iceberg/iceberg_equality_delete.groovy | 8 +++++
5 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index f9de386e595..8e4cdbd311a 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -115,6 +115,7 @@ Status IcebergTableReader::get_next_block(Block* block,
size_t* read_rows, bool*
return Status::OK();
}
+ RETURN_IF_ERROR(_expand_block_if_need(block));
// To support iceberg schema evolution. We change the column name in block
to
// make it match with the column name in parquet file before reading data.
and
@@ -130,7 +131,7 @@ Status IcebergTableReader::get_next_block(Block* block,
size_t* read_rows, bool*
block->initialize_index_by_name();
}
- auto res = _file_format_reader->get_next_block(block, read_rows, eof);
+ RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows,
eof));
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
@@ -147,7 +148,7 @@ Status IcebergTableReader::get_next_block(Block* block,
size_t* read_rows, bool*
RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
*read_rows = block->rows();
}
- return res;
+ return _shrink_block_if_need(block);
}
Status IcebergTableReader::set_fill_columns(
@@ -253,6 +254,21 @@ Status IcebergTableReader::_equality_delete_base(
}
}
}
+ for (int i = 0; i < equality_delete_col_names.size(); ++i) {
+ const std::string& delete_col = equality_delete_col_names[i];
+ if (std::find(_all_required_col_names.begin(),
_all_required_col_names.end(), delete_col) ==
+ _all_required_col_names.end()) {
+ _expand_col_names.emplace_back(delete_col);
+ DataTypePtr data_type =
DataTypeFactory::instance().create_data_type(
+ equality_delete_col_types[i], true);
+ MutableColumnPtr data_column = data_type->create_column();
+ _expand_columns.emplace_back(
+ ColumnWithTypeAndName(std::move(data_column), data_type,
delete_col));
+ }
+ }
+ for (const std::string& delete_col : _expand_col_names) {
+ _all_required_col_names.emplace_back(delete_col);
+ }
_equality_delete_impl =
EqualityDeleteBase::get_delete_impl(&_equality_delete_block);
return _equality_delete_impl->init(_profile);
}
@@ -269,6 +285,24 @@ void IcebergTableReader::_generate_equality_delete_block(
}
}
+Status IcebergTableReader::_expand_block_if_need(Block* block) { // Append an empty placeholder column to `block` for each equality-delete key column the query did not request
+ for (auto& col : _expand_columns) { // _expand_columns is filled in _equality_delete_base with delete columns absent from _all_required_col_names
+ col.column->assume_mutable()->clear(); // the template column is a reused member; empty it so no rows from an earlier call remain (presumably the reader fills the shared column — TODO confirm)
+ if (block->try_get_by_name(col.name)) { // the caller's block must not already contain an expand column
+ return Status::InternalError("Wrong expand column '{}'", col.name);
+ }
+ block->insert(col); // placeholder is erased again by name in _shrink_block_if_need after reading
+ }
+ return Status::OK();
+}
+
+Status IcebergTableReader::_shrink_block_if_need(Block* block) { // Erase the placeholder columns added by _expand_block_if_need so the returned block holds only the requested columns
+ for (const std::string& expand_col : _expand_col_names) { // same names as _expand_columns (both filled together in _equality_delete_base)
+ block->erase(expand_col); // NOTE(review): assumes each expand column is still present in the block; confirm erase() of a missing name is harmless
+ }
+ return Status::OK(); // currently cannot fail
+}
+
Status IcebergTableReader::_position_delete_base(
const std::vector<TIcebergDeleteFileDesc>& delete_files) {
std::string data_file_path = _range.path;
@@ -534,6 +568,7 @@ Status IcebergParquetReader::init_reader(
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
+ RETURN_IF_ERROR(init_row_filters(_range));
return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names,
&_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
@@ -606,6 +641,7 @@ Status IcebergOrcReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
+ RETURN_IF_ERROR(init_row_filters(_range));
return orc_reader->init_reader(&_all_required_col_names,
&_new_colname_to_value_range,
conjuncts, false, tuple_descriptor,
row_descriptor,
not_single_slot_filter_conjuncts,
slot_id_to_filter_conjuncts);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 81c5613d681..cda50015911 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -137,6 +137,10 @@ protected:
void _generate_equality_delete_block(
Block* block, const std::vector<std::string>&
equality_delete_col_names,
const std::vector<TypeDescriptor>& equality_delete_col_types);
+ // Equality deletes need the primary key columns; add those the query did not request
+ Status _expand_block_if_need(Block* block);
+ // Remove the equality-delete columns added by _expand_block_if_need
+ Status _shrink_block_if_need(Block* block);
RuntimeProfile* _profile;
RuntimeState* _state;
@@ -161,6 +165,9 @@ protected:
std::vector<std::string> _all_required_col_names;
// col names in table but not in parquet,orc file
std::vector<std::string> _not_in_file_col_names;
+ // Equality-delete key columns not requested by the query; read only to apply deletes, then removed
+ std::vector<std::string> _expand_col_names;
+ std::vector<ColumnWithTypeAndName> _expand_columns;
io::IOContext* _io_ctx;
bool _has_schema_change = false;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 32828eaa796..03a79c0a18b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -838,8 +838,6 @@ Status VFileScanner::_get_next_reader() {
_push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
_col_name_to_slot_id,
&_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
-
- RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader = std::move(iceberg_reader);
} else {
std::vector<std::string> place_holder;
@@ -891,8 +889,6 @@ Status VFileScanner::_get_next_reader() {
_push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
_col_name_to_slot_id,
&_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
-
- RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader = std::move(iceberg_reader);
} else {
init_status = orc_reader->init_reader(
diff --git
a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
index 2f7f599929b..44cf1b7b2bf 100644
--- a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
+++ b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
@@ -41,6 +41,18 @@
19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+-- !count1 --
+19
+
+-- !count1_orc --
+19
+
+-- !max1 --
+update-comment-2
+
+-- !max1_orc --
+update-comment-2
+
-- !one_delete_column --
1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1
711.56 BUILDING update-comment-1
2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13
23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites
integrate boldly: caref
@@ -83,3 +95,15 @@
19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+-- !count3 --
+19
+
+-- !count3_orc --
+19
+
+-- !max3 --
+update-comment-2
+
+-- !max3_orc --
+update-comment-2
+
diff --git
a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
index a5a21b3da6a..984181ac2f3 100644
---
a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
+++
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
@@ -40,9 +40,17 @@ suite("iceberg_equality_delete",
"p2,external,iceberg,external_remote,external_r
// one delete column
qt_one_delete_column """select * from customer_flink_one order by
c_custkey"""
qt_one_delete_column_orc """select * from customer_flink_one_orc order
by c_custkey"""
+ qt_count1 """select count(*) from customer_flink_one"""
+ qt_count1_orc """select count(*) from customer_flink_one_orc"""
+ qt_max1 """select max(c_comment) from customer_flink_one"""
+ qt_max1_orc """select max(c_comment) from customer_flink_one_orc"""
// three delete columns
qt_one_delete_column """select * from customer_flink_three order by
c_custkey"""
qt_one_delete_column_orc """select * from customer_flink_three_orc
order by c_custkey"""
+ qt_count3 """select count(*) from customer_flink_three"""
+ qt_count3_orc """select count(*) from customer_flink_three_orc"""
+ qt_max3 """select max(c_comment) from customer_flink_three"""
+ qt_max3_orc """select max(c_comment) from customer_flink_three_orc"""
sql """drop catalog ${catalog_name}"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]