lucasrpb opened a new issue #11883: URL: https://github.com/apache/pulsar/issues/11883
**Describe the bug**

The official Apache Pulsar documentation says that an exclusive subscription sees a consistent order for a single consumer. Suppose we have multiple consumers, each with its own exclusive subscription, all reading from a partitioned topic. As presented in the docs:

"Decisions about routing and subscription modes **can be made separately in most cases**. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics. **There is no difference between partitioned topics and normal topics in terms of how subscription modes work**, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer."

<img src="https://user-images.githubusercontent.com/1669154/131706634-655d2fd8-6ee1-429f-8eda-47728bea3a51.png" width="400" >

Those statements lead us to infer that every consumer would see the same global ordering across the topic's partitions.
But that is not the case in my tests so far:

<img src="https://user-images.githubusercontent.com/1669154/131708602-554d35ea-e8d3-4b65-8b75-0c2171b5c799.PNG" width="800" >

The code I used to test it (Scala):

**Producer:**

```
package pulsar

import org.apache.pulsar.client.api.PulsarClient

import java.util.UUID

// SERVICE_URL and TOPIC are defined elsewhere (not shown in this issue).
object Producer {

  def main(args: Array[String]): Unit = {
    val client = PulsarClient.builder()
      .serviceUrl(SERVICE_URL)
      .allowTlsInsecureConnection(true)
      .build()

    val producer = client.newProducer()
      .topic(TOPIC)
      .enableBatching(true)
      //.accessMode(ProducerAccessMode.Exclusive)
      .create()

    // Publish 100 messages with random payloads, all under the same ordering key.
    for (i <- 0 until 100) {
      val key = UUID.randomUUID().toString.getBytes()
      //val key = s"Hello-${i}".getBytes()

      producer.newMessage().orderingKey("k0".getBytes()).value(key).send()
      println(s"produced msg: ${i.toString}")
    }

    producer.flush()
    producer.close()
    client.close()
  }
}
```

**Consumer:**

```
import org.apache.pulsar.client.api.{PulsarClient, SubscriptionInitialPosition, SubscriptionType}

object Consumer {

  def main(args: Array[String]): Unit = {
    val client = PulsarClient.builder()
      .serviceUrl(SERVICE_URL)
      .allowTlsInsecureConnection(true)
      .build()

    // Payloads in the order each consumer received them.
    var l1 = Seq.empty[String]
    var l2 = Seq.empty[String]

    val c1 = client.newConsumer()
      .topic(TOPIC)
      .subscriptionType(SubscriptionType.Exclusive)
      .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
      .subscriptionName("c1")
      .subscribe()

    // To stop the consumption I put a limit (100) - this limit is known
    while (l1.length < 100) {
      val msg = c1.receive()
      val str = new String(msg.getData)

      println(s"${Console.MAGENTA_B}$str${Console.RESET}")
      l1 = l1 :+ str
      //c1.acknowledge(msg.getMessageId)
    }

    val c2 = client.newConsumer()
      .topic(TOPIC)
      .subscriptionType(SubscriptionType.Exclusive)
      .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
      .subscriptionName("c2")
      .subscribe()

    while (l2.length < 100) {
      val msg = c2.receive()
      val str = new String(msg.getData)

      println(s"${Console.GREEN_B}$str${Console.RESET}")
      l2 = l2 :+ str
      //c2.acknowledge(msg.getMessageId)
    }

    println()
    println(l1)
    println()
    println()
    println(l2)
    println()

    // Fails when the two subscriptions deliver the messages in different global orders.
    try {
      assert(l1 == l2)
    } finally {
      c1.close()
      c2.close()
      client.close()
    }
  }
}
```

Am I wrong about this, or does Pulsar not support the behavior I expect?
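
One way to narrow this down, assuming the ordering guarantee applies within each partition rather than across the whole topic, would be to compare what the two consumers received partition by partition instead of asserting `l1 == l2`. The sketch below is self-contained and uses made-up data; `Received`, `byPartition`, and `samePerPartitionOrder` are hypothetical helper names, not part of the Pulsar client. In the real consumer, the partition label would presumably come from something like `msg.getTopicName`, which is an assumption I have not verified against this setup.

```scala
object PerPartitionOrderCheck {
  // (partition name, payload) pairs in the order a consumer received them.
  type Received = Seq[(String, String)]

  // Group payloads by partition, keeping arrival order within each partition.
  def byPartition(msgs: Received): Map[String, Seq[String]] =
    msgs.groupBy(_._1).map { case (p, ms) => p -> ms.map(_._2) }

  // True when both consumers saw the same sequence within every partition.
  def samePerPartitionOrder(a: Received, b: Received): Boolean =
    byPartition(a) == byPartition(b)

  def main(args: Array[String]): Unit = {
    // Hypothetical data: two partitions, interleaved differently per consumer.
    val c1: Received = Seq("p0" -> "a", "p1" -> "x", "p0" -> "b", "p1" -> "y")
    val c2: Received = Seq("p1" -> "x", "p1" -> "y", "p0" -> "a", "p0" -> "b")

    // Global comparison, as in the assertion from the test above.
    println(s"same global order: ${c1.map(_._2) == c2.map(_._2)}")
    // Per-partition comparison.
    println(s"same per-partition order: ${samePerPartitionOrder(c1, c2)}")
  }
}
```

If the per-partition check passes on real data while the global one fails, the difference would come only from how the partitions are interleaved for each subscription.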
