Hi Su Teng,

Glad to hear you're making use of libprocess. Be aware that we currently
bundle it in the Mesos repository, and development occurs within the Mesos
project.

This issue sounds like https://issues.apache.org/jira/browse/MESOS-5943

Are you obtaining libprocess from the mesos repository? Do you have
the following patch in the version of libprocess you are running?

https://reviews.apache.org/r/50634/

(By the way, image attachments are dropped by the mail servers.)

Ben

On Sunday, October 16, 2016, Suteng <sut...@huawei.com> wrote:

> Hi,
>
> We are building a cache system based on libprocess, and find that when
> sending/receiving messages at a high frequency, there is always a decode
> error in libprocess that makes us lose the message.
>
> So we wrote a test case. It’s just a ping-pong test, including a server
> on one node, and several clients on the other node. The clients send
> messages to the server in parallel. When the server receives a message, it
> just sends a response to the client. We count the number of responses on the
> client side to see whether there is any message loss. And finally, we got the
> “Decoder error while receiving” error on the client side. It happens when
> we use two clients, and send 200 messages each.
>
>
>
> Are we using libprocess in the wrong way, or is there a bug in libprocess?
>
>
>
> Here’s our test code.
>
> Client:
>
> class ClientProcess : public ProtobufProcess<ClientProcess>
>
> {
>
> public:
>
>   ClientProcess(
>
>     int index_,
>
>     int requestNum_)
>
>     : ProcessBase("ClientProcess"+stringify(index_)),
>
>       index(index_),
>
>       requestNum(requestNum_) {}
>
>
>
>   ~ClientProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
>
>     install("pong", &ClientProcess::pong);
>
>     sendToServer();
>
>   }
>
>
>
>   void pong(const UPID& from, const string& body)
>
>   {
>
>     responseNum++;
>
>
>
>     LOG(INFO) << "------" << self().id  << " recv response " << body;
>
>
>
>     if (responseNum == requestNum) {
>
>       LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
>
>                 << responseNum << " responses";
>
>     }
>
>   }
>
>
>
>   void sendToServer()
>
>   {
>
>     Address serverAddr = Address(net::IP::parse("server ip",
> AF_INET).get(), server port);
>
>     UPID serverUpid = UPID("ServerProcess", serverAddr);
>
>     link(serverUpid);
>
>
>
>     for (int i = 0; i < requestNum; i++) {
>
>       string msg = stringify(i);
>
>       send(serverUpid, "ping", msg.c_str(), msg.size());
>
>       LOG(INFO) << self().id << " send msg = " << msg;
>
>     }
>
>   }
>
>
>
>   int index;
>
>   int requestNum;
>
>   int responseNum;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "client ip");
>
>   os::setenv("LIBPROCESS_PORT", "client port");
>
>
>
>   int requestNum = stoi(argv[1]);
>
>   int concurrent = stoi(argv[2]);
>
>   ClientProcess* client = NULL;
>
>   UPID clientUpid;
>
>
>
>   for (int i = 0; i < concurrent; i++) {
>
>     client = new ClientProcess(i, requestNum);
>
>     clientUpid = spawn(client);
>
>   }
>
>
>
>   wait(clientUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Server:
>
> class ServerProcess : public ProtobufProcess<ServerProcess>
>
> {
>
> public:
>
>   ServerProcess() : ProcessBase("ServerProcess") {}
>
>
>
>   ~ServerProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ServerProcess initialize";
>
>     install("ping", &ServerProcess::ping);
>
>   }
>
>
>
>   void ping(const UPID& from, const string& body)
>
>   {
>
>     if (!links.contains(from)) {
>
>       link(from);
>
>       links.insert(from);
>
>     }
>
>     LOG(INFO) << "recv from " << from.id << ", msg = " << body;
>
>     send(from, "pong", body.c_str(), body.size());
>
>   }
>
>
>
>   hashset<UPID> links;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "server ip");
>
>   os::setenv("LIBPROCESS_PORT", "server port");
>
>
>
>   ServerProcess server;
>
>   UPID serverUpid = spawn(&server);
>
>
>
>   wait(serverUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Below is the message inside the libprocess buffer.
>
> The correct HTTP message
>
>
>
>
>
> The error HTTP message
>
>
>
>
>
>
>
>
>
> Su Teng  00241668
>
>
>
> Distributed and Parallel Software Lab
>
> Huawei Technologies Co., Ltd.
>
> Email:sut...@huawei.com
> <javascript:_e(%7B%7D,'cvml','sut...@huawei.com');>
>
>
>
>
>

Reply via email to