poorbarcode commented on code in PR #21203:
URL: https://github.com/apache/pulsar/pull/21203#discussion_r1461132530


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java:
##########
@@ -112,6 +112,70 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
         });
     }
 
+    @Test
+    public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception {
+        final String localCluster = "localCluster";
+        final String remoteCluster = "remoteCluster";
+        final String topicName = "remoteTopicName";
+        final String replicatorPrefix = "pulsar.repl";
+        final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+        // Mock services.
+        final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
+        final PulsarService pulsar = mock(PulsarService.class);
+        final BrokerService broker = mock(BrokerService.class);
+        final Topic localTopic = mock(Topic.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        when(localClient.getCnxPool()).thenReturn(connectionPool);
+        final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        when(remoteClient.getCnxPool()).thenReturn(connectionPool);
+        final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+        topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic)));
+        when(broker.executor()).thenReturn(eventLoopGroup);
+        when(broker.getTopics()).thenReturn(topics);
+        when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+        when(broker.pulsar()).thenReturn(pulsar);
+        when(pulsar.getClient()).thenReturn(localClient);
+        when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+        when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+        when(localTopic.getName()).thenReturn(topicName);
+        when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+        when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+        when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
+        when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        // Mock create producer fail.
+        when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));

Review Comment:
   - If the topic is closed, the retries should stop.
https://github.com/apache/pulsar/pull/20567 already fixed this case (before the
current PR): `checkTopicActiveAndRetryStartProducer` prevents the next `create
producer` attempt if the topic is closed. The test cannot pass through that
path, because the topic is mocked and is not closed.
   - If replication has been disabled, the retries should also stop. The
current PR is trying to fix this case.
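
The two stop conditions above can be sketched as a single guard that is checked before every retry. This is only an illustrative sketch, not the replicator's actual code: the class `ReplicatorRetrySketch` and the methods `shouldRetry` and `countAttempts` are hypothetical names, and the failing `create producer` call is reduced to a counter increment.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: none of these names exist in Pulsar.
final class ReplicatorRetrySketch {

    /** Returns true only when another "create producer" attempt is still warranted. */
    static boolean shouldRetry(boolean topicClosed, boolean replicationEnabled) {
        if (topicClosed) {
            // Case 1: the topic is closed, so retrying is pointless.
            return false;
        }
        // Case 2: stop as soon as replication has been disabled.
        return replicationEnabled;
    }

    /**
     * Simulates a producer whose creation always fails and counts how many
     * attempts are made before the guard (or maxAttempts) ends the loop.
     */
    static int countAttempts(BooleanSupplier topicClosed,
                             BooleanSupplier replicationEnabled,
                             int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts
                && shouldRetry(topicClosed.getAsBoolean(), replicationEnabled.getAsBoolean())) {
            attempts++; // stands in for one failed "create producer" call
        }
        return attempts;
    }

    public static void main(String[] args) {
        AtomicInteger checks = new AtomicInteger();
        // The topic is never closed (like the mocked topic in the test);
        // replication reads as enabled for the first three checks only.
        int attempts = countAttempts(() -> false, () -> checks.incrementAndGet() <= 3, 10);
        System.out.println(attempts); // prints 3
    }
}
```

With a topic that never reports itself as closed, only the replication-enabled check can end the loop, which is the situation the mocked topic in this test creates.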

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

