I found that on a server side the mutex mu is locking only the received_notes. But the to_send_notes are not locked and get accessed from different functions, that are called by bidirectional core from different threads. Slight modification of code made the server stable! (see how I placed the mutext lock). #include <algorithm> #include <chrono> #include <cmath> #include <iostream> #include <memory> #include <string> #include <thread> #include "helper.h" #include <grpc/grpc.h> #include <grpcpp/security/server_credentials.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> #ifdef BAZEL_BUILD #include "examples/protos/route_guide.grpc.pb.h" #else #include "route_guide.grpc.pb.h" #endif using grpc::CallbackServerContext; using grpc::Server; using grpc::ServerBuilder; using grpc::Status; using routeguide::Feature; using routeguide::Point; using routeguide::Rectangle; using routeguide::RouteGuide; using routeguide::RouteNote; using routeguide::RouteSummary; using std::chrono::system_clock; class RouteGuideImpl final : public RouteGuide::CallbackService { public: explicit RouteGuideImpl(const std::string& db) { routeguide::ParseDb(db, &feature_list_); } grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat( CallbackServerContext* context) override { class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> { public: Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes) : mu_(mu), received_notes_(received_notes) { StartRead(¬e_); } void OnDone() override { delete this; } void OnReadDone(bool ok) override { if (ok) { // Unlike the other example in this directory that's not using // the reactor pattern, we can't grab a local lock to secure the // access to the notes vector, because the reactor will most likely // make us jump threads, so we'll have to use a different locking // strategy. We'll grab the lock locally to build a copy of the // list of nodes we're going to send, then we'll grab the lock // again to append the received note to the existing vector. mu_->Lock(); std::copy_if(received_notes_->begin(), received_notes_->end(), std::back_inserter(to_send_notes_), [this](const RouteNote& note) { return note.location().latitude() == note_.location().latitude() && note.location().longitude() == note_.location().longitude(); }); notes_iterator_ = to_send_notes_.begin(); mu_->Unlock(); NextWrite(); } else { //std::cout << "some client finished" << std::endl; Finish(Status::OK); } } void OnWriteDone(bool ok) override { if (ok) { NextWrite(); } else { std::cout << "some client finished write" << std::endl; Finish(Status::OK); } } private: void NextWrite() { mu_->Lock(); if (notes_iterator_ != to_send_notes_.end()) { StartWrite(&*notes_iterator_); notes_iterator_++; } else { received_notes_->push_back(note_); StartRead(¬e_); } mu_->Unlock(); } RouteNote note_; absl::Mutex* mu_; std::vector<RouteNote>* received_notes_; std::vector<RouteNote> to_send_notes_; std::vector<RouteNote>::iterator notes_iterator_; }; return new Chatter(&mu_, &received_notes_); } private: std::vector<Feature> feature_list_; absl::Mutex mu_; std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_); }; void RunServer(const std::string& db_path) { std::string server_address("0.0.0.0:50051"); RouteGuideImpl service(db_path); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr<Server> server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); } int main(int argc, char** argv) { // Expect only arg: --db_path=path/to/route_guide_db.json. std::string db = routeguide::GetDbFileContent(argc, argv); RunServer(db); return 0; }
Problem is still on client side - sometimes it crashes. I had a look in the source, but did not find anything strange. So the question is still open about the route_guide_callback_client.cc *and we can return back to my original question - HOW to make this server multi-client? It answers to all of the clients connected simultaneously, but I suppose it does not make any difference between them, as, for example, the iterator for the to_send_notes is the same. How to distinguish different clients in the callback bidirectional grpc mode??* среда, 22 марта 2023 г. в 02:15:55 UTC+3, Dmitry Gorelov: > [image: 2023-03-22 02_14_54-Window.png] > > среда, 22 марта 2023 г. в 02:00:25 UTC+3, Dmitry Gorelov: > >> Both of them , the client and server, are from route_guide example. I >> left only bidirectional part. >> after some attemts to run client, either the client or the server crash. >> >> Please help to fix them! >> >> >> среда, 22 марта 2023 г. в 01:59:12 UTC+3, Dmitry Gorelov: >> >>> //and this is client code >>> >>> /* >>> * >>> * Copyright 2021 gRPC authors. >>> * >>> * Licensed under the Apache License, Version 2.0 (the "License"); >>> * you may not use this file except in compliance with the License. >>> * You may obtain a copy of the License at >>> * >>> * http://www.apache.org/licenses/LICENSE-2.0 >>> * >>> * Unless required by applicable law or agreed to in writing, software >>> * distributed under the License is distributed on an "AS IS" BASIS, >>> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >>> implied. >>> * See the License for the specific language governing permissions and >>> * limitations under the License. >>> * >>> */ >>> >>> #include <chrono> >>> #include <condition_variable> >>> #include <iostream> >>> #include <memory> >>> #include <mutex> >>> #include <random> >>> >>> #include <string> >>> #include <thread> >>> >>> #include "helper.h" >>> >>> #include <grpc/grpc.h> >>> #include <grpcpp/alarm.h> >>> #include <grpcpp/channel.h> >>> #include <grpcpp/client_context.h> >>> #include <grpcpp/create_channel.h> >>> #include <grpcpp/security/credentials.h> >>> >>> #ifdef BAZEL_BUILD >>> #include "examples/protos/route_guide.grpc.pb.h" >>> #else >>> #include "route_guide.grpc.pb.h" >>> #endif >>> >>> using grpc::Channel; >>> using grpc::ClientContext; >>> >>> using grpc::Status; >>> using routeguide::Feature; >>> using routeguide::Point; >>> using routeguide::Rectangle; >>> using routeguide::RouteGuide; >>> using routeguide::RouteNote; >>> using routeguide::RouteSummary; >>> >>> Point MakePoint(long latitude, long longitude) { >>> Point p; >>> p.set_latitude(latitude); >>> p.set_longitude(longitude); >>> return p; >>> } >>> >>> Feature MakeFeature(const std::string& name, long latitude, long >>> longitude) { >>> Feature f; >>> f.set_name(name); >>> f.mutable_location()->CopyFrom(MakePoint(latitude, longitude)); >>> return f; >>> } >>> >>> RouteNote MakeRouteNote(const std::string& message, long latitude, >>> long longitude) { >>> RouteNote n; >>> n.set_message(message); >>> n.mutable_location()->CopyFrom(MakePoint(latitude, longitude)); >>> return n; >>> } >>> >>> class RouteGuideClient { >>> public: >>> RouteGuideClient(std::shared_ptr<Channel> channel, const std::string& >>> db) >>> : stub_(RouteGuide::NewStub(channel)) { >>> routeguide::ParseDb(db, &feature_list_); >>> } >>> >>> void RouteChat() { >>> class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> >>> { >>> public: >>> explicit Chatter(RouteGuide::Stub* stub) >>> : notes_{MakeRouteNote("First message", 0, 0), >>> MakeRouteNote("Second message", 0, 1), >>> MakeRouteNote("Third message", 1, 0), >>> MakeRouteNote("Fourth message", 0, 0)}, >>> notes_iterator_(notes_.begin()) { >>> stub->async()->RouteChat(&context_, this); >>> NextWrite(); >>> StartRead(&server_note_); >>> StartCall(); >>> >>> } >>> void OnWriteDone(bool /*ok*/) override { NextWrite(); } >>> void OnReadDone(bool ok) override { >>> if (ok) { >>> std::cout << "Got message " << server_note_.message() << " at " >>> << server_note_.location().latitude() << ", " >>> << server_note_.location().longitude() << std::endl; >>> StartRead(&server_note_); >>> } >>> } >>> void OnDone(const Status& s) override { >>> std::unique_lock<std::mutex> l(mu_); >>> status_ = s; >>> done_ = true; >>> cv_.notify_one(); >>> } >>> Status Await() { >>> std::unique_lock<std::mutex> l(mu_); >>> cv_.wait(l, [this] { return done_; }); >>> return std::move(status_); >>> } >>> >>> private: >>> void NextWrite() { >>> if (notes_iterator_ != notes_.end()) { >>> const auto& note = *notes_iterator_; >>> std::cout << "Sending message " << note.message() << " at " >>> << note.location().latitude() << ", " >>> << note.location().longitude() << std::endl; >>> StartWrite(¬e); >>> notes_iterator_++; >>> } else { >>> StartWritesDone(); >>> } >>> } >>> ClientContext context_; >>> const std::vector<RouteNote> notes_; >>> std::vector<RouteNote>::const_iterator notes_iterator_; >>> RouteNote server_note_; >>> std::mutex mu_; >>> std::condition_variable cv_; >>> Status status_; >>> bool done_ = false; >>> }; >>> >>> Chatter chatter(stub_.get()); >>> Status status = chatter.Await(); >>> if (!status.ok()) { >>> std::cout << "RouteChat rpc failed." << std::endl; >>> } >>> } >>> >>> private: >>> >>> const float kCoordFactor_ = 10000000.0; >>> std::unique_ptr<RouteGuide::Stub> stub_; >>> std::vector<Feature> feature_list_; >>> >>> }; >>> >>> int main(int argc, char** argv) { >>> // Expect only arg: --db_path=path/to/route_guide_db.json. >>> std::string db = routeguide::GetDbFileContent(argc, argv); >>> RouteGuideClient guide( >>> grpc::CreateChannel("localhost:50051", >>> grpc::InsecureChannelCredentials()), >>> db); >>> >>> std::cout << "-------------- RouteChat --------------" << std::endl; >>> guide.RouteChat(); >>> >>> return 0; >>> } >>> >>> среда, 22 марта 2023 г. в 01:42:07 UTC+3, Dmitry Gorelov: >>> >>>> Oh man, it is not working even with *one *client! same problem in >>>> proto_utils.h >>> >>> >>>> >>>> среда, 22 марта 2023 г. в 01:12:55 UTC+3, Dmitry Gorelov: >>>> >>>>> Hi All, >>>>> >>>>> please help to modify this peace of server code for bidirectional >>>>> stream in order to make it work correclty with *multiple clients* at >>>>> one time. Currently it crashes with segmentation fault in the >>>>> proto_utils.h. >>>>> >>>>> class RouteGuideImpl final : public RouteGuide::CallbackService { >>>>> public: >>>>> explicit RouteGuideImpl(const std::string& db) { >>>>> routeguide::ParseDb(db, &feature_list_); >>>>> } >>>>> >>>>> grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat( >>>>> CallbackServerContext* context) override { >>>>> class Chatter : public grpc::ServerBidiReactor<RouteNote, >>>>> RouteNote> { >>>>> public: >>>>> Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes) >>>>> : mu_(mu), received_notes_(received_notes) { >>>>> StartRead(¬e_); >>>>> } >>>>> >>>>> void OnDone() override { delete this; } >>>>> void OnReadDone(bool ok) override { >>>>> if (ok) { >>>>> // Unlike the other example in this directory that's not >>>>> using >>>>> // the reactor pattern, we can't grab a local lock to secure >>>>> the >>>>> // access to the notes vector, because the reactor will most >>>>> likely >>>>> // make us jump threads, so we'll have to use a different >>>>> locking >>>>> // strategy. We'll grab the lock locally to build a copy of >>>>> the >>>>> // list of nodes we're going to send, then we'll grab the >>>>> lock >>>>> // again to append the received note to the existing vector. >>>>> mu_->Lock(); >>>>> std::copy_if(received_notes_->begin(), >>>>> received_notes_->end(), >>>>> std::back_inserter(to_send_notes_), >>>>> [this](const RouteNote& note) { >>>>> return note.location().latitude() == >>>>> note_.location().latitude() && >>>>> note.location().longitude() == >>>>> note_.location().longitude(); >>>>> }); >>>>> mu_->Unlock(); >>>>> notes_iterator_ = to_send_notes_.begin(); >>>>> NextWrite(); >>>>> } else { >>>>> std::cout << "some client finished" << std::endl; >>>>> Finish(Status::OK); >>>>> } >>>>> } >>>>> void OnWriteDone(bool /*ok*/) override { NextWrite(); } >>>>> >>>>> private: >>>>> void NextWrite() { >>>>> if (notes_iterator_ != to_send_notes_.end()) { >>>>> StartWrite(&*notes_iterator_); >>>>> notes_iterator_++; >>>>> } else { >>>>> mu_->Lock(); >>>>> received_notes_->push_back(note_); >>>>> mu_->Unlock(); >>>>> StartRead(¬e_); >>>>> } >>>>> } >>>>> RouteNote note_; >>>>> absl::Mutex* mu_; >>>>> std::vector<RouteNote>* received_notes_; >>>>> std::vector<RouteNote> to_send_notes_; >>>>> std::vector<RouteNote>::iterator notes_iterator_; >>>>> }; >>>>> return new Chatter(&mu_, &received_notes_); >>>>> } >>>>> >>>>> private: >>>>> std::vector<Feature> feature_list_; >>>>> absl::Mutex mu_; >>>>> std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_); >>>>> }; >>>>> >>>> -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/dc3df065-4e26-470c-a768-17f592236bd2n%40googlegroups.com.