This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 2f3cc5fa100 [improvement](multi-catalog) push down all predicates into
rowgroup/page filtering for ParquetReader (#16388) (#24235)
2f3cc5fa100 is described below
commit 2f3cc5fa100fcd6ae93083a927f83dcce7d04a66
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Sep 13 16:26:00 2023 +0800
[improvement](multi-catalog) push down all predicates into rowgroup/page
filtering for ParquetReader (#16388) (#24235)
Two improvements:
1. Refactor row-group and page filtering in `ParquetReader`, using the
overloaded comparison operators of Doris native C++ types to evaluate predicates.
2. Support decimal/decimal v3/date/datev2/datetime/datetimev2 in row-group/page filtering.
cherry-pick from #16388
---
be/src/vec/exec/format/parquet/parquet_common.cpp | 4 -
be/src/vec/exec/format/parquet/parquet_common.h | 8 +-
be/src/vec/exec/format/parquet/parquet_pred_cmp.h | 825 ++++++++-------------
.../exec/format/parquet/vparquet_page_index.cpp | 8 +-
.../vec/exec/format/parquet/vparquet_page_index.h | 2 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 +-
6 files changed, 337 insertions(+), 516 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 5f8656fcd96..1449d1bdf37 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -29,10 +29,6 @@ const uint32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS =
2440588;
const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;
-inline uint64_t ParquetInt96::to_timestamp_micros() const {
- return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo /
NANOS_PER_MICROSECOND;
-}
-
#define FOR_LOGICAL_NUMERIC_TYPES(M) \
M(TypeIndex::Int8, Int8) \
M(TypeIndex::UInt8, UInt8) \
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h
b/be/src/vec/exec/format/parquet/parquet_common.h
index a3d3e0d8a45..28f5797f76d 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -68,7 +68,9 @@ struct ParquetInt96 {
uint64_t lo; // time of nanoseconds in a day
uint32_t hi; // days from julian epoch
- inline uint64_t to_timestamp_micros() const;
+ inline uint64_t to_timestamp_micros() const {
+ return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo /
NANOS_PER_MICROSECOND;
+ }
static const uint32_t JULIAN_EPOCH_OFFSET_DAYS;
static const uint64_t MICROS_IN_DAY;
@@ -361,7 +363,6 @@ Status
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
size_t data_index = column_data.size();
column_data.resize(data_index + select_vector.num_values() -
select_vector.num_filtered());
size_t dict_index = 0;
- int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
ColumnSelectVector::DataReadType read_type;
while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
@@ -373,7 +374,8 @@ Status
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
v.from_unixtime(date_value / _decode_params->second_mask,
*_decode_params->ctz);
if constexpr (std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
// nanoseconds will be ignored.
- v.set_microsecond((date_value %
_decode_params->second_mask) * scale_to_micro);
+ v.set_microsecond((date_value %
_decode_params->second_mask) *
+ _decode_params->scale_to_nano_factor /
1000);
// TODO: the precision of datetime v1
}
_FIXED_SHIFT_DATA_OFFSET();
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 06db698a9cd..727e0e57f29 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -21,541 +21,362 @@
#include <vector>
#include "exec/olap_common.h"
+#include "parquet_common.h"
namespace doris::vectorized {
-#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min,
out_max) \
- const T out_min = reinterpret_cast<const T*>(min_bytes)[0];
\
- const T out_max = reinterpret_cast<const T*>(max_bytes)[0];
\
- T out_value = *((T*)value);
+class ParquetPredicate {
+#define FOR_REINTERPRET_TYPES(M) \
+ M(TYPE_BOOLEAN, tparquet::Type::BOOLEAN) \
+ M(TYPE_TINYINT, tparquet::Type::INT32) \
+ M(TYPE_SMALLINT, tparquet::Type::INT32) \
+ M(TYPE_INT, tparquet::Type::INT32) \
+ M(TYPE_BIGINT, tparquet::Type::INT64) \
+ M(TYPE_FLOAT, tparquet::Type::FLOAT) \
+ M(TYPE_DOUBLE, tparquet::Type::DOUBLE)
-#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \
- const T out = reinterpret_cast<const T*>(bytes)[0]; \
- T conjunct_value = *((T*)value);
+private:
+ struct ScanPredicate {
+ ScanPredicate() = default;
+ ~ScanPredicate() = default;
+ SQLFilterOp op;
+ std::vector<const void*> values;
+ int scale;
-#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \
- if (conjunct_value < min || conjunct_value > max) { \
- return true; \
- }
-
-#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
- std::vector<T> in_values; \
- for (auto val : in_pred_values) { \
- T value = reinterpret_cast<T*>(val)[0]; \
- in_values.emplace_back(value); \
- } \
- if (in_values.empty()) { \
- return false; \
- } \
- auto result = std::minmax_element(in_values.begin(), in_values.end()); \
- T in_min = *result.first; \
- T in_max = *result.second; \
- const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
- const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
- if (in_max < group_min || in_min > group_max) { \
- return true; \
- }
-
-struct ColumnMinMaxParams {
- PrimitiveType conjunct_type;
- tparquet::Type::type parquet_type;
- void* value;
- // Use for decimal type
- int32_t parquet_precision;
- int32_t parquet_scale;
- int32_t parquet_type_length;
- // Use for in predicate
- std::vector<void*> in_pred_values;
- const char* min_bytes;
- const char* max_bytes;
-};
-
-template <typename T>
-static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T*
parquet_value,
- int32_t parquet_scale) {
- if (value_scale > parquet_scale) {
- *parquet_value = *parquet_value * common::exp10_i32(value_scale -
parquet_scale);
- } else if (value_scale < parquet_scale) {
- *conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale -
value_scale);
- }
-}
-
-template <typename T>
-static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params,
- const char* raw_parquet_val, T*
out_value,
- T* parquet_val) {
- *parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0];
- DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
- *out_value = conjunct_value.value();
- _align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val,
params.parquet_scale);
-}
-
-// todo: support decimal128 after the test passes
-//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params,
-// const char* raw_parquet_val) {
-// const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val);
-// int32_t length = params.parquet_type_length;
-// Int128 value = buf[0] & 0x80 ? -1 : 0;
-// memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf,
length);
-// return BigEndian::ToHost128(value);
-//}
-
-static bool _eval_in_val(const ColumnMinMaxParams& params) {
- switch (params.conjunct_type) {
- case TYPE_TINYINT: {
- _FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes,
params.max_bytes)
- break;
- }
- case TYPE_SMALLINT: {
- _FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes,
params.max_bytes)
- break;
- }
- case TYPE_DECIMAL32:
- case TYPE_INT: {
- _FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes,
params.max_bytes)
- break;
- }
- case TYPE_DECIMAL64:
- case TYPE_BIGINT: {
- _FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes,
params.max_bytes)
- break;
- }
- case TYPE_DECIMALV2: {
- break;
- }
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- std::vector<std::string> in_values;
- for (auto val : params.in_pred_values) {
- in_values.emplace_back(((StringValue*)val)->to_string());
+ ScanPredicate(const ScanPredicate& other) {
+ op = other.op;
+ for (auto v : other.values) {
+ values.emplace_back(v);
+ }
+ scale = other.scale;
}
- if (in_values.empty()) {
- return false;
- }
- auto result = std::minmax_element(in_values.begin(), in_values.end());
- std::string& in_min = *result.first;
- std::string& in_max = *result.second;
- if (strcmp(in_max.data(), params.min_bytes) < 0 ||
- strcmp(in_min.data(), params.max_bytes) > 0) {
- return true;
- }
- break;
- }
- default:
- return false;
- }
- return false;
-}
+ };
-static bool _eval_eq(const ColumnMinMaxParams& params) {
- switch (params.conjunct_type) {
- case TYPE_TINYINT: {
- _PLAIN_DECODE(int16_t, params.value, params.min_bytes,
params.max_bytes, conjunct_value,
- min, max)
- _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
- break;
- }
- case TYPE_SMALLINT: {
- _PLAIN_DECODE(int16_t, params.value, params.min_bytes,
params.max_bytes, conjunct_value,
- min, max)
- _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
- break;
- }
- case TYPE_DECIMAL32:
- case TYPE_INT: {
- _PLAIN_DECODE(int32_t, params.value, params.min_bytes,
params.max_bytes, conjunct_value,
- min, max)
- _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
- break;
- }
- case TYPE_DECIMAL64:
- case TYPE_BIGINT: {
- _PLAIN_DECODE(int64_t, params.value, params.min_bytes,
params.max_bytes, conjunct_value,
- min, max)
- _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
- break;
- }
- case TYPE_DECIMALV2: {
- if (params.parquet_type == tparquet::Type::INT32) {
- int32_t min_value = reinterpret_cast<const
int32_t*>(params.min_bytes)[0];
- int32_t max_value = reinterpret_cast<const
int32_t*>(params.max_bytes)[0];
- DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
- int32_t conjunct_int_value = conjunct_value.value();
- _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &min_value,
- params.parquet_scale);
- _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &max_value,
- params.parquet_scale);
- _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
- } else if (params.parquet_type == tparquet::Type::INT64) {
- int64_t min_value = reinterpret_cast<const
int64_t*>(params.min_bytes)[0];
- int64_t max_value = reinterpret_cast<const
int64_t*>(params.max_bytes)[0];
- DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
- int64_t conjunct_int_value = conjunct_value.value();
- _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &min_value,
- params.parquet_scale);
- _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &max_value,
- params.parquet_scale);
- _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
+ template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
+ static DecimalPrimitiveType _decode_primitive_decimal(const FieldSchema*
col_schema,
+ const std::string&
encoded_data,
+ int dest_scale) {
+ int scale = col_schema->parquet_schema.scale;
+ Int128 value = *reinterpret_cast<const
DecimalPhysicalType*>(encoded_data.data());
+ if (dest_scale > scale) {
+ value *=
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale);
+ } else if (dest_scale < scale) {
+ value /=
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale);
}
- break;
- // When precision exceeds 18, decimal will use
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
- // todo: support decimal128 after the test passes
- // else if (params.parquet_type ==
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
- // DecimalV2Value conjunct_value =
*((DecimalV2Value*)params.value);
- // Int128 conjunct_int_value = conjunct_value.value();
- // Int128 max = _decode_value_to_int128(params,
params.max_bytes);
- // _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &max,
- // params.parquet_scale);
- // Int128 min = _decode_value_to_int128(params,
params.min_bytes);
- // _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &min,
- // params.parquet_scale);
- // _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max)
- // }
+ return (DecimalPrimitiveType)value;
}
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- std::string conjunct_value = ((StringValue*)params.value)->to_string();
- if (strcmp(conjunct_value.data(), params.min_bytes) < 0 ||
- strcmp(conjunct_value.data(), params.max_bytes) > 0) {
- return true;
- }
- break;
- }
- default:
- return false;
- }
- return false;
-}
-template <typename T>
-static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) {
- if (!is_ge) {
- if (max <= conjunct_value) {
- return true;
- }
- } else {
- if (max < conjunct_value) {
- return true;
+ template <typename DecimalPrimitiveType>
+ static DecimalPrimitiveType _decode_binary_decimal(const FieldSchema*
col_schema,
+ const std::string&
encoded_data,
+ int dest_scale) {
+ int scale = col_schema->parquet_schema.scale;
+ const char* buf_start = encoded_data.data();
+ Int128 value = buf_start[0] & 0x80 ? -1 : 0;
+ memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) -
encoded_data.size(), buf_start,
+ encoded_data.size());
+ value = BigEndian::ToHost128(value);
+ if (dest_scale > scale) {
+ value *=
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale);
+ } else if (dest_scale < scale) {
+ value /=
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale);
}
+ return (DecimalPrimitiveType)value;
}
- return false;
-}
-static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) {
- switch (params.conjunct_type) {
- case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes,
conjunct_value, max)
- return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
- }
- case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes,
conjunct_value, max)
- return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
- }
- case TYPE_DECIMAL32:
- case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes,
conjunct_value, max)
- return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
- }
- case TYPE_DECIMAL64:
- case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes,
conjunct_value, max)
- return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
- }
- case TYPE_DECIMALV2: {
- if (params.parquet_type == tparquet::Type::INT32) {
- int32_t conjunct_int_value = 0;
- int32_t parquet_value = 0;
- _decode_decimal_v2_to_primary(params, params.max_bytes,
&conjunct_int_value,
- &parquet_value);
- return _filter_group_by_gt_or_ge(conjunct_int_value,
parquet_value, is_eq);
- } else if (params.parquet_type == tparquet::Type::INT64) {
- int64_t conjunct_int_value = 0;
- int64_t parquet_value = 0;
- _decode_decimal_v2_to_primary(params, params.max_bytes,
&conjunct_int_value,
- &parquet_value);
- return _filter_group_by_gt_or_ge(conjunct_int_value,
parquet_value, is_eq);
+ template <typename CppType>
+ static bool _filter_by_min_max(const SQLFilterOp op,
+ const std::vector<CppType>&
predicate_values, CppType& min_value,
+ CppType& max_value) {
+ if (predicate_values.empty()) {
+ return false;
}
- break;
- // When precision exceeds 18, decimal will use
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
- // todo: support decimal128 after the test passes
- // else if (params.parquet_type ==
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
- // DecimalV2Value conjunct_value =
*((DecimalV2Value*)params.value);
- // Int128 conjunct_int_value = conjunct_value.value();
- // Int128 max = _decode_value_to_int128(params,
params.max_bytes);
- // _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &max,
- // params.parquet_scale);
- // return _filter_group_by_gt_or_ge(conjunct_int_value,
max, is_eq);
- // }
- }
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- std::string conjunct_value = ((StringValue*)params.value)->to_string();
- if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) {
- return true;
- } else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) {
+ switch (op) {
+ case FILTER_IN:
+ for (const CppType& in_value : predicate_values) {
+ if (in_value >= min_value && in_value <= max_value) {
+ return false;
+ }
+ }
return true;
+ case FILTER_LESS:
+ return min_value >= predicate_values[0];
+ case FILTER_LESS_OR_EQUAL:
+ return min_value > predicate_values[0];
+ case FILTER_LARGER:
+ return max_value <= predicate_values[0];
+ case FILTER_LARGER_OR_EQUAL:
+ return max_value < predicate_values[0];
+ default:
+ return false;
}
- break;
- }
- default:
- return false;
}
- return false;
-}
-template <typename T>
-static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) {
- if (!is_le) {
- if (min >= conjunct_value) {
- return true;
- }
- } else {
- if (min > conjunct_value) {
- return true;
+ template <PrimitiveType primitive_type>
+ static bool _filter_by_min_max(const ColumnValueRange<primitive_type>&
col_val_range,
+ const ScanPredicate& predicate, const
FieldSchema* col_schema,
+ const std::string& encoded_min, const
std::string& encoded_max,
+ const cctz::time_zone& ctz) {
+ using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
+ std::vector<CppType> predicate_values;
+ for (const void* v : predicate.values) {
+ predicate_values.emplace_back(*reinterpret_cast<const
CppType*>(v));
}
- }
- return false;
-}
-static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) {
- switch (params.conjunct_type) {
- case TYPE_TINYINT: {
- _PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes,
conjunct_value, min)
- return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
- }
- case TYPE_SMALLINT: {
- _PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes,
conjunct_value, min)
- return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
- }
- case TYPE_DECIMAL32:
- case TYPE_INT: {
- _PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes,
conjunct_value, min)
- return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
- }
- case TYPE_DECIMAL64:
- case TYPE_BIGINT: {
- _PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes,
conjunct_value, min)
- return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
- }
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR: {
- std::string conjunct_value = ((StringValue*)params.value)->to_string();
- if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) {
- return true;
- } else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) {
- return true;
- }
- break;
- }
- case TYPE_DECIMALV2: {
- if (params.parquet_type == tparquet::Type::INT32) {
- int32_t conjunct_int_value = 0;
- int32_t parquet_value = 0;
- _decode_decimal_v2_to_primary(params, params.min_bytes,
&conjunct_int_value,
- &parquet_value);
- return _filter_group_by_lt_or_le(conjunct_int_value,
parquet_value, is_eq);
- } else if (params.parquet_type == tparquet::Type::INT64) {
- int64_t conjunct_int_value = 0;
- int64_t parquet_value = 0;
- _decode_decimal_v2_to_primary(params, params.min_bytes,
&conjunct_int_value,
- &parquet_value);
- return _filter_group_by_lt_or_le(conjunct_int_value,
parquet_value, is_eq);
- }
+ CppType min_value;
+ CppType max_value;
+ tparquet::Type::type physical_type = col_schema->physical_type;
+ switch (col_val_range.type()) {
+#define DISPATCH(REINTERPRET_TYPE, PARQUET_TYPE) \
+ case REINTERPRET_TYPE: \
+ if (col_schema->physical_type != PARQUET_TYPE) return false; \
+ min_value = *reinterpret_cast<const CppType*>(encoded_min.data()); \
+ max_value = *reinterpret_cast<const CppType*>(encoded_max.data()); \
break;
- // When precision exceeds 18, decimal will use
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
- // todo: support decimal128 after the test passes
- // else if (params.parquet_type ==
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
- // DecimalV2Value conjunct_value =
*((DecimalV2Value*)params.value);
- // Int128 conjunct_int_value = conjunct_value.value();
- // Int128 min = _decode_value_to_int128(params,
params.min_bytes);
- // _align_decimal_v2_scale(&conjunct_int_value,
conjunct_value.scale(), &min,
- // params.parquet_scale);
- // return _filter_group_by_lt_or_le(conjunct_int_value,
min, is_eq);
- // }
- }
- case TYPE_DATE: {
- // doris::DateTimeValue* min_date =
(doris::DateTimeValue*)params.value;
- // LOG(INFO) << min_date->debug_string();
- return false;
- }
- default:
- return false;
- }
- return false;
-}
+ FOR_REINTERPRET_TYPES(DISPATCH)
+#undef DISPATCH
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING:
+ if constexpr (std::is_same_v<CppType, StringRef>) {
+ min_value = StringRef(encoded_min);
+ max_value = StringRef(encoded_max);
+ } else {
+ return false;
+ };
+ break;
+ case TYPE_DECIMALV2:
+ if constexpr (std::is_same_v<CppType, DecimalV2Value>) {
+ size_t max_precision =
max_decimal_precision<Decimal<__int128_t>>();
+ if (col_schema->parquet_schema.precision < 1 ||
+ col_schema->parquet_schema.precision > max_precision ||
+ col_schema->parquet_schema.scale > max_precision) {
+ return false;
+ }
+ int v2_scale = DecimalV2Value::SCALE;
+ if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ min_value = DecimalV2Value(
+ _decode_binary_decimal<Int128>(col_schema,
encoded_min, v2_scale));
+ max_value = DecimalV2Value(
+ _decode_binary_decimal<Int128>(col_schema,
encoded_max, v2_scale));
+ } else if (physical_type == tparquet::Type::INT32) {
+ min_value =
DecimalV2Value(_decode_primitive_decimal<Int128, Int32>(
+ col_schema, encoded_min, v2_scale));
+ max_value =
DecimalV2Value(_decode_primitive_decimal<Int128, Int32>(
+ col_schema, encoded_max, v2_scale));
+ } else if (physical_type == tparquet::Type::INT64) {
+ min_value =
DecimalV2Value(_decode_primitive_decimal<Int128, Int64>(
+ col_schema, encoded_min, v2_scale));
+ max_value =
DecimalV2Value(_decode_primitive_decimal<Int128, Int64>(
+ col_schema, encoded_max, v2_scale));
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ break;
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I:
+ if constexpr (std::is_same_v<CppType, int32_t> ||
std::is_same_v<CppType, int64_t> ||
+ std::is_same_v<CppType, __int128_t>) {
+ size_t max_precision =
max_decimal_precision<Decimal<CppType>>();
+ if (col_schema->parquet_schema.precision < 1 ||
+ col_schema->parquet_schema.precision > max_precision ||
+ col_schema->parquet_schema.scale > max_precision) {
+ return false;
+ }
+ if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ min_value = _decode_binary_decimal<CppType>(col_schema,
encoded_min,
+
predicate.scale);
+ max_value = _decode_binary_decimal<CppType>(col_schema,
encoded_max,
+
predicate.scale);
+ } else if (physical_type == tparquet::Type::INT32) {
+ min_value = _decode_primitive_decimal<CppType,
Int32>(col_schema, encoded_min,
+
predicate.scale);
+ max_value = _decode_primitive_decimal<CppType,
Int32>(col_schema, encoded_max,
+
predicate.scale);
+ } else if (physical_type == tparquet::Type::INT64) {
+ min_value = _decode_primitive_decimal<CppType,
Int64>(col_schema, encoded_min,
+
predicate.scale);
+ max_value = _decode_primitive_decimal<CppType,
Int64>(col_schema, encoded_max,
+
predicate.scale);
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ break;
+ case TYPE_DATE:
+ case TYPE_DATEV2:
+ if (physical_type == tparquet::Type::INT32) {
+ int64_t min_date_value =
+ static_cast<int64_t>(*reinterpret_cast<const
int32_t*>(encoded_min.data()));
+ int64_t max_date_value =
+ static_cast<int64_t>(*reinterpret_cast<const
int32_t*>(encoded_max.data()));
+ if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+ std::is_same_v<CppType,
DateV2Value<DateV2ValueType>>) {
+ min_value.from_unixtime(min_date_value * 24 * 60 * 60,
ctz);
+ max_value.from_unixtime(max_date_value * 24 * 60 * 60,
ctz);
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ break;
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ if (physical_type == tparquet::Type::INT96) {
+ ParquetInt96 datetime96_min =
+ *reinterpret_cast<const
ParquetInt96*>(encoded_min.data());
+ int64_t micros_min = datetime96_min.to_timestamp_micros();
+ ParquetInt96 datetime96_max =
+ *reinterpret_cast<const
ParquetInt96*>(encoded_max.data());
+ int64_t micros_max = datetime96_max.to_timestamp_micros();
+ if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+ std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
+ min_value.from_unixtime(micros_min / 1000000, ctz);
+ max_value.from_unixtime(micros_max / 1000000, ctz);
+ if constexpr (std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
+ min_value.set_microsecond(micros_min % 1000000);
+ max_value.set_microsecond(micros_max % 1000000);
+ }
+ } else {
+ return false;
+ }
+ } else if (physical_type == tparquet::Type::INT64) {
+ int64_t date_value_min = *reinterpret_cast<const
int64_t*>(encoded_min.data());
+ int64_t date_value_max = *reinterpret_cast<const
int64_t*>(encoded_max.data());
-struct ScanPredicate {
- ScanPredicate() = default;
- ~ScanPredicate() = default;
- std::string _col_name;
- TExprOpcode::type _op;
- std::vector<void*> _values;
- bool _null_op = false;
- bool _is_null = false;
- int _scale;
+ int64_t second_mask = 1;
+ int64_t scale_to_nano_factor = 1;
+ cctz::time_zone resolved_ctz = ctz;
+ const auto& schema = col_schema->parquet_schema;
+ if (schema.__isset.logicalType &&
schema.logicalType.__isset.TIMESTAMP) {
+ const auto& timestamp_info = schema.logicalType.TIMESTAMP;
+ if (!timestamp_info.isAdjustedToUTC) {
+ // should set timezone to utc+0
+ resolved_ctz = cctz::utc_time_zone();
+ }
+ const auto& time_unit = timestamp_info.unit;
+ if (time_unit.__isset.MILLIS) {
+ second_mask = 1000;
+ scale_to_nano_factor = 1000000;
+ } else if (time_unit.__isset.MICROS) {
+ second_mask = 1000000;
+ scale_to_nano_factor = 1000;
+ } else if (time_unit.__isset.NANOS) {
+ second_mask = 1000000000;
+ scale_to_nano_factor = 1;
+ }
+ } else if (schema.__isset.converted_type) {
+ const auto& converted_type = schema.converted_type;
+ if (converted_type ==
tparquet::ConvertedType::TIMESTAMP_MILLIS) {
+ second_mask = 1000;
+ scale_to_nano_factor = 1000000;
+ } else if (converted_type ==
tparquet::ConvertedType::TIMESTAMP_MICROS) {
+ second_mask = 1000000;
+ scale_to_nano_factor = 1000;
+ }
+ }
- ScanPredicate(const ScanPredicate& other) {
- _col_name = other._col_name;
- _op = other._op;
- for (void* v : other._values) {
- _values.push_back(v);
+ if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+ std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
+ min_value.from_unixtime(date_value_min / second_mask,
resolved_ctz);
+ max_value.from_unixtime(date_value_max / second_mask,
resolved_ctz);
+ if constexpr (std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
+ min_value.set_microsecond((date_value_min %
second_mask) *
+ scale_to_nano_factor / 1000);
+ max_value.set_microsecond((date_value_max %
second_mask) *
+ scale_to_nano_factor / 1000);
+ }
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ break;
+ default:
+ return false;
}
- _null_op = other._null_op;
- _is_null = other._is_null;
- _scale = other._scale;
+ return _filter_by_min_max(predicate.op, predicate_values, min_value,
max_value);
}
-};
-template <PrimitiveType primitive_type>
-static void to_filter(const ColumnValueRange<primitive_type>& col_val_range,
- std::vector<ScanPredicate>& filters) {
- using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
- const auto& high_value = col_val_range.get_range_max_value();
- const auto& low_value = col_val_range.get_range_min_value();
- const auto& high_op = col_val_range.get_range_high_op();
- const auto& low_op = col_val_range.get_range_low_op();
+ template <PrimitiveType primitive_type>
+ static std::vector<ScanPredicate> _value_range_to_predicate(
+ const ColumnValueRange<primitive_type>& col_val_range) {
+ using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
+ std::vector<ScanPredicate> predicates;
- // todo: process equals
- if (col_val_range.is_fixed_value_range()) {
- // 1. convert to in filter condition
- ScanPredicate condition;
- condition._col_name = col_val_range.column_name();
- condition._op = TExprOpcode::FILTER_NEW_IN;
- condition._scale = col_val_range.scale();
- if (col_val_range.get_fixed_value_set().empty()) {
- return;
- }
- for (const auto& value : col_val_range.get_fixed_value_set()) {
- condition._values.push_back(const_cast<CppType*>(&value));
- }
- filters.push_back(condition);
- } else if (low_value < high_value) {
- // 2. convert to min max filter condition
- ScanPredicate null_pred;
- if (col_val_range.is_high_value_maximum() && high_op ==
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
- col_val_range.is_low_value_mininum() && low_op ==
SQLFilterOp::FILTER_LARGER_OR_EQUAL &&
- !col_val_range.contain_null()) {
- null_pred._col_name = col_val_range.column_name();
- null_pred._null_op = true;
- null_pred._is_null = false;
- filters.push_back(null_pred);
- return;
- }
- ScanPredicate low;
- if (!col_val_range.is_low_value_mininum() ||
- SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
- low._col_name = col_val_range.column_name();
- low._op = (low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL ?
TExprOpcode::GE
- :
TExprOpcode::GT);
- // NOTICE: use get_range_min_value_ptr, not "low_value"'s addr,
- // to avoid stack-use-after-return bug
-
low._values.push_back(const_cast<CppType*>(col_val_range.get_range_min_value_ptr()));
- low._scale = col_val_range.scale();
- filters.push_back(low);
+ if (col_val_range.is_fixed_value_range()) {
+ ScanPredicate in_predicate;
+ in_predicate.op = SQLFilterOp::FILTER_IN;
+ in_predicate.scale = col_val_range.scale();
+ for (const auto& value : col_val_range.get_fixed_value_set()) {
+ in_predicate.values.emplace_back(&value);
+ }
+ if (!in_predicate.values.empty()) {
+ predicates.emplace_back(in_predicate);
+ }
+ return predicates;
}
- ScanPredicate high;
- if (!col_val_range.is_high_value_maximum() ||
- SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
- high._col_name = col_val_range.column_name();
- high._op = (high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL ?
TExprOpcode::LE
- :
TExprOpcode::LT);
- // NOTICE: use get_range_max_value_ptr, not "high_value"'s addr,
- // to avoid stack-use-after-return bug
-
high._values.push_back(const_cast<CppType*>(col_val_range.get_range_max_value_ptr()));
- high._scale = col_val_range.scale();
- filters.push_back(high);
- }
- } else {
- // 3. convert to is null and is not null filter condition
- ScanPredicate null_pred;
- if (col_val_range.is_low_value_maximum() &&
col_val_range.is_high_value_mininum() &&
- col_val_range.contain_null()) {
- null_pred._col_name = col_val_range.column_name();
- null_pred._null_op = true;
- null_pred._is_null = true;
- filters.push_back(null_pred);
- }
- }
-}
+ const CppType high_value = col_val_range.get_range_max_value();
+ const CppType low_value = col_val_range.get_range_min_value();
+ const SQLFilterOp high_op = col_val_range.get_range_high_op();
+ const SQLFilterOp low_op = col_val_range.get_range_low_op();
-static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams*
params,
- bool* need_filter) {
- if (filter._values.empty()) {
- return;
- }
- if (filter._op == TExprOpcode::FILTER_NEW_IN) {
- if (filter._values.size() == 1) {
- params->value = filter._values[0];
- *need_filter = _eval_eq(*params);
- return;
+ // orc can only push down is_null. When col_value_range._contain_null
= true, only indicating that
+ // value can be null, not equals null, so ignore _contain_null in
col_value_range
+ if (col_val_range.is_high_value_maximum() && high_op ==
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
+ col_val_range.is_low_value_mininum() && low_op ==
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
+ return predicates;
}
- params->in_pred_values = filter._values;
- *need_filter = _eval_in_val(*params);
- return;
- }
- // preserve TExprOpcode::FILTER_NEW_NOT_IN
- params->value = filter._values[0];
- switch (filter._op) {
- case TExprOpcode::EQ:
- *need_filter = _eval_eq(*params);
- break;
- case TExprOpcode::NE:
- break;
- case TExprOpcode::GT:
- *need_filter = _eval_gt(*params, false);
- break;
- case TExprOpcode::GE:
- *need_filter = _eval_gt(*params, true);
- break;
- case TExprOpcode::LT:
- *need_filter = _eval_lt(*params, false);
- break;
- case TExprOpcode::LE:
- *need_filter = _eval_lt(*params, true);
- break;
- default:
- break;
- }
-}
-static bool determine_filter_min_max(const ColumnValueRangeType& col_val_range,
- const FieldSchema* col_schema, const
std::string& encoded_min,
- const std::string& encoded_max) {
- const char* min_bytes = encoded_min.data();
- const char* max_bytes = encoded_max.data();
- bool need_filter = false;
- std::vector<ScanPredicate> filters;
- PrimitiveType col_type;
- std::visit(
- [&](auto&& range) {
- col_type = range.type();
- to_filter(range, filters);
- },
- col_val_range);
- if (filters.empty()) {
- return false;
+ if (low_value < high_value) {
+ if (!col_val_range.is_low_value_mininum() ||
+ SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
+ ScanPredicate low_predicate;
+ low_predicate.scale = col_val_range.scale();
+ low_predicate.op = low_op;
+
low_predicate.values.emplace_back(col_val_range.get_range_min_value_ptr());
+ predicates.emplace_back(low_predicate);
+ }
+ if (!col_val_range.is_high_value_maximum() ||
+ SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
+ ScanPredicate high_predicate;
+ high_predicate.scale = col_val_range.scale();
+ high_predicate.op = high_op;
+
high_predicate.values.emplace_back(col_val_range.get_range_max_value_ptr());
+ predicates.emplace_back(high_predicate);
+ }
+ }
+ return predicates;
}
- ColumnMinMaxParams params;
- params.conjunct_type = col_type;
- params.parquet_type = col_schema->physical_type;
- params.parquet_precision = col_schema->parquet_schema.precision;
- params.parquet_scale = col_schema->parquet_schema.scale;
- params.parquet_type_length = col_schema->parquet_schema.type_length;
- params.min_bytes = min_bytes;
- params.max_bytes = max_bytes;
- for (int i = 0; i < filters.size(); i++) {
- _eval_predicate(filters[i], ¶ms, &need_filter);
- if (need_filter) {
- break;
- }
+public:
+ static bool filter_by_min_max(const ColumnValueRangeType& col_val_range,
+ const FieldSchema* col_schema, const
std::string& encoded_min,
+ const std::string& encoded_max, const
cctz::time_zone& ctz) {
+ bool need_filter = false;
+ std::visit(
+ [&](auto&& range) {
+ std::vector<ScanPredicate> filters =
_value_range_to_predicate(range);
+ for (auto& filter : filters) {
+ need_filter |= _filter_by_min_max(range, filter,
col_schema, encoded_min,
+ encoded_max, ctz);
+ if (need_filter) {
+ break;
+ }
+ }
+ },
+ col_val_range);
+ return need_filter;
}
- return need_filter;
-}
+};
-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index acb56d0c2a8..ad70d8f8bcb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -40,15 +40,17 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema,
- std::vector<int>& skipped_ranges) {
+ std::vector<int>& skipped_ranges,
+ const cctz::time_zone& ctz) {
const std::vector<std::string>& encoded_min_vals = column_index->min_values;
const std::vector<std::string>& encoded_max_vals = column_index->max_values;
DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
const int num_of_pages = column_index->null_pages.size();
for (int page_id = 0; page_id < num_of_pages; page_id++) {
- if (determine_filter_min_max(col_val_range, col_schema, encoded_min_vals[page_id],
- encoded_max_vals[page_id])) {
+ if (ParquetPredicate::filter_by_min_max(col_val_range, col_schema,
+ encoded_min_vals[page_id],
+ encoded_max_vals[page_id], ctz)) {
skipped_ranges.emplace_back(page_id);
}
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 4a27593abf0..00cbbdcfdd9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -33,7 +33,7 @@ public:
Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
ColumnValueRangeType& col_val_range,
const FieldSchema* col_schema,
- std::vector<int>& skipped_ranges);
+ std::vector<int>& skipped_ranges, const cctz::time_zone& ctz);
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
tparquet::ColumnIndex* column_index);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 56775a53d32..aa7eb25bd73 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -567,7 +567,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
std::vector<int> skipped_page_range;
const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name);
page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema,
- skipped_page_range);
+ skipped_page_range, *_ctz);
if (skipped_page_range.empty()) {
continue;
}
@@ -648,8 +648,8 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
}
const FieldSchema* col_schema = schema_desc.get_column(col_name);
// Min-max of statistic is plain-encoded value
- *filter_group = determine_filter_min_max(slot_iter->second, col_schema, statistic.min,
- statistic.max);
+ *filter_group = ParquetPredicate::filter_by_min_max(slot_iter->second, col_schema,
+ statistic.min, statistic.max, *_ctz);
if (*filter_group) {
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]