hi all:

    I have 3 gRPC C++ servers (A/B/C) that work in a broadcast model:
the client sends every request to serverA, and serverA then broadcasts the
request to serverB & serverC asynchronously.

                   Client
                      |
                     A
                   /    \
                 /       \
                B        C

But serverA seems to have a memory leak: every 40k requests from the
client make its memory usage grow by about 5 MB to 8 MB. I have examined
serverA's code closely, but cannot find anything wrong.

Can anyone help take a look at my code (attached)?

Env:

os: win10
grpc:1.21.x
lang: c++

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/CAKymdqP9QCYQhLQUw7JFh5FWYa9rwnTwHVGh3T-VyJeB%2BPFHAg%40mail.gmail.com.
#include <atomic>
#include <cassert>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "helloworld.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientAsyncReaderWriter;
using grpc::ClientAsyncResponseReader;

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerCompletionQueue;
using grpc::CompletionQueue;
using grpc::Status;
using grpc::StatusCode;

using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;

int g_thread_num = 1;
int g_cq_num = 1;
int g_ins_pool = 1;
int g_channel_pool = 1;

CompletionQueue *g_client_cq;

std::atomic<void*>**    g_instance_pool = nullptr;

typedef std::shared_ptr<::grpc::Channel>  ShpChannel;
typedef std::vector<ShpChannel>  VecChannel;
typedef std::unique_ptr<VecChannel>  UptrVecChannel;

std::vector<std::string>  g_vec_backends{"192.168.0.100:50052","192.168.0.100:50053"};

struct ChannelPool {

    ChannelPool() {

        for (const auto &_svr : g_vec_backends) {
            this->m_backend_pool.insert(std::pair<std::string, UptrVecChannel>(_svr,UptrVecChannel(new VecChannel())));
            for (int i = 0; i < g_channel_pool; ++i)
                this->m_backend_pool[_svr]->emplace_back(grpc::CreateChannel(_svr, grpc::InsecureChannelCredentials()));
        }
    }

    ShpChannel PickupOneChannel(const std::string &svr) {
        if (m_backend_pool.find(svr) == m_backend_pool.cend())
            return ShpChannel();

        auto &_uptr_vec = m_backend_pool[svr];
        return _uptr_vec->operator[](this->GenerateRandom(0, g_channel_pool-1));
    }

    uint32_t GenerateRandom(uint32_t from, uint32_t to) {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<unsigned long> dis(from,to);
        return dis(gen);
    }

    //Read only after initialized.
    std::map<std::string, UptrVecChannel>     m_backend_pool;
} *g_channel;

// Interface of every object used as a completion-queue tag: the polling loops cast
// the tag back to CallDataBase* and call Proceed() with the event's `ok` flag.
class CallDataBase {

public:
    CallDataBase() {}

    // Virtual so that deleting a derived object through a CallDataBase* is well defined.
    virtual ~CallDataBase() = default;

    // Handles one completion-queue event for this object.
    // `ok` is the success flag reported by CompletionQueue::Next for the event.
    virtual void Proceed(bool ok) = 0;
};

class CallDataServerBase :public CallDataBase {
public:

    CallDataServerBase(Greeter::AsyncService* service, ServerCompletionQueue* cq) : service_(service), cq_(cq){
    }

protected:

  Greeter::AsyncService* service_;
  ServerCompletionQueue* cq_;

  ServerContext ctx_;

  HelloRequest request_;
  HelloReply reply_;
};

class CallDataUnary;

class AsyncUnaryGreeterClient : public CallDataBase{
    enum class ClientStatus {
        PROCESS = 1,
        FINISH = 2
    };

public:
    AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel, CompletionQueue *cq, CallDataUnary* pcd);

    ~AsyncUnaryGreeterClient();

    void AsyncSayHello(const std::string& user);

    void Proceed(bool ok);

private:

    ClientContext   context_;

    ClientStatus    status_;

    std::unique_ptr<Greeter::Stub> stub_;

    std::unique_ptr<ClientAsyncResponseReader<HelloReply>> stream_;

    CompletionQueue *cq_;

    HelloRequest request_;
    HelloReply response_;

    grpc::Status finish_status_ = grpc::Status::OK;

    CallDataUnary*  m_parent_call_data = nullptr;
};

class CallDataUnary : CallDataServerBase {

 public:

  CallDataUnary(Greeter::AsyncService* service, ServerCompletionQueue* cq) : CallDataServerBase(service,cq),responder_(&ctx_), status_(CREATE) {

      for (const auto &_svr : g_vec_backends) {
          auto _shp_channel =  g_channel->PickupOneChannel(_svr);
          m_backends.emplace_back(new AsyncUnaryGreeterClient(_shp_channel, g_client_cq, this));
      }

      status_ = PROCESS;
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);

      this->m_done_counter.store(0);

      //std::cout << "pos_1_1" << std::endl;
  }

  ~CallDataUnary() {
      //std::cout << "pos_1_2" << std::endl;
  }

  void NotifyOneDone() {

      return ;

      int _pre = m_done_counter.fetch_add(1);
      if (_pre+1 == g_vec_backends.size()) {
        //reply_.set_message(this->request_.name());
        //status_ = FINISH;
        //responder_.Finish(reply_, Status::OK, this);
          //delete this;
          //for (auto* _p_svr : m_backends)
          //    delete _p_svr;
      }
  }

  void Proceed(bool ok) {

    if (status_ == PROCESS) {
        new CallDataUnary(service_, cq_);

        for (auto* _p_svr : m_backends)
            _p_svr->AsyncSayHello(request_.name());

        reply_.set_message(this->request_.name());
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
    } else {
         GPR_ASSERT(status_ == FINISH);
         delete this;
    }
  }

 private:

    std::list<AsyncUnaryGreeterClient*>   m_backends;

    std::atomic<int>    m_done_counter;

    ServerAsyncResponseWriter<HelloReply> responder_;

    enum CallStatus { CREATE = 1, PROCESS, FINISH };
    CallStatus status_;
};

// Builds a stub on `channel` and remembers the completion queue and the parent handler.
AsyncUnaryGreeterClient::AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel, CompletionQueue *cq, CallDataUnary* pcd)
    : stub_(Greeter::NewStub(channel)),
      cq_(cq),
      m_parent_call_data(pcd) {
}

// Nothing to release by hand: every member cleans up after itself.
AsyncUnaryGreeterClient::~AsyncUnaryGreeterClient() = default;

void AsyncUnaryGreeterClient::AsyncSayHello(const std::string& user) {
    request_.set_name(user);

    status_ = ClientStatus::PROCESS;

    stream_ = stub_->PrepareAsyncSayHello(&context_, request_, cq_);
    stream_->StartCall();
    stream_->Finish(&response_, &finish_status_, this);
}

void AsyncUnaryGreeterClient::Proceed(bool ok) {
    if (!ok) {
        std::cout << "Unary client get non-ok result from peer:" << context_.peer() << std::endl;
        return;
    }

    switch (status_) {
        case ClientStatus::PROCESS:
            //std::cout << "Read a new message:" << response_.message() << " from peer:"  << context_.peer() << std::endl;
            m_parent_call_data->NotifyOneDone();
            delete this;
            break;
        default:
            std::cerr << "ClientUnexpected status:" << int(status_)  << std::endl;
            assert(false);
    }
}

class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    for (const auto& _cq : m_cq)
        _cq->Shutdown();
  }

  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);

    for (int i = 0; i < g_cq_num; ++i)
        m_cq.emplace_back(builder.AddCompletionQueue());

    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    //Fidxed #threads polling on the g_client_cq

    auto _lambda = [&](CompletionQueue* cq) {
        void* tag;
        bool ok;
        while (true) {
          GPR_ASSERT(cq->Next(&tag, &ok));
          CallDataBase* _p_ins = (CallDataBase*)tag;
          _p_ins->Proceed(ok);
        }
    };

    std::vector<std::thread*> _vec_client_threads;
    for (int i = 0; i < 2; ++i)
        _vec_client_threads.emplace_back(new std::thread(_lambda,g_client_cq));

    std::vector<std::thread*> _vec_threads;

    for (int i = 0; i < g_thread_num; ++i) {
        int _cq_idx = i % g_cq_num;
        for (int j = 0; j < g_ins_pool; ++j)
            new CallDataUnary(&service_, m_cq[_cq_idx].get());

        _vec_threads.emplace_back(new std::thread(&ServerImpl::HandleRpcs, this, _cq_idx));
    }

    std::cout << g_thread_num << " working aysnc threads spawned" << std::endl;

    for (const auto& _t : _vec_threads)
        _t->join();
  }

 private:

  void HandleRpcs(int cq_idx) {
    void* tag;
    bool ok;
    while (true) {
      GPR_ASSERT(m_cq[cq_idx]->Next(&tag, &ok));

      CallDataBase* _p_ins = (CallDataBase*)tag;
      _p_ins->Proceed(ok);
    }
  }

  std::vector<std::unique_ptr<ServerCompletionQueue>>  m_cq;

  Greeter::AsyncService service_;
  std::unique_ptr<Server> server_;
};

// Extracts the value part of a "--name=value" command-line argument.
//
// `argv`: the argument to inspect.
// `para`: the expected option prefix, including the trailing '=' (e.g. "--thread=").
// Returns a pointer into `argv` just past the prefix, or nullptr (after printing a
// diagnostic) when either input is null or `argv` does not start with `para`.
const char* ParseCmdPara( char* argv,const char* para) {
    if (argv == nullptr || para == nullptr) {
        // Never hand a null pointer to %s.
        printf("para error argv[%s] should be %s \n",
               argv != nullptr ? argv : "(null)",
               para != nullptr ? para : "(null)");
        return nullptr;
    }

    // The option name must be a prefix of the argument; a match further inside the
    // string (e.g. "x--thread=4") is not a valid option.
    const std::size_t para_len = std::strlen(para);
    if (std::strncmp(argv, para, para_len) != 0) {
        printf("para error argv[%s] should be %s \n",argv,para);
        return nullptr;
    }
    return argv + para_len;
}

// Parses "<prefix><integer>" from `arg` into `*out`.
// Returns false (after a diagnostic) unless the value is a whole decimal number in [1, INT_MAX].
static bool ParsePositiveOption(char* arg, const char* prefix, int* out) {
    const char* value = ParseCmdPara(arg, prefix);
    if (value == nullptr)
        return false;  // ParseCmdPara has already printed the reason.

    errno = 0;
    char* end = nullptr;
    const long parsed = std::strtol(value, &end, 10);
    const bool whole_number = (end != value) && (*end == '\0');
    if (!whole_number || errno == ERANGE || parsed < 1 || parsed > INT_MAX) {
        std::cerr << "Invalid value for " << prefix << " expected an integer >= 1, got \""
                  << value << "\"" << std::endl;
        return false;
    }

    *out = static_cast<int>(parsed);
    return true;
}

// Entry point: reads the four tuning options, creates the client completion queue and
// the channel pool, then runs the server. Returns 1 on a usage or argument error.
int main(int argc, char** argv) {
  if (argc != 5) {
      std::cout << "Usage:./program --thread=xx --cq=xx --pool=xx --channel_pool=xx" << std::endl;
      return 1;
  }

  // Validate everything before allocating: a zero completion-queue count is later
  // used as a modulus, and a zero channel pool leaves no channel to pick.
  if (!ParsePositiveOption(argv[1], "--thread=", &g_thread_num) ||
      !ParsePositiveOption(argv[2], "--cq=", &g_cq_num) ||
      !ParsePositiveOption(argv[3], "--pool=", &g_ins_pool) ||
      !ParsePositiveOption(argv[4], "--channel_pool=", &g_channel_pool)) {
      return 1;
  }

  // Both objects are used for the whole lifetime of the process and are never freed.
  g_client_cq = new CompletionQueue();
  g_channel = new ChannelPool();

  ServerImpl server;
  server.Run();

  return 0;
}

Reply via email to