This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9986fa747ad [Chore](compatible) adjust register_alternative_function
(#40941)
9986fa747ad is described below
commit 9986fa747ad6f44bd855e2a997bfabbe2e55fa9e
Author: Pxl <[email protected]>
AuthorDate: Thu Sep 19 16:24:40 2024 +0800
[Chore](compatible) adjust register_alternative_function (#40941)
## Proposed changes
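Adjust `register_alternative_function`: each alternative (old) aggregate function is now registered with an explicit `nullable` flag and the `be_exec_version` it stays compatible with. The old implementation is stored under the name `<name>_for_old_version_<version>`, and the breaking version is recorded per function in `BeExecVersionManager`, where `check_agg_state_compatibility` is replaced by `check_function_compatibility`.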
---
be/src/agent/be_exec_version_manager.cpp | 49 ++++++++++++++--------
be/src/agent/be_exec_version_manager.h | 21 ++++++++--
be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +-
be/src/olap/tablet_schema.cpp | 2 +-
.../aggregate_function_covar.cpp | 8 ++--
.../aggregate_function_percentile.cpp | 16 ++++---
.../aggregate_function_percentile_approx.cpp | 11 +++--
.../aggregate_function_simple_factory.h | 23 ++++------
.../aggregate_function_stddev.cpp | 14 ++++---
.../aggregate_function_window_funnel.cpp | 6 ++-
be/src/vec/data_types/data_type_agg_state.h | 6 +--
be/src/vec/functions/simple_function_factory.h | 8 ----
12 files changed, 97 insertions(+), 69 deletions(-)
diff --git a/be/src/agent/be_exec_version_manager.cpp
b/be/src/agent/be_exec_version_manager.cpp
index 32cbe569892..e44829ae39b 100644
--- a/be/src/agent/be_exec_version_manager.cpp
+++ b/be/src/agent/be_exec_version_manager.cpp
@@ -17,12 +17,9 @@
#include "agent/be_exec_version_manager.h"
-namespace doris {
+#include "common/exception.h"
-const std::map<int, const std::set<std::string>> AGGREGATION_CHANGE_MAP = {
- {AGGREGATION_2_1_VERSION,
- {"window_funnel", "stddev_samp", "variance_samp",
"percentile_approx_weighted",
- "percentile_approx", "covar_samp", "percentile",
"percentile_array"}}};
+namespace doris {
Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
if (be_exec_version > max_be_exec_version || be_exec_version <
min_be_exec_version) {
@@ -35,19 +32,35 @@ Status BeExecVersionManager::check_be_exec_version(int
be_exec_version) {
return Status::OK();
}
-void BeExecVersionManager::check_agg_state_compatibility(int
current_be_exec_version,
- int
data_be_exec_version,
- std::string
function_name) {
- if (current_be_exec_version > AGGREGATION_2_1_VERSION &&
- data_be_exec_version <= AGGREGATION_2_1_VERSION &&
-
AGGREGATION_CHANGE_MAP.find(AGGREGATION_2_1_VERSION)->second.contains(function_name))
{
- throw Exception(Status::InternalError(
- "agg state data with {} is not supported, "
- "current_be_exec_version={}, data_be_exec_version={}, need to
rebuild the data "
- "or set the be_exec_version={} in fe.conf",
- function_name, current_be_exec_version, data_be_exec_version,
- AGGREGATION_2_1_VERSION));
+int BeExecVersionManager::get_function_compatibility(int be_exec_version,
+ std::string
function_name) {
+ auto it = _function_change_map.find(function_name);
+ if (it == _function_change_map.end()) {
+ // 0 means no compatibility issues need to be dealt with
+ return 0;
+ }
+
+ auto version_it = it->second.lower_bound(be_exec_version);
+ if (version_it == it->second.end()) {
+ return 0;
+ }
+
+ return *version_it;
+}
+
+void BeExecVersionManager::check_function_compatibility(int
current_be_exec_version,
+ int
data_be_exec_version,
+ std::string
function_name) {
+ if (get_function_compatibility(current_be_exec_version, function_name) ==
+ get_function_compatibility(data_be_exec_version, function_name)) {
+ return;
}
+
+ throw Exception(Status::InternalError(
+ "agg state data with {} is not supported, "
+ "current_be_exec_version={}, data_be_exec_version={}, need to
rebuild the data "
+ "or set the be_exec_version={} in fe.conf temporary",
+ function_name, current_be_exec_version, data_be_exec_version,
data_be_exec_version));
}
/**
@@ -88,5 +101,5 @@ void BeExecVersionManager::check_agg_state_compatibility(int
current_be_exec_ver
*/
const int BeExecVersionManager::max_be_exec_version = 7;
const int BeExecVersionManager::min_be_exec_version = 0;
-
+std::map<std::string, std::set<int>>
BeExecVersionManager::_function_change_map {};
} // namespace doris
diff --git a/be/src/agent/be_exec_version_manager.h
b/be/src/agent/be_exec_version_manager.h
index 16092197a3a..7ab3c7de23a 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -25,13 +25,14 @@
namespace doris {
+constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6;
constexpr inline int BITMAP_SERDE = 3;
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility
issues, see pr #32299
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable
property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix
PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
- 5; // some aggregation changed the data format after this version
+ 6; // some aggregation changed the data format after this version
class BeExecVersionManager {
public:
@@ -39,14 +40,28 @@ public:
static Status check_be_exec_version(int be_exec_version);
- static void check_agg_state_compatibility(int current_be_exec_version, int
data_be_exec_version,
- std::string function_name);
+ static int get_function_compatibility(int be_exec_version, std::string
function_name);
+
+ static void check_function_compatibility(int current_be_exec_version, int
data_be_exec_version,
+ std::string function_name);
static int get_newest_version() { return max_be_exec_version; }
+ static std::string get_function_suffix(int be_exec_version) {
+ return "_for_old_version_" + std::to_string(be_exec_version);
+ }
+
+ // For example, there are incompatible changes between version=7 and
version=6, at this time breaking_old_version is 6.
+ static void registe_old_function_compatibility(int breaking_old_version,
+ std::string function_name) {
+ _function_change_map[function_name].insert(breaking_old_version);
+ }
+
private:
static const int max_be_exec_version;
static const int min_be_exec_version;
+ // [function name] -> [breaking change start version]
+ static std::map<std::string, std::set<int>> _function_change_map;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 7b5b39a4c39..3c9b5b7ce7e 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -201,7 +201,7 @@ Status ColumnReader::create_agg_state(const
ColumnReaderOptions& opts, const Col
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(meta);
const auto* agg_state_type = assert_cast<const
vectorized::DataTypeAggState*>(data_type.get());
- agg_state_type->check_agg_state_compatibility(opts.be_exec_version);
+ agg_state_type->check_function_compatibility(opts.be_exec_version);
auto type =
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
if (read_as_string(type)) {
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 813a5e5519f..83b2bd4f702 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -675,7 +675,7 @@ bool TabletColumn::is_row_store_column() const {
vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function_union(
vectorized::DataTypePtr type, int current_be_exec_version) const {
const auto* state_type = assert_cast<const
vectorized::DataTypeAggState*>(type.get());
- BeExecVersionManager::check_agg_state_compatibility(
+ BeExecVersionManager::check_function_compatibility(
current_be_exec_version, _be_exec_version,
state_type->get_nested_function()->get_name());
return
vectorized::AggregateStateUnion::create(state_type->get_nested_function(),
{type}, type);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
index 790d0270aa3..76a2881dd78 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
@@ -81,9 +81,11 @@ void
register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto
void
register_aggregate_function_covar_samp_old(AggregateFunctionSimpleFactory&
factory) {
factory.register_alternative_function(
- "covar_samp",
create_aggregate_function_covariance_samp_old<NOTNULLABLE>);
- factory.register_alternative_function(
- "covar_samp",
create_aggregate_function_covariance_samp_old<NULLABLE>, NULLABLE);
+ "covar_samp",
create_aggregate_function_covariance_samp_old<NOTNULLABLE>, false,
+ AGG_FUNCTION_NULLABLE);
+ factory.register_alternative_function("covar_samp",
+
create_aggregate_function_covariance_samp_old<NULLABLE>,
+ true, AGG_FUNCTION_NULLABLE);
}
void register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory&
factory) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
index a8767e6fae7..00034776607 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
@@ -111,16 +111,20 @@ void
register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
}
void register_percentile_approx_old_function(AggregateFunctionSimpleFactory&
factory) {
- factory.register_alternative_function(
- "percentile_approx",
create_aggregate_function_percentile_approx_older<false>, false);
- factory.register_alternative_function(
- "percentile_approx",
create_aggregate_function_percentile_approx_older<true>, true);
+ factory.register_alternative_function("percentile_approx",
+
create_aggregate_function_percentile_approx_older<false>,
+ false, AGG_FUNCTION_NULLABLE);
+ factory.register_alternative_function("percentile_approx",
+
create_aggregate_function_percentile_approx_older<true>,
+ true, AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
"percentile_approx_weighted",
- create_aggregate_function_percentile_approx_weighted_older<false>,
false);
+ create_aggregate_function_percentile_approx_weighted_older<false>,
false,
+ AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
"percentile_approx_weighted",
- create_aggregate_function_percentile_approx_weighted_older<true>,
true);
+ create_aggregate_function_percentile_approx_weighted_older<true>,
true,
+ AGG_FUNCTION_NULLABLE);
}
void
register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory&
factory) {
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
index 01fdddf6074..5ad1ea8f2d3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
@@ -24,13 +24,16 @@ namespace doris::vectorized {
void
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory&
factory) {
factory.register_alternative_function(
- "percentile",
creator_without_type::creator<AggregateFunctionPercentileOld>);
+ "percentile",
creator_without_type::creator<AggregateFunctionPercentileOld>, false,
+ AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
- "percentile",
creator_without_type::creator<AggregateFunctionPercentileOld>, true);
+ "percentile",
creator_without_type::creator<AggregateFunctionPercentileOld>, true,
+ AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
- "percentile_array",
creator_without_type::creator<AggregateFunctionPercentileArrayOld>);
+ "percentile_array",
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
+ false, AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
"percentile_array",
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
- true);
+ true, AGG_FUNCTION_NULLABLE);
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
index cc504b9f996..b22504dda9c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
@@ -59,11 +59,6 @@ private:
AggregateFunctions aggregate_functions;
AggregateFunctions nullable_aggregate_functions;
std::unordered_map<std::string, std::string> function_alias;
- /// @TEMPORARY: for be_exec_version=4
- /// in order to solve agg of sum/count is not compatibility during the
upgrade process
- constexpr static int AGG_FUNCTION_NEW = 7;
- /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW. replace function
to old version.
- std::unordered_map<std::string, std::string> function_to_replace;
public:
void register_nullable_function_combinator(const Creator& creator) {
@@ -177,21 +172,19 @@ public:
}
}
- /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW
void register_alternative_function(const std::string& name, const Creator&
creator,
- bool nullable = false) {
- static std::string suffix {"_old_for_version_before_2_0"};
- register_function(name + suffix, creator, nullable);
- function_to_replace[name] = name + suffix;
+ bool nullable, int old_be_exec_version)
{
+ auto new_name = name +
BeExecVersionManager::get_function_suffix(old_be_exec_version);
+ register_function(new_name, creator, nullable);
+
BeExecVersionManager::registe_old_function_compatibility(old_be_exec_version,
name);
}
- /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW
void temporary_function_update(int fe_version_now, std::string& name) {
- // replace if fe is old version.
- if (fe_version_now < AGG_FUNCTION_NEW &&
- function_to_replace.find(name) != function_to_replace.end()) {
- name = function_to_replace[name];
+ int old_version =
BeExecVersionManager::get_function_compatibility(fe_version_now, name);
+ if (!old_version) {
+ return;
}
+ name = name + BeExecVersionManager::get_function_suffix(old_version);
}
static AggregateFunctionSimpleFactory& instance();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
index 1d977c1c528..b9e39552395 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
@@ -109,13 +109,17 @@ void
register_aggregate_function_stddev_variance_pop(AggregateFunctionSimpleFact
void
register_aggregate_function_stddev_variance_samp_old(AggregateFunctionSimpleFactory&
factory) {
factory.register_alternative_function(
- "variance_samp",
create_aggregate_function_variance_samp_older<false, false>);
+ "variance_samp",
create_aggregate_function_variance_samp_older<false, false>, false,
+ AGG_FUNCTION_NULLABLE);
factory.register_alternative_function(
- "variance_samp",
create_aggregate_function_variance_samp_older<false, true>, true);
+ "variance_samp",
create_aggregate_function_variance_samp_older<false, true>, true,
+ AGG_FUNCTION_NULLABLE);
factory.register_alternative_function("stddev_samp",
-
create_aggregate_function_stddev_samp_older<true, false>);
- factory.register_alternative_function(
- "stddev_samp", create_aggregate_function_stddev_samp_older<true,
true>, true);
+
create_aggregate_function_stddev_samp_older<true, false>,
+ false, AGG_FUNCTION_NULLABLE);
+ factory.register_alternative_function("stddev_samp",
+
create_aggregate_function_stddev_samp_older<true, true>,
+ true, AGG_FUNCTION_NULLABLE);
}
void
register_aggregate_function_stddev_variance_samp(AggregateFunctionSimpleFactory&
factory) {
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
index 8bfdcc26f43..598c23eb147 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
@@ -77,8 +77,10 @@ void
register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& f
}
void
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory&
factory) {
factory.register_alternative_function("window_funnel",
-
create_aggregate_function_window_funnel_old, true);
+
create_aggregate_function_window_funnel_old, true,
+ AGG_FUNCTION_NEW_WINDOW_FUNNEL);
factory.register_alternative_function("window_funnel",
-
create_aggregate_function_window_funnel_old, false);
+
create_aggregate_function_window_funnel_old, false,
+ AGG_FUNCTION_NEW_WINDOW_FUNNEL);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_agg_state.h
b/be/src/vec/data_types/data_type_agg_state.h
index d7089503b01..35f86f23b2b 100644
--- a/be/src/vec/data_types/data_type_agg_state.h
+++ b/be/src/vec/data_types/data_type_agg_state.h
@@ -122,9 +122,9 @@ public:
DataTypePtr get_serialized_type() const { return _agg_serialized_type; }
- void check_agg_state_compatibility(int read_be_exec_version) const {
-
BeExecVersionManager::check_agg_state_compatibility(read_be_exec_version,
_be_exec_version,
-
get_nested_function()->get_name());
+ void check_function_compatibility(int read_be_exec_version) const {
+
BeExecVersionManager::check_function_compatibility(read_be_exec_version,
_be_exec_version,
+
get_nested_function()->get_name());
}
private:
diff --git a/be/src/vec/functions/simple_function_factory.h
b/be/src/vec/functions/simple_function_factory.h
index 33a3202c18e..d8b544d5bfd 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -151,14 +151,6 @@ public:
function_alias[alias] = name;
}
- /// @TEMPORARY: for be_exec_version=4
- template <class Function>
- void register_alternative_function() {
- static std::string suffix {"_old_for_version_before_5_0"};
- function_to_replace[Function::name] = Function::name + suffix;
- register_function(Function::name + suffix,
&createDefaultFunction<Function>);
- }
-
FunctionBasePtr get_function(const std::string& name, const
ColumnsWithTypeAndName& arguments,
const DataTypePtr& return_type,
int be_version =
BeExecVersionManager::get_newest_version()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]