GOAL: send a large batch of messages (40k-100k) to all subscribers in one
iteration. I am using the PUB/SUB socket type.
PROBLEM: the number of messages received by the subscribers is lower than the
number sent by the publisher. If I add a Thread.Sleep(1) after sending each
message, all messages are delivered, but with this many messages that means
a delay of 40-100 seconds, which is unacceptable.
The code below uses NetMQ (3.0.0), which is an alpha build, but it is only an
example: I have implemented the same code in C using libzmq 3.2.4 (stable),
and the symptoms are the same.
*Is there something I'm missing*? I have tried raising the
publisher/subscriber socket options (a higher ZMQ_SNDHWM, aka
SendHighWatermark) to 100,000, but it had no effect.
The publisher and subscribers are on the same network.
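For reference, a minimal sketch of how the high-water marks could be raised on both ends. It assumes NetMQ exposes them as Options.SendHighWatermark and Options.ReceiveHighWatermark; the endpoint strings and the class name are placeholders, not the real configuration. In libzmq these options only take effect for subsequent bind/connect calls, so the sketch sets them first, and it also raises the receive-side limit (ZMQ_RCVHWM) on the subscriber, not only ZMQ_SNDHWM.

```csharp
using System;
using NetMQ;

class HighWatermarkSketch
{
    static void Main()
    {
        // Placeholder value: the figure mentioned in the text above.
        const int hwm = 100000;

        using (NetMQContext ctx = NetMQContext.Create())
        using (var publisher = ctx.CreatePublisherSocket())
        using (var subscriber = ctx.CreateSubscriberSocket())
        {
            // Set the option BEFORE Bind so it applies to the connections
            // created afterwards.
            publisher.Options.SendHighWatermark = hwm;
            publisher.Bind("tcp://*:6005"); // hypothetical endpoint

            // The subscriber has its own queue limit on the receive side.
            subscriber.Options.ReceiveHighWatermark = hwm;
            subscriber.Connect("tcp://127.0.0.1:6005"); // hypothetical endpoint
            subscriber.Subscribe("");

            Console.WriteLine("Send HWM: {0}", publisher.Options.SendHighWatermark);
            Console.WriteLine("Receive HWM: {0}", subscriber.Options.ReceiveHighWatermark);
        }
    }
}
```

This only shows where the options would go; whether it removes the message loss in this setup is not confirmed.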
*Publisher/server side:*
using (var dbConn = new OracleConnection(ConfigurationManager.AppSettings["ConnString"]))
using (NetMQContext ctx = NetMQContext.Create())
{
    using (var publisher = ctx.CreatePublisherSocket())
    {
        publisher.Bind(ConfigurationManager.AppSettings["PubSocket"]);
        dbConn.Open();
        NetMQMessage m = new NetMQMessage();
        while (true)
        {
            // One iteration: publish every changed and deleted item,
            // then an "end" marker so subscribers can report their count.
            var updateIds = new List<int>();
            var deletedIds = new List<int>();
            var changedRules = GetChangedItems(dbConn, ref updateIds);
            var deletedRules = GetDeletedItems(dbConn, ref deletedIds);
            foreach (var kvPair in changedRules)
            {
                var item = kvPair.Value;
                publisher.Send(ToCsvLine(item));
                //Thread.Sleep(1); // with this enabled, all messages arrive
            }
            foreach (var kvPair in deletedRules)
            {
                var item = kvPair.Value;
                publisher.Send(ToCsvLine(item));
                //Thread.Sleep(1);
            }
            Thread.Sleep(1);
            publisher.Send("end");
            Console.WriteLine("Sent updated: {0}", updateIds.Count);
            Console.WriteLine("Sent deleted: {0}", deletedIds.Count);
            Thread.Sleep(6000); // wait before the next iteration
        }
    }
}
*Subscriber/client side:*
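using (NetMQContext ctx = NetMQContext.Create())
{
    using (var consumer = ctx.CreateSubscriberSocket())
    {
        consumer.Connect("tcp://192.168.1.122:6005");
        consumer.Subscribe(""); // empty prefix: receive every message
        int count = 0;
        while (true)
        {
            try
            {
                string msg = consumer.ReceiveString();
                if (msg == "end")
                {
                    // Report how many data messages arrived in this iteration.
                    Console.WriteLine("Count: {0}", count);
                    count = 0;
                }
                else
                {
                    // Count only data messages, not the "end" marker, so the
                    // total is comparable with the publisher's sent counts.
                    count++;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }
    }
}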
Best regards,
PL