I found that on a server side the mutex mu is locking only the 
received_notes. But the to_send_notes are not locked and get accessed from 
different functions, that are called by bidirectional core from different 
threads. Slight modification of code made the server stable! (see how I 
placed the mutext lock).
         #include <algorithm>
         #include <chrono>
         #include <cmath>
         #include <iostream>
         #include <memory>
         #include <string>
         #include <thread>
         
         #include "helper.h"
         
         #include <grpc/grpc.h>
         #include <grpcpp/security/server_credentials.h>
         #include <grpcpp/server.h>
         #include <grpcpp/server_builder.h>
         #include <grpcpp/server_context.h>
         #ifdef BAZEL_BUILD
         #include "examples/protos/route_guide.grpc.pb.h"
         #else
         #include "route_guide.grpc.pb.h"
         #endif
         
         using grpc::CallbackServerContext;
         using grpc::Server;
         using grpc::ServerBuilder;
         using grpc::Status;
         using routeguide::Feature;
         using routeguide::Point;
         using routeguide::Rectangle;
         using routeguide::RouteGuide;
         using routeguide::RouteNote;
         using routeguide::RouteSummary;
         using std::chrono::system_clock;
         
         class RouteGuideImpl final : public RouteGuide::CallbackService {
          public:
           explicit RouteGuideImpl(const std::string& db) {
             routeguide::ParseDb(db, &feature_list_);
           }
         
           grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
               CallbackServerContext* context) override {
             class Chatter : public grpc::ServerBidiReactor<RouteNote, 
RouteNote> {
              public:
               Chatter(absl::Mutex* mu, std::vector<RouteNote>* 
received_notes)
                   : mu_(mu), received_notes_(received_notes) {
                 StartRead(&note_);
               }
         
               void OnDone() override { delete this; }
               void OnReadDone(bool ok) override 
               {
                 if (ok) 
                 {
                   // Unlike the other example in this directory that's not 
using
                   // the reactor pattern, we can't grab a local lock to 
secure the
                   // access to the notes vector, because the reactor will 
most likely
                   // make us jump threads, so we'll have to use a 
different locking
                   // strategy. We'll grab the lock locally to build a copy 
of the
                   // list of nodes we're going to send, then we'll grab 
the lock
                   // again to append the received note to the existing 
vector.
                   mu_->Lock();
                   std::copy_if(received_notes_->begin(), 
received_notes_->end(),
                                std::back_inserter(to_send_notes_),
                                [this](const RouteNote& note) {
                                  return note.location().latitude() ==
                                             note_.location().latitude() &&
                                         note.location().longitude() ==
                                             note_.location().longitude();
                                });          
                   notes_iterator_ = to_send_notes_.begin();
                   mu_->Unlock();
                   NextWrite();
                 } else {
                   //std::cout << "some client finished" << std::endl;
                   Finish(Status::OK);
                 }
               }
               void OnWriteDone(bool ok) override 
               { 
                 if (ok)
                 {
                   NextWrite(); 
                 }
                 else
                 {
                   std::cout << "some client finished write" << std::endl;
                   Finish(Status::OK);
                 }
               }
         
              private:
               void NextWrite() 
               {
                 mu_->Lock();
                 if (notes_iterator_ != to_send_notes_.end()) {
                   StartWrite(&*notes_iterator_);
                   notes_iterator_++;
                 } else {          
                   received_notes_->push_back(note_);          
                   StartRead(&note_);
                 }
                 mu_->Unlock();
               }
               RouteNote note_;
               absl::Mutex* mu_;
               std::vector<RouteNote>* received_notes_;
               std::vector<RouteNote> to_send_notes_;
               std::vector<RouteNote>::iterator notes_iterator_;
             };
             return new Chatter(&mu_, &received_notes_);
           }
         
          private:
           std::vector<Feature> feature_list_;
           absl::Mutex mu_;
           std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
         };
         
         void RunServer(const std::string& db_path) {
           std::string server_address("0.0.0.0:50051");
           RouteGuideImpl service(db_path);
         
           ServerBuilder builder;
           builder.AddListeningPort(server_address, 
grpc::InsecureServerCredentials());
           builder.RegisterService(&service);
           std::unique_ptr<Server> server(builder.BuildAndStart());
           std::cout << "Server listening on " << server_address << 
std::endl;
           server->Wait();
         }
         
         int main(int argc, char** argv) {
           // Expect only arg: --db_path=path/to/route_guide_db.json.
           std::string db = routeguide::GetDbFileContent(argc, argv);
           RunServer(db);
         
           return 0;
         }

Problem is still on client side - sometimes it crashes. I had a look in the 
source, but did not find anything strange. So the question is still open 
about the route_guide_callback_client.cc

*and we can return back to my original  question - HOW to make this server 
multi-client? It answers to all of the clients connected simultaneously, 
but I suppose it does not make any difference between them, as, for 
example, the iterator for the to_send_notes is the same. How to distinguish 
different clients in the callback bidirectional grpc mode??*

среда, 22 марта 2023 г. в 02:15:55 UTC+3, Dmitry Gorelov: 

> [image: 2023-03-22 02_14_54-Window.png]
>
> среда, 22 марта 2023 г. в 02:00:25 UTC+3, Dmitry Gorelov: 
>
>> Both of them , the client and server, are  from route_guide example. I 
>> left only bidirectional part.
>> after some attemts to run client, either the client or the server crash.
>>
>> Please help to fix them!
>>
>>
>> среда, 22 марта 2023 г. в 01:59:12 UTC+3, Dmitry Gorelov: 
>>
>>> //and this is client code
>>>
>>> /*
>>>  *
>>>  * Copyright 2021 gRPC authors.
>>>  *
>>>  * Licensed under the Apache License, Version 2.0 (the "License");
>>>  * you may not use this file except in compliance with the License.
>>>  * You may obtain a copy of the License at
>>>  *
>>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>>  *
>>>  * Unless required by applicable law or agreed to in writing, software
>>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
>>> implied.
>>>  * See the License for the specific language governing permissions and
>>>  * limitations under the License.
>>>  *
>>>  */
>>>
>>> #include <chrono>
>>> #include <condition_variable>
>>> #include <iostream>
>>> #include <memory>
>>> #include <mutex>
>>> #include <random>
>>>
>>> #include <string>
>>> #include <thread>
>>>
>>> #include "helper.h"
>>>
>>> #include <grpc/grpc.h>
>>> #include <grpcpp/alarm.h>
>>> #include <grpcpp/channel.h>
>>> #include <grpcpp/client_context.h>
>>> #include <grpcpp/create_channel.h>
>>> #include <grpcpp/security/credentials.h>
>>>
>>> #ifdef BAZEL_BUILD
>>> #include "examples/protos/route_guide.grpc.pb.h"
>>> #else
>>> #include "route_guide.grpc.pb.h"
>>> #endif
>>>
>>> using grpc::Channel;
>>> using grpc::ClientContext;
>>>
>>> using grpc::Status;
>>> using routeguide::Feature;
>>> using routeguide::Point;
>>> using routeguide::Rectangle;
>>> using routeguide::RouteGuide;
>>> using routeguide::RouteNote;
>>> using routeguide::RouteSummary;
>>>
>>> Point MakePoint(long latitude, long longitude) {
>>>   Point p;
>>>   p.set_latitude(latitude);
>>>   p.set_longitude(longitude);
>>>   return p;
>>> }
>>>
>>> Feature MakeFeature(const std::string& name, long latitude, long 
>>> longitude) {
>>>   Feature f;
>>>   f.set_name(name);
>>>   f.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
>>>   return f;
>>> }
>>>
>>> RouteNote MakeRouteNote(const std::string& message, long latitude,
>>>                         long longitude) {
>>>   RouteNote n;
>>>   n.set_message(message);
>>>   n.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
>>>   return n;
>>> }
>>>
>>> class RouteGuideClient {
>>>  public:
>>>   RouteGuideClient(std::shared_ptr<Channel> channel, const std::string& 
>>> db)
>>>       : stub_(RouteGuide::NewStub(channel)) {
>>>     routeguide::ParseDb(db, &feature_list_);
>>>   }
>>>
>>>   void RouteChat() {
>>>     class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> 
>>> {
>>>      public:
>>>       explicit Chatter(RouteGuide::Stub* stub)
>>>           : notes_{MakeRouteNote("First message", 0, 0),
>>>                    MakeRouteNote("Second message", 0, 1),
>>>                    MakeRouteNote("Third message", 1, 0),
>>>                    MakeRouteNote("Fourth message", 0, 0)},
>>>             notes_iterator_(notes_.begin()) {
>>>         stub->async()->RouteChat(&context_, this);
>>>         NextWrite();
>>>         StartRead(&server_note_);
>>>         StartCall();
>>>
>>>       }
>>>       void OnWriteDone(bool /*ok*/) override { NextWrite(); }
>>>       void OnReadDone(bool ok) override {
>>>         if (ok) {
>>>           std::cout << "Got message " << server_note_.message() << " at "
>>>                     << server_note_.location().latitude() << ", "
>>>                     << server_note_.location().longitude() << std::endl;
>>>           StartRead(&server_note_);
>>>         }
>>>       }
>>>       void OnDone(const Status& s) override {
>>>         std::unique_lock<std::mutex> l(mu_);
>>>         status_ = s;
>>>         done_ = true;
>>>         cv_.notify_one();
>>>       }
>>>       Status Await() {
>>>         std::unique_lock<std::mutex> l(mu_);
>>>         cv_.wait(l, [this] { return done_; });
>>>         return std::move(status_);
>>>       }
>>>
>>>      private:
>>>       void NextWrite() {
>>>         if (notes_iterator_ != notes_.end()) {
>>>           const auto& note = *notes_iterator_;
>>>           std::cout << "Sending message " << note.message() << " at "
>>>                     << note.location().latitude() << ", "
>>>                     << note.location().longitude() << std::endl;
>>>           StartWrite(&note);
>>>           notes_iterator_++;
>>>         } else {
>>>           StartWritesDone();
>>>         }
>>>       }
>>>       ClientContext context_;
>>>       const std::vector<RouteNote> notes_;
>>>       std::vector<RouteNote>::const_iterator notes_iterator_;
>>>       RouteNote server_note_;
>>>       std::mutex mu_;
>>>       std::condition_variable cv_;
>>>       Status status_;
>>>       bool done_ = false;
>>>     };
>>>
>>>     Chatter chatter(stub_.get());
>>>     Status status = chatter.Await();
>>>     if (!status.ok()) {
>>>       std::cout << "RouteChat rpc failed." << std::endl;
>>>     }
>>>   }
>>>
>>>  private:  
>>>
>>>   const float kCoordFactor_ = 10000000.0;
>>>   std::unique_ptr<RouteGuide::Stub> stub_;
>>>   std::vector<Feature> feature_list_;
>>>
>>> };
>>>
>>> int main(int argc, char** argv) {
>>>   // Expect only arg: --db_path=path/to/route_guide_db.json.
>>>   std::string db = routeguide::GetDbFileContent(argc, argv);
>>>   RouteGuideClient guide(
>>>       grpc::CreateChannel("localhost:50051",
>>>                           grpc::InsecureChannelCredentials()),
>>>       db);
>>>
>>>   std::cout << "-------------- RouteChat --------------" << std::endl;
>>>   guide.RouteChat();
>>>
>>>   return 0;
>>> }
>>>
>>> среда, 22 марта 2023 г. в 01:42:07 UTC+3, Dmitry Gorelov: 
>>>
>>>> Oh man, it is not working even with *one *client! same problem in 
>>>> proto_utils.h
>>>
>>>
>>>>
>>>> среда, 22 марта 2023 г. в 01:12:55 UTC+3, Dmitry Gorelov: 
>>>>
>>>>> Hi All,
>>>>>
>>>>> please help to modify this peace of server code for bidirectional 
>>>>> stream in order to make it work correclty with *multiple clients* at 
>>>>> one time. Currently it crashes with segmentation fault in the 
>>>>> proto_utils.h.
>>>>>
>>>>> class RouteGuideImpl final : public RouteGuide::CallbackService {
>>>>>  public:
>>>>>   explicit RouteGuideImpl(const std::string& db) {
>>>>>     routeguide::ParseDb(db, &feature_list_);
>>>>>   }  
>>>>>
>>>>>   grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
>>>>>       CallbackServerContext* context) override {
>>>>>     class Chatter : public grpc::ServerBidiReactor<RouteNote, 
>>>>> RouteNote> {
>>>>>      public:
>>>>>       Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
>>>>>           : mu_(mu), received_notes_(received_notes) {
>>>>>         StartRead(&note_);
>>>>>       }
>>>>>
>>>>>       void OnDone() override { delete this; }
>>>>>       void OnReadDone(bool ok) override {
>>>>>         if (ok) {
>>>>>           // Unlike the other example in this directory that's not 
>>>>> using
>>>>>           // the reactor pattern, we can't grab a local lock to secure 
>>>>> the
>>>>>           // access to the notes vector, because the reactor will most 
>>>>> likely
>>>>>           // make us jump threads, so we'll have to use a different 
>>>>> locking
>>>>>           // strategy. We'll grab the lock locally to build a copy of 
>>>>> the
>>>>>           // list of nodes we're going to send, then we'll grab the 
>>>>> lock
>>>>>           // again to append the received note to the existing vector.
>>>>>           mu_->Lock();
>>>>>           std::copy_if(received_notes_->begin(), 
>>>>> received_notes_->end(),
>>>>>                        std::back_inserter(to_send_notes_),
>>>>>                        [this](const RouteNote& note) {
>>>>>                          return note.location().latitude() ==
>>>>>                                     note_.location().latitude() &&
>>>>>                                 note.location().longitude() ==
>>>>>                                     note_.location().longitude();
>>>>>                        });
>>>>>           mu_->Unlock();
>>>>>           notes_iterator_ = to_send_notes_.begin();
>>>>>           NextWrite();
>>>>>         } else {
>>>>>           std::cout << "some client finished" << std::endl;
>>>>>           Finish(Status::OK);
>>>>>         }
>>>>>       }
>>>>>       void OnWriteDone(bool /*ok*/) override { NextWrite(); }
>>>>>
>>>>>      private:
>>>>>       void NextWrite() {
>>>>>         if (notes_iterator_ != to_send_notes_.end()) {
>>>>>           StartWrite(&*notes_iterator_);
>>>>>           notes_iterator_++;
>>>>>         } else {
>>>>>           mu_->Lock();
>>>>>           received_notes_->push_back(note_);
>>>>>           mu_->Unlock();
>>>>>           StartRead(&note_);
>>>>>         }
>>>>>       }
>>>>>       RouteNote note_;
>>>>>       absl::Mutex* mu_;
>>>>>       std::vector<RouteNote>* received_notes_;
>>>>>       std::vector<RouteNote> to_send_notes_;
>>>>>       std::vector<RouteNote>::iterator notes_iterator_;
>>>>>     };
>>>>>     return new Chatter(&mu_, &received_notes_);
>>>>>   }
>>>>>
>>>>>  private:
>>>>>   std::vector<Feature> feature_list_;
>>>>>   absl::Mutex mu_;
>>>>>   std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
>>>>> };
>>>>>
>>>>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/dc3df065-4e26-470c-a768-17f592236bd2n%40googlegroups.com.

Reply via email to