This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 25f9bc98fb0 [Cloud](Variant) support DESC merging variants' schemas for
cloud mode … (#38143)
25f9bc98fb0 is described below
commit 25f9bc98fb0cd99e9ecd1b1ec0b5f29609f2ac1d
Author: lihangyu <[email protected]>
AuthorDate: Fri Jul 19 16:49:22 2024 +0800
[Cloud](Variant) support DESC merging variants' schemas for cloud mode …
(#38143)
…(#37955)
## Proposed changes
Issue Number: close #xxx
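Adds a virtual BaseTablet::merged_tablet_schema() (default: the tablet's max-version schema) and a CloudTablet override that merges the schemas of all rowsets via schema_util::get_least_common_schema. fetch_remote_tablet_schema moves from PInternalServiceImpl to PInternalService, looks tablets up through ExecEnv::get_tablet, and merges each tablet's merged_tablet_schema(). This lets DESC show extended variant columns in cloud mode. The variant_p0/desc.groovy suite no longer skips cloud mode; it issues a SELECT before each DESC so that rowsets are synced first.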
---
be/src/cloud/cloud_tablet.cpp | 16 ++++++++++++++++
be/src/cloud/cloud_tablet.h | 3 +++
be/src/olap/base_tablet.h | 4 ++++
be/src/service/internal_service.cpp | 16 ++++++++--------
be/src/service/internal_service.h | 10 +++++-----
regression-test/suites/variant_p0/desc.groovy | 20 +++++++++++++++++---
6 files changed, 53 insertions(+), 16 deletions(-)
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 50c8765a18d..17ec1fe22b0 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -28,6 +28,7 @@
#include <memory>
#include <shared_mutex>
#include <unordered_map>
+#include <vector>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
@@ -43,8 +44,10 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
+#include "vec/common/schema_util.h"
namespace doris {
using namespace ErrorCode;
@@ -132,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version,
bool warmup_delta_data)
return st;
}
+TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
+ std::shared_lock rdlock(_meta_lock);
+ TabletSchemaSPtr target_schema;
+ std::vector<TabletSchemaSPtr> schemas;
+ for (const auto& [_, rowset] : _rs_version_map) {
+ schemas.push_back(rowset->tablet_schema());
+ }
+ // get the max version schema and merge all schema
+ static_cast<void>(
+ vectorized::schema_util::get_least_common_schema(schemas, nullptr,
target_schema));
+ return target_schema;
+}
+
// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE
committed this schema change job.
// It should be a quite rare situation.
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index ca05759cdbf..2e6938444d1 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -206,6 +206,9 @@ public:
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;
+ // Return merged extended schema
+ TabletSchemaSPtr merged_tablet_schema() const override;
+
private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by
version
void update_base_size(const Rowset& rs);
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index f625ecf4a0a..cefb31ccd11 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -28,6 +28,7 @@
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
#include "olap/version_graph.h"
#include "util/metrics.h"
@@ -252,6 +253,9 @@ public:
const std::vector<RowsetSharedPtr>&
candidate_rowsets,
int limit);
+ // Return the merged schema of all rowsets
+ virtual TabletSchemaSPtr merged_tablet_schema() const { return
_max_version_schema; }
+
protected:
// Find the missed versions until the spec_version.
//
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 04abf2b09ce..9611e1a93cb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1044,11 +1044,11 @@ struct AsyncRPCContext {
brpc::CallId cid;
};
-void
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController*
controller,
- const
PFetchRemoteSchemaRequest* request,
-
PFetchRemoteSchemaResponse* response,
-
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
+void
PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcController*
controller,
+ const
PFetchRemoteSchemaRequest* request,
+ PFetchRemoteSchemaResponse*
response,
+ google::protobuf::Closure*
done) {
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
brpc::ClosureGuard closure_guard(done);
Status st = Status::OK();
if (request->is_coordinator()) {
@@ -1120,13 +1120,13 @@ void
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr
if (!target_tablets.empty()) {
std::vector<TabletSchemaSPtr> tablet_schemas;
for (int64_t tablet_id : target_tablets) {
- TabletSharedPtr tablet =
_engine.tablet_manager()->get_tablet(tablet_id, false);
- if (tablet == nullptr) {
+ auto res = ExecEnv::get_tablet(tablet_id);
+ if (!res.has_value()) {
// just ignore
LOG(WARNING) << "tablet does not exist, tablet id is "
<< tablet_id;
continue;
}
- tablet_schemas.push_back(tablet->tablet_schema());
+
tablet_schemas.push_back(res.value()->merged_tablet_schema());
}
if (!tablet_schemas.empty()) {
// merge all
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 9cad429107a..7f3a2ca6f30 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -225,6 +225,11 @@ public:
PJdbcTestConnectionResult* result,
google::protobuf::Closure* done) override;
+ void fetch_remote_tablet_schema(google::protobuf::RpcController*
controller,
+ const PFetchRemoteSchemaRequest* request,
+ PFetchRemoteSchemaResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
@@ -287,11 +292,6 @@ public:
PGetTabletVersionsResponse* response,
google::protobuf::Closure* done) override;
- void fetch_remote_tablet_schema(google::protobuf::RpcController*
controller,
- const PFetchRemoteSchemaRequest* request,
- PFetchRemoteSchemaResponse* response,
- google::protobuf::Closure* done) override;
-
private:
void _response_pull_slave_rowset(const std::string& remote_host, int64_t
brpc_port,
int64_t txn_id, int64_t tablet_id,
int64_t node_id,
diff --git a/regression-test/suites/variant_p0/desc.groovy
b/regression-test/suites/variant_p0/desc.groovy
index dfb5b40794e..5efcda3a043 100644
--- a/regression-test/suites/variant_p0/desc.groovy
+++ b/regression-test/suites/variant_p0/desc.groovy
@@ -16,9 +16,9 @@
// under the License.
suite("regression_test_variant_desc", "nonConcurrent"){
- if (isCloudMode()) {
- return
- }
+ // if (isCloudMode()) {
+ // return
+ // }
def load_json_data = {table_name, file_name ->
// load the json data
@@ -101,10 +101,13 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """set describe_extend_variant_column = true"""
sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 0, '{"a": 1123}' as json_str union all select 0,
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
+ // select for sync rowsets
+ sql "select * from sparse_columns limit 1"
qt_sql_1 """desc ${table_name}"""
sql "truncate table sparse_columns"
sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str
union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" :
{"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" =
"4096") limit 4096 ;"""
+ sql "select * from sparse_columns limit 1"
qt_sql_2 """desc ${table_name}"""
sql "truncate table sparse_columns"
@@ -115,6 +118,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1.0")
sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 0, '{"a": 1123}' as json_str union all select 0,
'{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096")
limit 4096 ;"""
+ sql "select * from no_sparse_columns limit 1"
qt_sql_3 """desc ${table_name}"""
sql "truncate table ${table_name}"
@@ -128,6 +132,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into ${table_name} select 45000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str
union all select 45000, '{"a": 1123}' as json_str union all
select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from
numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${table_name} values(95000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
+ sql "select * from partition_data limit 1"
qt_sql_6_1 """desc ${table_name} partition p1"""
qt_sql_6_2 """desc ${table_name} partition p2"""
qt_sql_6_3 """desc ${table_name} partition p3"""
@@ -145,6 +150,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into ${table_name} values(95000, '{"a": 11245, "b" :
[123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')"""
// drop p1
sql """alter table ${table_name} drop partition p1"""
+ sql "select * from drop_partition limit 1"
qt_sql_7 """desc ${table_name}"""
qt_sql_7_1 """desc ${table_name} partition p2"""
qt_sql_7_2 """desc ${table_name} partition p3"""
@@ -165,6 +171,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
properties("replication_num" = "1", "disable_auto_compaction" =
"false");
"""
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}',
'{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" :
7.111}}')"""
+ sql "select * from ${table_name} limit 1"
qt_sql_8 """desc ${table_name}"""
sql "truncate table ${table_name}"
@@ -181,6 +188,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
properties("replication_num" = "1", "disable_auto_compaction" =
"false");
"""
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}')"""
+ sql "select * from ${table_name} limit 1"
qt_sql_9 """desc ${table_name}"""
sql """set describe_extend_variant_column = true"""
qt_sql_9_1 """desc ${table_name}"""
@@ -191,12 +199,14 @@ suite("regression_test_variant_desc", "nonConcurrent"){
create_table.call(table_name, "5")
// add, drop columns
sql """INSERT INTO ${table_name} values(0, '{"k1":1, "k2": "hello
world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')"""
+ sql "select * from ${table_name} limit 1"
sql """set describe_extend_variant_column = true"""
qt_sql_10 """desc ${table_name}"""
// add column
sql "alter table ${table_name} add column v2 variant default null"
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
'{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" :
null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" :
123}}}')"""
+ sql "select * from ${table_name} limit 1"
qt_sql_10_1 """desc ${table_name}"""
// drop cloumn
sql "alter table ${table_name} drop column v2"
@@ -205,6 +215,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql "alter table ${table_name} add column v3 variant default null"
sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123,
{"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" :
{"akakaka" : null, "xxxx" : {"xxx" : 123}}}',
'{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456,
"d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" :
{"xxx" : 123}}}')"""
+ sql "select * from ${table_name} limit 1"
qt_sql_10_3 """desc ${table_name}"""
//sql "truncate table ${table_name}"
@@ -221,6 +232,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
"""
sql """ insert into ${table_name} values (0, '{"名字" : "jack",
"!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587":
"unicode"}}')"""
sql """set describe_extend_variant_column = true"""
+ sql "select * from ${table_name} limit 1"
qt_sql_11 """desc ${table_name}"""
// varaint subcolumn: empty
@@ -237,6 +249,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """ insert into ${table_name} values (0, '{}')"""
sql """ insert into ${table_name} values (0, '100')"""
sql """set describe_extend_variant_column = true"""
+ sql "select * from ${table_name} limit 1"
qt_sql_12 """desc ${table_name}"""
@@ -247,6 +260,7 @@ suite("regression_test_variant_desc", "nonConcurrent"){
sql """insert into large_tablets values (3001, '{"b" : 10}')"""
sql """insert into large_tablets values (50001, '{"c" : 10}')"""
sql """insert into large_tablets values (99999, '{"d" : 10}')"""
+ sql "select * from ${table_name} limit 1"
sql """set max_fetch_remote_schema_tablet_count = 2"""
sql "desc large_tablets"
sql """set max_fetch_remote_schema_tablet_count = 128"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]