Hi,
We are building a cache system based on libprocess, and we find that when 
messages are sent and received at a high frequency, a decode error always 
occurs in libprocess, which causes us to lose messages.
So we wrote a test case. It is just a ping-pong test, with a server on 
one node and several clients on another node. The clients send messages to 
the server in parallel. When the server receives a message, it simply sends a 
response back to the client. We count the number of responses on the client 
side to see whether any message is lost. In the end, we got the “Decoder error 
while receiving” error on the client side. It happens when we use two clients 
that send 200 messages each.
[cid:image004.png@01D2285F.D604AB00]

Are we using libprocess in the wrong way, or is there a bug in libprocess?

Here is our test code.
Client:
// Ping-pong test client.
//
// On initialization it installs a handler for "pong" messages and then
// sends `requestNum` "ping" messages to the process named "ServerProcess".
// Every "pong" received is counted, and a summary line is logged once the
// number of responses equals the number of requests.
class ClientProcess : public ProtobufProcess<ClientProcess>
{
public:
  // index_:      number appended to "ClientProcess" to form the process id.
  // requestNum_: number of "ping" messages this client sends.
  ClientProcess(
    int index_,
    int requestNum_)
    : ProcessBase("ClientProcess"+stringify(index_)),
      index(index_),
      requestNum(requestNum_),
      // Must start at zero: pong() increments this counter and compares it
      // against requestNum, so an indeterminate initial value would make
      // the response count meaningless.
      responseNum(0) {}

  ~ClientProcess() {}

  // Installs the "pong" handler and immediately starts sending requests.
  virtual void initialize()
  {
    LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
    install("pong", &ClientProcess::pong);
    sendToServer();
  }

  // Handler for "pong" messages: counts the response, logs its body, and
  // logs a summary once as many responses as requests have arrived.
  void pong(const UPID& from, const string& body)
  {
    responseNum++;

    LOG(INFO) << "------" << self().id  << " recv response " << body;

    if (responseNum == requestNum) {
      LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
                << responseNum << " responses";
    }
  }

  // Links to the server process and sends `requestNum` "ping" messages
  // whose bodies are the decimal strings "0" .. "requestNum - 1".
  void sendToServer()
  {
    // NOTE(review): "server ip" and `server port` are placeholders from the
    // original post and must be replaced with the real server address
    // before this compiles and runs.
    Address serverAddr = Address(
        net::IP::parse("server ip", AF_INET).get(),
        server port);
    UPID serverUpid = UPID("ServerProcess", serverAddr);
    link(serverUpid);

    for (int i = 0; i < requestNum; i++) {
      string msg = stringify(i);
      send(serverUpid, "ping", msg.c_str(), msg.size());
      LOG(INFO) << self().id << " send msg = " << msg;
    }
  }

  int index;        // Suffix used in the process id and in log lines.
  int requestNum;   // Number of "ping" messages to send.
  int responseNum;  // Number of "pong" messages received so far.
};

int main(int argc, char** argv)
{
  os::setenv("LIBPROCESS_IP", "client ip");
  os::setenv("LIBPROCESS_PORT", "client port");

  int requestNum = stoi(argv[1]);
  int concurrent = stoi(argv[2]);
  ClientProcess* client = NULL;
  UPID clientUpid;

  for (int i = 0; i < concurrent; i++) {
    client = new ClientProcess(i, requestNum);
    clientUpid = spawn(client);
  }

  wait(clientUpid);
  return 0;
}


Server:
// Ping-pong test server.
//
// Answers every "ping" message with a "pong" message carrying the same
// body, and links to each sender the first time a message from it arrives.
class ServerProcess : public ProtobufProcess<ServerProcess>
{
public:
  ServerProcess() : ProcessBase("ServerProcess") {}

  ~ServerProcess() {}

  // Registers `ping` as the handler for messages named "ping".
  void initialize() override
  {
    LOG(INFO) << "ServerProcess initialize";
    install("ping", &ServerProcess::ping);
  }

  // Handler for "ping" messages: links to a sender not seen before, logs
  // the request, and echoes the body back to the sender as a "pong".
  void ping(const UPID& from, const string& body)
  {
    const bool seenBefore = links.contains(from);
    if (!seenBefore) {
      // First message from this sender: link to it and remember it so the
      // link is only established once.
      link(from);
      links.insert(from);
    }

    LOG(INFO) << "recv from " << from.id << ", msg = " << body;

    const size_t length = body.size();
    send(from, "pong", body.data(), length);
  }

  // Senders this process has already linked to.
  hashset<UPID> links;
};

int main(int argc, char** argv)
{
  os::setenv("LIBPROCESS_IP", "server ip");
  os::setenv("LIBPROCESS_PORT", "server port");

  ServerProcess server;
  UPID serverUpid = spawn(&server);

  wait(serverUpid);
  return 0;
}


The following are the messages inside the libprocess buffer.
The correct HTTP message
[cid:image001.png@01D2285E.86F0F1A0]


The erroneous HTTP message
[cid:image002.png@01D2285E.86F0F1A0]



[cid:image003.png@01D2285E.86F0F1A0]


Su Teng  00241668


Distributed and Parallel Software Lab
Huawei Technologies Co., Ltd.
Email:sut...@huawei.com<mailto:sut...@huawei.com>



Reply via email to