Hi,
We are building a cache system based on libprocess, and we find that when
messages are sent/received at a high frequency, a decode error always occurs
in libprocess, which makes us lose messages.
So we wrote a test case. It is just a ping-pong test, with a server on
one node and several clients on another node. The clients send messages to
the server in parallel. When the server receives a message, it simply sends a
response back to the client. We count the responses on the client side to see
whether any messages are lost. In the end, we got the “Decoder error while
receiving” error on the client side. It happens when we use two clients, each
sending 200 messages.
[cid:[email protected]]
Are we using libprocess in the wrong way, or is there a bug in libprocess?
Here is our test code.
Client:
// Ping-pong test client. When it starts, it sends `requestNum` "ping" messages
// to the server process and counts the "pong" replies that come back.
class ClientProcess : public ProtobufProcess<ClientProcess>
{
public:
  // index_      - appended to "ClientProcess" to form this process's id.
  // requestNum_ - number of "ping" messages to send; also the number of
  //               replies after which the summary line is logged.
  ClientProcess(
      int index_,
      int requestNum_)
    : ProcessBase("ClientProcess"+stringify(index_)),
      index(index_),
      requestNum(requestNum_),
      // The counter must start at zero: pong() increments it and compares it
      // against requestNum, so an uninitialized value would make the reply
      // count meaningless.
      responseNum(0) {}

  ~ClientProcess() {}

  // Installs the "pong" handler and then sends all requests.
  virtual void initialize()
  {
    LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
    install("pong", &ClientProcess::pong);
    sendToServer();
  }

  // Handler for "pong" messages: counts the reply and logs a summary line once
  // the number of replies equals the number of requests.
  void pong(const UPID& from, const string& body)
  {
    responseNum++;
    LOG(INFO) << "------" << self().id << " recv response " << body;
    if (responseNum == requestNum) {
      LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
                << responseNum << " responses";
    }
  }

  // Links to the server process and sends it `requestNum` "ping" messages whose
  // bodies are the stringified values 0 .. requestNum-1.
  void sendToServer()
  {
    // NOTE(review): "server ip" and `server port` look like placeholders that
    // must be replaced with the real server address before building.
    Address serverAddr = Address(net::IP::parse("server ip", AF_INET).get(),
                                 server port);
    UPID serverUpid = UPID("ServerProcess", serverAddr);
    link(serverUpid);
    for (int i = 0; i < requestNum; i++) {
      string msg = stringify(i);
      send(serverUpid, "ping", msg.c_str(), msg.size());
      LOG(INFO) << self().id << " send msg = " << msg;
    }
  }

  int index;        // Suffix of this client's process id.
  int requestNum;   // Number of "ping" messages to send.
  int responseNum;  // Number of "pong" replies received so far.
};
// Client entry point.
//
// Usage: <program> <requestNum> <concurrent>
//   requestNum - number of messages each client sends.
//   concurrent - number of client processes to spawn.
//
// Returns 1 if either argument is missing; otherwise returns 0 once the wait
// on the last spawned client returns.
int main(int argc, char** argv)
{
  // Both positional arguments are required. Bail out instead of reading
  // argv[1] / argv[2] when they are not there.
  if (argc < 3) {
    return 1;
  }

  os::setenv("LIBPROCESS_IP", "client ip");
  os::setenv("LIBPROCESS_PORT", "client port");

  int requestNum = stoi(argv[1]);
  int concurrent = stoi(argv[2]);

  ClientProcess* client = NULL;
  UPID clientUpid;
  for (int i = 0; i < concurrent; i++) {
    client = new ClientProcess(i, requestNum);
    clientUpid = spawn(client);
  }

  // Only the UPID of the last spawned client is kept, so this waits on that
  // client alone.
  wait(clientUpid);
  return 0;
}
Server:
// Ping-pong test server: replies to every "ping" message with a "pong" that
// carries the same body.
class ServerProcess : public ProtobufProcess<ServerProcess>
{
public:
  ServerProcess() : ProcessBase("ServerProcess") {}

  ~ServerProcess() {}

  // Installs the handler for "ping" messages.
  virtual void initialize()
  {
    LOG(INFO) << "ServerProcess initialize";
    install("ping", &ServerProcess::ping);
  }

  // Handler for "ping" messages: links to the sender the first time it is
  // seen, logs the message, and echoes the body back as a "pong".
  void ping(const UPID& from, const string& body)
  {
    const bool alreadyLinked = links.contains(from);
    if (!alreadyLinked) {
      link(from);
      links.insert(from);
    }

    LOG(INFO) << "recv from " << from.id << ", msg = " << body;

    send(from, "pong", body.data(), body.length());
  }

  // Senders that have already been linked to.
  hashset<UPID> links;
};
int main(int argc, char** argv)
{
os::setenv("LIBPROCESS_IP", "server ip");
os::setenv("LIBPROCESS_PORT", "server port");
ServerProcess server;
UPID serverUpid = spawn(&server);
wait(serverUpid);
return 0;
}
The following is the message as it appears in libprocess's internal buffer.
The correct HTTP message
[cid:[email protected]]
The erroneous HTTP message
[cid:[email protected]]
[cid:[email protected]]
Su Teng 00241668
Distributed and Parallel Software Lab
Huawei Technologies Co., Ltd.
Email:[email protected]<mailto:[email protected]>