This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new af5b34b791c [fix](parquet) Fixed the problem that when the Parquet reader
uses an index to read files, multiple threads modify the same object.
(#50161) (#50415)
af5b34b791c is described below
commit af5b34b791c8c621a58d27d3584006297acc0446
Author: daidai <[email protected]>
AuthorDate: Tue May 6 10:00:37 2025 +0800
[fix](parquet) Fixed the problem that when the Parquet reader uses an index to read
files, multiple threads modify the same object. (#50161) (#50415)
bp #50161
---
be/src/vec/exec/format/avro/avro_jni_reader.cpp | 2 +-
be/src/vec/exec/format/avro/avro_jni_reader.h | 4 +-
be/src/vec/exec/format/jni_reader.cpp | 2 +-
be/src/vec/exec/format/jni_reader.h | 4 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 4 +-
be/src/vec/exec/format/orc/vorc_reader.h | 6 +-
.../exec/format/parquet/vparquet_page_index.cpp | 2 +-
.../vec/exec/format/parquet/vparquet_page_index.h | 2 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 19 +-
be/src/vec/exec/format/parquet/vparquet_reader.h | 9 +-
be/src/vec/exec/format/table/hudi_jni_reader.cpp | 2 +-
be/src/vec/exec/format/table/hudi_jni_reader.h | 4 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 4 +-
be/src/vec/exec/format/table/iceberg_reader.h | 6 +-
.../vec/exec/format/table/lakesoul_jni_reader.cpp | 2 +-
be/src/vec/exec/format/table/lakesoul_jni_reader.h | 4 +-
.../exec/format/table/max_compute_jni_reader.cpp | 2 +-
.../vec/exec/format/table/max_compute_jni_reader.h | 4 +-
be/src/vec/exec/format/table/paimon_jni_reader.cpp | 2 +-
be/src/vec/exec/format/table/paimon_jni_reader.h | 4 +-
.../format/table/transactional_hive_reader.cpp | 2 +-
.../exec/format/table/transactional_hive_reader.h | 2 +-
.../format/table/trino_connector_jni_reader.cpp | 2 +-
.../exec/format/table/trino_connector_jni_reader.h | 2 +-
be/src/vec/exec/jni_connector.cpp | 4 +-
be/src/vec/exec/jni_connector.h | 5 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +-
be/src/vec/exec/scan/vfile_scanner.h | 13 +-
.../exec/format/parquet/parquet_reader_test.cpp | 195 +++++++++++++++++++--
29 files changed, 243 insertions(+), 72 deletions(-)
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index 03135aa5c94..6591abab58d 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -55,7 +55,7 @@ Status
AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor
}
Status AvroJNIReader::init_fetch_table_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
std::ostringstream required_fields;
std::ostringstream columns_types;
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index 82388f32915..c8d55cf58cf 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -71,7 +71,7 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_fetch_table_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
TFileType::type get_file_type();
@@ -85,7 +85,7 @@ public:
private:
const TFileScanRangeParams _params;
const TFileRangeDesc _range;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/jni_reader.cpp
b/be/src/vec/exec/format/jni_reader.cpp
index 563f6cbea51..1f0f5818446 100644
--- a/be/src/vec/exec/format/jni_reader.cpp
+++ b/be/src/vec/exec/format/jni_reader.cpp
@@ -79,7 +79,7 @@ Status
MockJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor
}
Status MockJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/jni_reader.h
b/be/src/vec/exec/format/jni_reader.h
index 3f156f598a0..487e1ee1e6b 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -89,7 +89,7 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
Status close() override {
if (_jni_connector) {
@@ -106,7 +106,7 @@ protected:
}
private:
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 3a87202184a..193756bc64d 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -279,7 +279,7 @@ Status OrcReader::_create_file_reader() {
Status OrcReader::init_reader(
const std::vector<std::string>* column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid, const
TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -694,7 +694,7 @@ bool static
build_search_argument(std::vector<OrcPredicate>& predicates, int ind
}
bool OrcReader::_init_search_argument(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
if ((!_enable_filter_by_min_max) || colname_to_value_range->empty()) {
return false;
}
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 3154c39a97d..dc9d565e803 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -142,7 +142,7 @@ public:
//If you want to read the file by index instead of column name, set
hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor*
row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -291,7 +291,7 @@ private:
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);
bool _init_search_argument(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
void _init_bloom_filter(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
void _init_system_properties();
@@ -598,7 +598,7 @@ private:
std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter;
LazyReadContext _lazy_read_ctx;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index 53fb1579c8e..0f5567887e5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -57,7 +57,7 @@ Status
PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
}
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex*
column_index,
- ColumnValueRangeType&
col_val_range,
+ const ColumnValueRangeType&
col_val_range,
const FieldSchema* col_schema,
std::vector<int>& skipped_ranges,
const cctz::time_zone& ctz) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h
b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 10998706999..3b30e1e8dd6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -47,7 +47,7 @@ public:
Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int
total_rows_of_group,
int page_idx, RowRange* row_range);
Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
- ColumnValueRangeType& col_val_range,
+ const ColumnValueRangeType&
col_val_range,
const FieldSchema* col_schema,
std::vector<int>& skipped_ranges, const
cctz::time_zone& ctz);
bool check_and_get_page_index_ranges(const
std::vector<tparquet::ColumnChunk>& columns);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index ae978453f8a..d669a57c609 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -295,7 +295,7 @@ void ParquetReader::iceberg_sanitize(const
std::vector<std::string>& read_column
Status ParquetReader::init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
@@ -346,7 +346,6 @@ Status ParquetReader::init_reader(
_missing_cols.emplace_back(name);
}
} else {
- std::unordered_map<std::string, ColumnValueRangeType>
new_colname_to_value_range;
const auto& table_column_idxs = _scan_params.column_idxs;
std::map<int, int> table_col_id_to_idx;
for (int i = 0; i < table_column_idxs.size(); i++) {
@@ -360,21 +359,15 @@ Status ParquetReader::init_reader(
auto& table_col = all_column_names[idx];
auto file_col = schema_desc.get_column(id)->name;
_read_columns.emplace_back(file_col);
+ _table_col_to_file_col[table_col] = file_col;
- if (table_col != file_col) {
- _table_col_to_file_col[table_col] = file_col;
- auto iter = _colname_to_value_range->find(table_col);
- if (iter != _colname_to_value_range->end()) {
- continue;
- }
- new_colname_to_value_range[file_col] = iter->second;
- _colname_to_value_range->erase(iter->first);
+ auto iter = _colname_to_value_range->find(table_col);
+ if (iter != _colname_to_value_range->end()) {
+ _colname_to_value_range_index_read.emplace(file_col,
iter->second);
}
}
}
- for (auto it : new_colname_to_value_range) {
- _colname_to_value_range->emplace(it.first, std::move(it.second));
- }
+ _colname_to_value_range = &_colname_to_value_range_index_read;
}
// build column predicates for column lazy read
_lazy_read_ctx.conjuncts = conjuncts;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 00db2652382..0d62bf9f3cd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -111,7 +111,7 @@ public:
Status init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
@@ -250,7 +250,12 @@ private:
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
// table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
+
+ // During initialization, multiple vfile_scanner's _colname_to_value_range
will point to the same object,
+ // so the content in the object cannot be modified (there is a
multi-threading problem).
+ // _colname_to_value_range_index_read used when _hive_use_column_names =
false.
+ std::unordered_map<std::string, ColumnValueRangeType>
_colname_to_value_range_index_read;
std::vector<std::string> _read_columns;
RowRange _whole_range = RowRange(0, 0);
const std::vector<int64_t>* _delete_rows = nullptr;
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index 1e32e5e1712..eb88dda9512 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -96,7 +96,7 @@ Status
HudiJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor
}
Status HudiJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h
b/be/src/vec/exec/format/table/hudi_jni_reader.h
index bfa0291a610..6fa4b0c836f 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -58,12 +58,12 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
private:
const TFileScanRangeParams& _scan_params;
const THudiFileDesc& _hudi_params;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index e4f8145a2fe..7dea5d99617 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -530,7 +530,7 @@ void
IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil
Status IcebergParquetReader::init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
@@ -603,7 +603,7 @@ Status IcebergParquetReader
::_read_position_delete_file(const TFileRangeDesc* d
Status IcebergOrcReader::init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index ee7dcdd68d2..b057cb0657a 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -150,7 +150,7 @@ protected:
std::unordered_map<std::string, std::string> _file_col_to_table_col;
// table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
// copy from _colname_to_value_range with new column name that is in
parquet/orc file, to support schema evolution.
std::unordered_map<std::string, ColumnValueRangeType>
_new_colname_to_value_range;
// column id to name map. Collect from FE slot descriptor.
@@ -205,7 +205,7 @@ public:
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
@@ -251,7 +251,7 @@ public:
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
index c285b72df25..2d03e95c215 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -74,7 +74,7 @@ Status
LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescri
}
Status LakeSoulJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h
b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
index dc0db6c2c5d..fa6aa062d9f 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.h
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
@@ -57,14 +57,14 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
private:
const TLakeSoulFileDesc& _lakesoul_params;
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state;
RuntimeProfile* _profile;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 665e19b6bce..cf50aad9c22 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -100,7 +100,7 @@ Status MaxComputeJniReader::get_columns(
}
Status MaxComputeJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h
b/be/src/vec/exec/format/table/max_compute_jni_reader.h
index 9bfef59432d..56be385f4b6 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.h
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -65,13 +65,13 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
private:
const MaxComputeTableDescriptor* _table_desc = nullptr;
const TMaxComputeFileDesc& _max_compute_params;
const TFileRangeDesc& _range;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 83705426e89..e902db8bc42 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -92,7 +92,7 @@ Status
PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescript
}
Status PaimonJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h
b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 220a6f1f2e9..3ac2229e655 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -64,10 +64,10 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
private:
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index f840b0af252..8be11f6773a 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -56,7 +56,7 @@
TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
Status TransactionalHiveReader::init_reader(
const std::vector<std::string>& column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h
b/be/src/vec/exec/format/table/transactional_hive_reader.h
index a24fee18694..23a691d037b 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -98,7 +98,7 @@ public:
Status init_reader(
const std::vector<std::string>& column_names,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
index eaced8572b9..3a7b28b91a4 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
@@ -78,7 +78,7 @@ TrinoConnectorJniReader::TrinoConnectorJniReader(
}
Status TrinoConnectorJniReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
RETURN_IF_ERROR(_set_spi_plugins_dir());
return _jni_connector->open(_state, _profile);
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
index de0cf21a881..cb0461bb4a6 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
@@ -58,7 +58,7 @@ public:
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
private:
Status _set_spi_plugins_dir();
diff --git a/be/src/vec/exec/jni_connector.cpp
b/be/src/vec/exec/jni_connector.cpp
index a7b0d5144ee..9940821fcfc 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -91,7 +91,7 @@ Status JniConnector::open(RuntimeState* state,
RuntimeProfile* profile) {
}
Status JniConnector::init(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
// TODO: This logic need to be changed.
// See the comment of "predicates" field in JniScanner.java
@@ -419,7 +419,7 @@ Status JniConnector::_fill_struct_column(TableMetaAddress&
address, MutableColum
}
void JniConnector::_generate_predicates(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
if (colname_to_value_range == nullptr) {
return;
}
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index a033a652063..df4c85e7614 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -222,7 +222,8 @@ public:
* number_filters(4) | length(4) | column_name | op(4) | scale(4) |
num_values(4) | value_length(4) | value | ...
* Then, pass the byte array address in configuration map, like
"push_down_predicates=${address}"
*/
- Status init(std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ Status init(
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
/**
* Call java side function JniScanner.getNextBatchMeta. The columns
information are stored as long array:
@@ -353,7 +354,7 @@ private:
}
void _generate_predicates(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
template <PrimitiveType primitive_type>
void _parse_value_range(const ColumnValueRange<primitive_type>&
col_val_range,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 5917c3bed76..a22777672b6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -88,7 +88,7 @@ VFileScanner::VFileScanner(
RuntimeState* state, pipeline::FileScanLocalState* local_state,
int64_t limit,
std::shared_ptr<vectorized::SplitSourceConnector> split_source,
RuntimeProfile* profile,
ShardedKVCache* kv_cache,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const std::unordered_map<std::string, int>* colname_to_slot_id)
: VScanner(state, local_state, limit, profile),
_split_source(split_source),
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index 8cb288ac81b..f68cc4a66e3 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -61,11 +61,12 @@ class VFileScanner : public VScanner {
public:
static constexpr const char* NAME = "VFileScanner";
- VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent,
int64_t limit,
- std::shared_ptr<vectorized::SplitSourceConnector>
split_source,
- RuntimeProfile* profile, ShardedKVCache* kv_cache,
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
- const std::unordered_map<std::string, int>*
colname_to_slot_id);
+ VFileScanner(
+ RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t
limit,
+ std::shared_ptr<vectorized::SplitSourceConnector> split_source,
RuntimeProfile* profile,
+ ShardedKVCache* kv_cache,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
+ const std::unordered_map<std::string, int>* colname_to_slot_id);
Status open(RuntimeState* state) override;
@@ -99,7 +100,7 @@ protected:
std::unique_ptr<GenericReader> _cur_reader;
bool _cur_reader_eof;
- std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range = nullptr;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
// col names from _file_slot_descs
diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
index cbc6c0ee3d7..423adfd41ce 100644
--- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp
@@ -56,10 +56,9 @@ public:
ParquetReaderTest() {}
};
-TEST_F(ParquetReaderTest, normal) {
- TDescriptorTable t_desc_table;
- TTableDescriptor t_table_desc;
-
+static void create_table_desc(TDescriptorTable& t_desc_table,
TTableDescriptor& t_table_desc,
+ std::vector<std::string> table_column_names,
+ std::vector<TPrimitiveType::type> types) {
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::OLAP_TABLE;
t_table_desc.numCols = 0;
@@ -68,10 +67,7 @@ TEST_F(ParquetReaderTest, normal) {
t_desc_table.__isset.tableDescriptors = true;
// init boolean and numeric slot
- std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col",
"smallint_col",
- "int_col", "bigint_col",
"float_col",
- "double_col"};
- for (int i = 0; i < numeric_types.size(); i++) {
+ for (int i = 0; i < table_column_names.size(); i++) {
TSlotDescriptor tslot_desc;
{
tslot_desc.id = i;
@@ -81,7 +77,7 @@ TEST_F(ParquetReaderTest, normal) {
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::type(i + 2));
+ scalar_type.__set_type(types[i]);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
@@ -90,7 +86,7 @@ TEST_F(ParquetReaderTest, normal) {
tslot_desc.byteOffset = 0;
tslot_desc.nullIndicatorByte = 0;
tslot_desc.nullIndicatorBit = -1;
- tslot_desc.colName = numeric_types[i];
+ tslot_desc.colName = table_column_names[i];
tslot_desc.slotIdx = 0;
tslot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(tslot_desc);
@@ -108,6 +104,19 @@ TEST_F(ParquetReaderTest, normal) {
t_tuple_desc.__isset.tableId = true;
t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
}
+};
+
+TEST_F(ParquetReaderTest, normal) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+ std::vector<std::string> table_column_names = {"boolean_col",
"tinyint_col", "smallint_col",
+ "int_col",
"bigint_col", "float_col",
+ "double_col"};
+ std::vector<TPrimitiveType::type> table_column_types = {
+ TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT,
TPrimitiveType::SMALLINT,
+ TPrimitiveType::INT, TPrimitiveType::BIGINT,
TPrimitiveType::FLOAT,
+ TPrimitiveType::DOUBLE};
+ create_table_desc(t_desc_table, t_table_desc, table_column_names,
table_column_types);
DescriptorTbl* desc_tbl;
ObjectPool obj_pool;
static_cast<void>(DescriptorTbl::create(&obj_pool, t_desc_table,
&desc_tbl));
@@ -115,8 +124,10 @@ TEST_F(ParquetReaderTest, normal) {
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
auto local_fs = io::global_local_filesystem();
io::FileReaderSPtr reader;
- static_cast<void>(local_fs->open_file(
- "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
&reader));
+ static_cast<void>(
+ local_fs->open_file("./be/test/exec/test_data/"
+ "parquet_scanner/type-decoder.parquet",
+ &reader));
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
@@ -164,5 +175,165 @@ TEST_F(ParquetReaderTest, normal) {
delete p_reader;
}
+static ParquetReader* create_parquet_reader(TFileScanRangeParams& scan_params,
+ std::vector<std::string>
table_column_names,
+ std::vector<TPrimitiveType::type>
types) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+
+ create_table_desc(t_desc_table, t_table_desc, table_column_names, types);
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ static_cast<void>(DescriptorTbl::create(&obj_pool, t_desc_table,
&desc_tbl));
+
+ auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+ auto local_fs = io::global_local_filesystem();
+ io::FileReaderSPtr reader;
+ static_cast<void>(
+ local_fs->open_file("./be/test/exec/test_data/"
+ "parquet_scanner/type-decoder.parquet",
+ &reader));
+
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
+ std::vector<std::string> column_names;
+ std::vector<std::string> missing_column_names;
+ for (int i = 0; i < slot_descs.size(); i++) {
+ column_names.push_back(slot_descs[i]->col_name());
+ }
+ TFileRangeDesc scan_range;
+ {
+ scan_range.start_offset = 0;
+ scan_range.size = 1000;
+ }
+ auto p_reader =
+ new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz,
nullptr, nullptr);
+ p_reader->set_file_reader(reader);
+ return p_reader;
+}
+
+TEST_F(ParquetReaderTest, use_column_name) {
+ bool use_column_name = true;
+
+ std::vector<std::string> table_column_names = {"boolean_col",
"tinyint_col", "smallint_col",
+ "int_col",
"bigint_col", "float_col",
+ "double_col"};
+ std::vector<TPrimitiveType::type> table_column_types = {
+ TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT,
TPrimitiveType::SMALLINT,
+ TPrimitiveType::INT, TPrimitiveType::BIGINT,
TPrimitiveType::FLOAT,
+ TPrimitiveType::DOUBLE};
+ TFileScanRangeParams scan_params;
+
+ auto p_reader = create_parquet_reader(scan_params, table_column_names,
table_column_types);
+ std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
+ colname_to_value_range.emplace("boolean_col",
ColumnValueRange<TYPE_BOOLEAN>("boolean_col"));
+ colname_to_value_range.emplace("tinyint_col",
ColumnValueRange<TYPE_TINYINT>("tinyint_col"));
+ colname_to_value_range.emplace("smallint_col",
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
+ colname_to_value_range.emplace("int_col",
ColumnValueRange<TYPE_INT>("int_col"));
+
+ static_cast<void>(p_reader->open());
+ static_cast<void>(p_reader->init_reader(table_column_names, {},
&colname_to_value_range, {},
+ nullptr, nullptr, nullptr,
nullptr, nullptr, false,
+ use_column_name));
+
+ std::vector<std::string> read_columns_ans = {"tinyint_col",
"smallint_col", "int_col",
+ "bigint_col", "boolean_col",
"float_col",
+ "double_col"};
+ EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
+
+ std::vector<std::string> miss_columns_ans = {};
+ EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
+ std::vector<std::string> colname_to_value_range_names_ans =
{"tinyint_col", "smallint_col",
+ "int_col",
"boolean_col"};
+ for (auto col : colname_to_value_range_names_ans) {
+ EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
+ }
+ EXPECT_EQ(p_reader->_colname_to_value_range->size(),
colname_to_value_range_names_ans.size());
+ delete p_reader;
+}
+
+TEST_F(ParquetReaderTest, use_column_name2) {
+ bool use_column_name = true;
+
+ std::vector<std::string> table_column_names = {"boolean_col",
"tinyint_col", "smallint_col",
+ "int_col",
"bigint_col", "float_col",
+ "test1",
"double_col", "test2"};
+ std::vector<TPrimitiveType::type> table_column_types = {
+ TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT,
TPrimitiveType::SMALLINT,
+ TPrimitiveType::INT, TPrimitiveType::BIGINT,
TPrimitiveType::FLOAT,
+ TPrimitiveType::FLOAT, TPrimitiveType::DOUBLE,
TPrimitiveType::DOUBLE};
+ TFileScanRangeParams scan_params;
+
+ auto p_reader = create_parquet_reader(scan_params, table_column_names,
table_column_types);
+ std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
+ colname_to_value_range.emplace("boolean_col",
ColumnValueRange<TYPE_BOOLEAN>("boolean_col"));
+ colname_to_value_range.emplace("tinyint_col",
ColumnValueRange<TYPE_TINYINT>("tinyint_col"));
+ colname_to_value_range.emplace("smallint_col",
ColumnValueRange<TYPE_SMALLINT>("smallint_col"));
+ colname_to_value_range.emplace("int_col",
ColumnValueRange<TYPE_INT>("int_col"));
+
+ static_cast<void>(p_reader->open());
+ static_cast<void>(p_reader->init_reader(table_column_names,
{"boolean_col"},
+ &colname_to_value_range, {},
nullptr, nullptr, nullptr,
+ nullptr, nullptr, false,
use_column_name));
+
+ std::vector<std::string> read_columns_ans = {"tinyint_col",
"smallint_col", "int_col",
+ "bigint_col", "float_col",
"double_col"};
+ EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
+
+ std::vector<std::string> miss_columns_ans = {"boolean_col", "test1",
"test2"};
+ EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
+ std::vector<std::string> colname_to_value_range_names_ans =
{"tinyint_col", "smallint_col",
+ "int_col",
"boolean_col"};
+ for (auto col : colname_to_value_range_names_ans) {
+ EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
+ }
+ EXPECT_EQ(p_reader->_colname_to_value_range->size(),
colname_to_value_range_names_ans.size());
+ delete p_reader;
+}
+
+TEST_F(ParquetReaderTest, use_column_idx) {
+ bool use_column_name = false;
+
+ std::vector<std::string> table_column_names = {"col0", "col1", "col3",
+ "col7", "col100", "col102"};
+ std::vector<TPrimitiveType::type> table_column_types = {
+ TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT,
TPrimitiveType::SMALLINT,
+ TPrimitiveType::INT, TPrimitiveType::BIGINT,
TPrimitiveType::BIGINT};
+ TFileScanRangeParams scan_params;
+ scan_params.column_idxs.emplace_back(0);
+ scan_params.column_idxs.emplace_back(1);
+ scan_params.column_idxs.emplace_back(3);
+ scan_params.column_idxs.emplace_back(7);
+ scan_params.column_idxs.emplace_back(100);
+ scan_params.column_idxs.emplace_back(102);
+
+ auto p_reader = create_parquet_reader(scan_params, table_column_names,
table_column_types);
+ std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
+ colname_to_value_range.emplace("col0",
ColumnValueRange<TYPE_BOOLEAN>("col0"));
+ colname_to_value_range.emplace("col1",
ColumnValueRange<TYPE_TINYINT>("col1"));
+ colname_to_value_range.emplace("col3",
ColumnValueRange<TYPE_SMALLINT>("col3"));
+ colname_to_value_range.emplace("col102",
ColumnValueRange<TYPE_SMALLINT>("col102"));
+
+ static_cast<void>(p_reader->open());
+ static_cast<void>(p_reader->init_reader(table_column_names, {},
&colname_to_value_range, {},
+ nullptr, nullptr, nullptr,
nullptr, nullptr, false,
+ use_column_name));
+
+ std::vector<std::string> read_columns_ans = {"tinyint_col",
"smallint_col", "bigint_col",
+ "string_col"};
+ EXPECT_EQ(p_reader->_read_columns, read_columns_ans);
+
+ std::vector<std::string> miss_columns_ans = {"col100", "col102"};
+ EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans);
+
+ std::vector<std::string> colname_to_value_range_names_ans =
{"tinyint_col", "smallint_col",
+ "bigint_col"};
+ for (auto col : colname_to_value_range_names_ans) {
+ EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col));
+ }
+ EXPECT_EQ(p_reader->_colname_to_value_range->size(),
colname_to_value_range_names_ans.size());
+ delete p_reader;
+}
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]