equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710671150



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,97 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");

Review comment:
       > Is it because, when the routing mode is RoundRobinPartition, the first message is always sent to a partition other than the one connected when the producer was created?
   
   Yes. 
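
To illustrate why the two partitions differ, here is a minimal sketch of a round-robin selector. It assumes the router advances a shared counter on every selection, once when the lazy producer picks its initial partition and again on the first send. `RoundRobinSketch` and its members are hypothetical names used for illustration, not Pulsar's actual router implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a round-robin router; not Pulsar's actual class.
class RoundRobinSketch {
    private final AtomicInteger counter;
    private final int numPartitions;

    RoundRobinSketch(int startIndex, int numPartitions) {
        this.counter = new AtomicInteger(startIndex);
        this.numPartitions = numPartitions;
    }

    // Returns the partition for the current counter value, then advances the counter.
    int choosePartition() {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch router = new RoundRobinSketch(0, 4);
        // Assumption: the first selection happens while the producer is being created.
        int atCreate = router.choosePartition();
        // Assumption: the second selection happens on the first send.
        int atFirstSend = router.choosePartition();
        System.out.println("connected at create: " + atCreate);
        System.out.println("chosen at first send: " + atFirstSend);
    }
}
```

Under these assumptions, and with more than one partition, the first send lands on a partition that has not been connected yet, so its connection attempt is the one the mock broker rejects in the test.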
   



