lucasrpb opened a new issue #11883:
URL: https://github.com/apache/pulsar/issues/11883


   **Describe the bug**
   
   In the official documentation of Apache Pulsar it says that an exclusive 
subscription sees a consistent order for a single consumer. Suppose we have 
multiple consumers each of them with its own exclusive subscription and they 
are reading from a partitioned topic. As presented in the docs:
   
   "Decisions about routing and subscription modes **can be made separately in 
most cases**. In general, throughput concerns should guide partitioning/routing 
decisions while subscription decisions should be guided by application 
semantics.
   
   **There is no difference between partitioned topics and normal topics in 
terms of how subscription modes work**, as partitioning only determines what 
happens between when a message is published by a producer and processed and 
acknowledged by a consumer."
   
   <img 
src="https://user-images.githubusercontent.com/1669154/131706634-655d2fd8-6ee1-429f-8eda-47728bea3a51.png";
 width="400" >
   
   Those statements lead us to infer that the readers would get a consistent 
global ordering among topic partitions. But that is not the case in my tests so 
far:
   
   <img 
src="https://user-images.githubusercontent.com/1669154/131708602-554d35ea-e8d3-4b65-8b75-0c2171b5c799.PNG";
 width="800" >
   
   
   The code I have used to test it (Scala): 
   
   **Producer:**
   
   
         package pulsar
         
             import org.apache.pulsar.client.api.PulsarClient
             import java.util.UUID
         
             object Producer {
         
               def main(args: Array[String]): Unit = {
         
                 val client = PulsarClient.builder()
                   .serviceUrl(SERVICE_URL)
                   .allowTlsInsecureConnection(true)
                   .build()
         
                 val producer = client.newProducer()
                   .topic(TOPIC)
                   .enableBatching(true)
                   //.accessMode(ProducerAccessMode.Exclusive)
                   .create()
         
                 for(i<-0 until 100){
                   val key = UUID.randomUUID().toString.getBytes()
                   //val key = s"Hello-${i}".getBytes()
                   
producer.newMessage().orderingKey("k0".getBytes()).value(key).send()
         
                   println(s"produced msg: ${i.toString}")
                 }
         
                 producer.flush()
         
                 producer.close()
                 client.close()
               }
         
             }
   
   
   
   **Consumer:**
   
      ```
    import org.apache.pulsar.client.api.{PulsarClient, 
SubscriptionInitialPosition, SubscriptionType}
   
       object Consumer  {
       
             def main(args: Array[String]): Unit = {
           
               val client = PulsarClient.builder()
                 .serviceUrl(SERVICE_URL)
                 .allowTlsInsecureConnection(true)
                 .build()
           
               var l1 = Seq.empty[String]
               var l2 = Seq.empty[String]
           
               val c1 = client.newConsumer()
                 .topic(TOPIC)
                 .subscriptionType(SubscriptionType.Exclusive)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionName("c1")
                 .subscribe()
           
               // To stop the consuption I put a limit (100) - this limit is 
known
               while(l1.length < 100){
                 val msg = c1.receive()
                 val str = new String(msg.getData)
           
                 println(s"${Console.MAGENTA_B}$str${Console.RESET}")
           
                 l1 = l1 :+ str
           
                 //c1.acknowledge(msg.getMessageId)
               }
           
               val c2 = client.newConsumer()
                 .topic(TOPIC)
                 .subscriptionType(SubscriptionType.Exclusive)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionName("c2")
                 .subscribe()
           
               while(l2.length < 100){
                 val msg = c2.receive()
                 val str = new String(msg.getData)
           
                 println(s"${Console.GREEN_B}$str${Console.RESET}")
           
                 l2 = l2 :+ str
           
                 //c2.acknowledge(msg.getMessageId)
               }
           
               println()
               println(l1)
               println()
           
               println()
               println(l2)
               println()
           
               try {
                 assert(l1 == l2)
               } finally {
                 c1.close()
                 c2.close()
                 client.close()
               }
             }
       
       }
   ```
   
   
   Am I wrong about it or Pulsar does not support the described behavior I 
expect? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to