github-actions[bot] commented on code in PR #22053: URL: https://github.com/apache/doris/pull/22053#discussion_r1280404763
########## be/test/runtime/load_stream_test.cpp: ########## @@ -0,0 +1,1133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <brpc/channel.h> +#include <brpc/server.h> +#include <brpc/stream.h> +#include <butil/logging.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/internal_service.pb.h> +#include <gflags/gflags.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <olap/storage_engine.h> +#include <service/internal_service.h> +#include <unistd.h> + +#include <functional> + +#include "common/config.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gen_cpp/BackendService_types.h" +#include "gen_cpp/FrontendService_types.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/tablet_manager.h" +#include "olap/txn_manager.h" +#include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "util/debug/leakcheck_disabler.h" + +using namespace brpc; + +namespace doris { + +static const uint32_t MAX_PATH_LEN = 1024; +StorageEngine* z_engine = nullptr; +static const std::string zTestDir = "./data_test/data/load_stream_mgr_test"; + 
+const int64_t NORMAL_TABLET_ID = 10000; +const int64_t ABNORMAL_TABLET_ID = 40000; +const int64_t NORMAL_INDEX_ID = 50000; +const int64_t ABNORMAL_INDEX_ID = 60000; +const int64_t NORMAL_PARTITION_ID = 50000; +const int64_t SCHEMA_HASH = 90000; +const uint32_t NORMAL_SENDER_ID = 0; +const uint32_t ABNORMAL_SENDER_ID = 10000; +const int64_t NORMAL_TXN_ID = 600001; +const UniqueId NORMAL_LOAD_ID(1, 1); +const UniqueId ABNORMAL_LOAD_ID(1, 0); +std::string ABNORMAL_STRING("abnormal"); + +void construct_schema(OlapTableSchemaParam* schema) { + // construct schema + TOlapTableSchemaParam tschema; + tschema.db_id = 1; + tschema.table_id = 2; + tschema.version = 0; + + // descriptor + { + TDescriptorTableBuilder dtb; + { + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(10) + .column_name("c3") + .column_pos(3) + .build()); + + tuple_builder.build(&dtb); + } + { + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(20) + .column_name("c3") + .column_pos(3) + .build()); + + tuple_builder.build(&dtb); + } + + auto desc_tbl = dtb.desc_tbl(); + tschema.slot_descs = desc_tbl.slotDescriptors; + tschema.tuple_desc = desc_tbl.tupleDescriptors[0]; + } + // index + tschema.indexes.resize(2); + tschema.indexes[0].id = NORMAL_INDEX_ID; + tschema.indexes[0].columns = {"c1", "c2", "c3"}; + + tschema.indexes[1].id = NORMAL_INDEX_ID + 1; + tschema.indexes[1].columns = {"c1", "c2", "c3"}; + + 
schema->init(tschema); +} + +// copied from delta_writer_test.cpp +static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, + TCreateTabletReq* request) { + request->tablet_id = tablet_id; + request->__set_version(1); + request->tablet_schema.schema_hash = schema_hash; + request->tablet_schema.short_key_column_count = 6; + request->tablet_schema.keys_type = TKeysType::AGG_KEYS; + request->tablet_schema.storage_type = TStorageType::COLUMN; + request->__set_storage_format(TStorageFormat::V2); + + TColumn k1; + + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::TINYINT; + request->tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "k2"; + k2.__set_is_key(true); + k2.column_type.type = TPrimitiveType::SMALLINT; + request->tablet_schema.columns.push_back(k2); + + TColumn k3; + k3.column_name = "k3"; + k3.__set_is_key(true); + k3.column_type.type = TPrimitiveType::INT; + request->tablet_schema.columns.push_back(k3); + + TColumn k4; + k4.column_name = "k4"; + k4.__set_is_key(true); + k4.column_type.type = TPrimitiveType::BIGINT; + request->tablet_schema.columns.push_back(k4); + + TColumn k5; + k5.column_name = "k5"; + k5.__set_is_key(true); + k5.column_type.type = TPrimitiveType::LARGEINT; + request->tablet_schema.columns.push_back(k5); + + TColumn k6; + k6.column_name = "k6"; + k6.__set_is_key(true); + k6.column_type.type = TPrimitiveType::DATE; + request->tablet_schema.columns.push_back(k6); + + TColumn k7; + k7.column_name = "k7"; + k7.__set_is_key(true); + k7.column_type.type = TPrimitiveType::DATETIME; + request->tablet_schema.columns.push_back(k7); + + TColumn k8; + k8.column_name = "k8"; + k8.__set_is_key(true); + k8.column_type.type = TPrimitiveType::CHAR; + k8.column_type.__set_len(4); + request->tablet_schema.columns.push_back(k8); + + TColumn k9; + k9.column_name = "k9"; + k9.__set_is_key(true); + k9.column_type.type = TPrimitiveType::VARCHAR; + k9.column_type.__set_len(65); + 
request->tablet_schema.columns.push_back(k9); + + TColumn k10; + k10.column_name = "k10"; + k10.__set_is_key(true); + k10.column_type.type = TPrimitiveType::DECIMALV2; + k10.column_type.__set_precision(6); + k10.column_type.__set_scale(3); + request->tablet_schema.columns.push_back(k10); + + TColumn k11; + k11.column_name = "k11"; + k11.__set_is_key(true); + k11.column_type.type = TPrimitiveType::DATEV2; + request->tablet_schema.columns.push_back(k11); + + TColumn v1; + v1.column_name = "v1"; + v1.__set_is_key(false); + v1.column_type.type = TPrimitiveType::TINYINT; + v1.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v1); + + TColumn v2; + v2.column_name = "v2"; + v2.__set_is_key(false); + v2.column_type.type = TPrimitiveType::SMALLINT; + v2.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v2); + + TColumn v3; + v3.column_name = "v3"; + v3.__set_is_key(false); + v3.column_type.type = TPrimitiveType::INT; + v3.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v3); + + TColumn v4; + v4.column_name = "v4"; + v4.__set_is_key(false); + v4.column_type.type = TPrimitiveType::BIGINT; + v4.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v4); + + TColumn v5; + v5.column_name = "v5"; + v5.__set_is_key(false); + v5.column_type.type = TPrimitiveType::LARGEINT; + v5.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v5); + + TColumn v6; + v6.column_name = "v6"; + v6.__set_is_key(false); + v6.column_type.type = TPrimitiveType::DATE; + v6.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v6); + + TColumn v7; + v7.column_name = "v7"; + v7.__set_is_key(false); + v7.column_type.type = TPrimitiveType::DATETIME; + v7.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v7); + + TColumn v8; + 
v8.column_name = "v8"; + v8.__set_is_key(false); + v8.column_type.type = TPrimitiveType::CHAR; + v8.column_type.__set_len(4); + v8.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v8); + + TColumn v9; + v9.column_name = "v9"; + v9.__set_is_key(false); + v9.column_type.type = TPrimitiveType::VARCHAR; + v9.column_type.__set_len(65); + v9.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v9); + + TColumn v10; + v10.column_name = "v10"; + v10.__set_is_key(false); + v10.column_type.type = TPrimitiveType::DECIMALV2; + v10.column_type.__set_precision(6); + v10.column_type.__set_scale(3); + v10.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v10); + + TColumn v11; + v11.column_name = "v11"; + v11.__set_is_key(false); + v11.column_type.type = TPrimitiveType::DATEV2; + v11.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v11); +} + +struct ResponseStat { + int32_t num = 0; + std::vector<int64_t> success_tablet_ids; + std::vector<int64_t> failed_tablet_ids; +}; +bthread::Mutex g_stat_lock; +static ResponseStat g_response_stat; + +void reset_response_stat() { + g_response_stat = ResponseStat(); +} + +class LoadStreamMgrTest : public testing::Test { +public: + class Handler : public brpc::StreamInputHandler { + public: + int on_received_messages(StreamId id, butil::IOBuf* const messages[], + size_t size) override { + for (size_t i = 0; i < size; i++) { + PWriteStreamSinkResponse response; + butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); + response.ParseFromZeroCopyStream(&wrapper); + LOG(INFO) << "response " << response.DebugString(); + std::lock_guard lock_guard(g_stat_lock); + g_response_stat.num++; + for (auto& id : response.success_tablet_ids()) { + g_response_stat.success_tablet_ids.push_back(id); + } + for (auto& id : response.failed_tablet_ids()) { + 
g_response_stat.failed_tablet_ids.push_back(id); + } + } + + return 0; + } + void on_idle_timeout(StreamId id) override { std::cerr << "on_idle_timeout" << std::endl; } + void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; } + }; + + class StreamService : public PBackendService { + public: + StreamService() : _sd(brpc::INVALID_STREAM_ID) {} + virtual ~StreamService() { brpc::StreamClose(_sd); }; + virtual void open_stream_sink(google::protobuf::RpcController* controller, + const POpenStreamSinkRequest* request, + POpenStreamSinkResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + std::unique_ptr<PStatus> status = std::make_unique<PStatus>(); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + brpc::StreamOptions stream_options; + + for (const auto& req : request->tablets()) { + TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager(); + TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id()); + if (tablet == nullptr) { + cntl->SetFailed("Tablet not found"); + status->set_status_code(TStatusCode::NOT_FOUND); + response->set_allocated_status(status.get()); + response->release_status(); + return; + } + auto resp = response->add_tablet_schemas(); + resp->set_index_id(req.index_id()); + resp->set_enable_unique_key_merge_on_write( + tablet->enable_unique_key_merge_on_write()); + tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); + } + + ExecEnv* env = ExecEnv::GetInstance(); + + auto load_stream_mgr = env->get_load_stream_mgr(); + LoadStreamSharedPtr load_stream; + auto st = load_stream_mgr->try_open_load_stream(request, &load_stream); + + stream_options.handler = load_stream.get(); + + StreamId streamid; + if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { + cntl->SetFailed("Fail to accept stream"); + status->set_status_code(TStatusCode::CANCELLED); + response->set_allocated_status(status.get()); + 
response->release_status(); + return; + } + + load_stream->add_rpc_stream(); + LOG(INFO) << "OOXXOO: get streamid =" << streamid; + + status->set_status_code(TStatusCode::OK); + response->set_allocated_status(status.get()); + response->release_status(); + } + + brpc::StreamId get_stream() { return _sd; }

Review Comment:
warning: method 'get_stream' can be made const [readability-make-member-function-const]

```suggestion
brpc::StreamId get_stream() const { return _sd; }
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
