gungod2000 opened a new issue, #210:
URL: https://github.com/apache/pulsar-dotpulsar/issues/210

   hello,
   
   I found anthoer issue,simple code under here.
   
   thanks
   
   
----------------------------------------------------------------------------------------
   
   private class ConsumerEntity
   {
       public IConsumer<string> Consumer { get; set; }
   
       public string ThreadId {  get; set; } 
   }
   
   ConcurrentBag<ConsumerEntity> ConsumerPool = new 
ConcurrentBag<ConsumerEntity>();
   
   private void Form2_Load(object sender, EventArgs e)
   {
       //just a demo 
       //like this,it works normal,all messages will be received, speed is fast
       for (int i = 0; i < 10; i++)
       {
           Task.Factory.StartNew(() =>
           {
               ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
           });
       }
   
       //****************[issue here]*******************
       //but like this ,receive message in task-child task,it works bad,receive 
message is very slowly,many consumer not receive message!!!!!
       for (int i = 0; i < 10; i++)
       {
           Task.Factory.StartNew(() =>
           {
               for (int j = 0; j < 10; j++)
               {
                   Task.Factory.StartNew(() =>
                   {
                       //receive message is very slowly,many consumer not 
receive message!!!!!I don't understand why???
                       
ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
                   });
               }
           });
       }
   }
   
   private IMessage<string> ReceiveMesage(string threadId)
   {
       //demo code
   
       ConsumerEntity consumerEntity = new ConsumerEntity();
   
       //check consumerpool exist consumer by threadId
       //if not exist
       if (!ConsumerPool.Contains(consumerEntity))
       {
           //create new consumer
           consumerEntity.ThreadId = threadId;
           consumerEntity.Consumer = create Consumer();
           ConsumerPool.Add(consumerEntity);
       }
   
       CancellationToken cancellationToken = new CancellationToken();
       
       //use 
       return  
consumerEntity.Consumer.Receive(cancellationToken).GetAwaiter().GetResult();
   }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to