hanmz commented on code in PR #21203:
URL: https://github.com/apache/pulsar/pull/21203#discussion_r1330951850
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java:
##########
@@ -112,6 +112,70 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
});
}
+ @Test
+ public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception {
+ final String localCluster = "localCluster";
+ final String remoteCluster = "remoteCluster";
+ final String topicName = "remoteTopicName";
+ final String replicatorPrefix = "pulsar.repl";
+ final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+ // Mock services.
+ final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
+ final PulsarService pulsar = mock(PulsarService.class);
+ final BrokerService broker = mock(BrokerService.class);
+ final Topic localTopic = mock(Topic.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+ when(localClient.getCnxPool()).thenReturn(connectionPool);
+ final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+ when(remoteClient.getCnxPool()).thenReturn(connectionPool);
+ final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+ final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+ topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic)));
+ when(broker.executor()).thenReturn(eventLoopGroup);
+ when(broker.getTopics()).thenReturn(topics);
+ when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+ when(broker.pulsar()).thenReturn(pulsar);
+ when(pulsar.getClient()).thenReturn(localClient);
+ when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+ when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+ when(localTopic.getName()).thenReturn(topicName);
+ when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+ when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+ when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+ when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
+ when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+ when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+ // Mock create producer fail.
+ when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
Review Comment:
> > The problem we encountered is that after the replicator is closed, the
> > method replicator.startProducer is still executed repeatedly.
>
> Once the producer is created successfully, the task will be stopped because
> reading entries fails, right?

The simulated situation is that producer creation fails: after disconnect, the
replicator still keeps retrying to create the producer, and the retry cannot be stopped.
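
To make the scenario concrete, here is a minimal sketch of the retry loop and the exit condition the test is checking. It is only an illustration: `RetrySketch`, its `State` enum, and the in-memory queue standing in for the broker executor are all hypothetical names, not the actual `AbstractReplicator` code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

/** Hypothetical stand-in for a replicator; not Pulsar's AbstractReplicator. */
class RetrySketch {
    enum State { STARTING, STARTED, DISCONNECTED }

    // Stands in for a delayed executor: retries are queued and run on demand.
    private final Queue<Runnable> scheduled = new ArrayDeque<>();
    private final Supplier<Object> producerFactory;
    private State state = State.STARTING;
    private int attempts = 0;

    RetrySketch(Supplier<Object> producerFactory) {
        this.producerFactory = producerFactory;
    }

    void startProducer() {
        // The exit condition: without this check, a failing create() keeps
        // rescheduling itself even after disconnect() has been called.
        if (state == State.DISCONNECTED) {
            return;
        }
        attempts++;
        try {
            producerFactory.get();
            state = State.STARTED;
        } catch (RuntimeException e) {
            // Producer creation failed: schedule another attempt.
            scheduled.add(this::startProducer);
        }
    }

    void disconnect() {
        state = State.DISCONNECTED;
    }

    /** Runs up to n queued retries and returns how many were run. */
    int runPending(int n) {
        int ran = 0;
        while (ran < n && !scheduled.isEmpty()) {
            scheduled.poll().run();
            ran++;
        }
        return ran;
    }

    int attempts() {
        return attempts;
    }

    int pendingRetries() {
        return scheduled.size();
    }
}
```

With a factory that always throws (like the mocked `producerBuilder.create()` above), each queued retry schedules the next one until `disconnect()` is called; the retry that runs after that returns immediately, so neither the attempt count nor the queue grows any further.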
