poorbarcode commented on code in PR #21203:
URL: https://github.com/apache/pulsar/pull/21203#discussion_r1330208404


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1826,7 +1826,10 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
             ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
                 @Override
                 public void deleteCursorComplete(Object ctx) {
-                    replicators.remove(remoteCluster);
+                    Replicator replicator = replicators.remove(remoteCluster);
+                    if (replicator != null) {
+                        replicator.close();

Review Comment:
   Should we set the state to `closed` in the method `disconnect`? Otherwise, the following scenario is possible:
   
   | time | thread `removeReplicator` | thread `startProducer` |
   | --- | --- | --- |
   | 1 | close producer | |
   | 2 | set state to `Stopped` | |
   | 3 | | start producer |
   | 4 | set close state to `closed` | |
   
   An orphan producer is created in step 3.
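   A minimal sketch of that suggestion, using a hypothetical `ReplicatorSketch` class rather than Pulsar's real `AbstractReplicator` (the class, its `State` values and its helper methods are assumptions for illustration). Because `disconnect` closes the producer and enters the terminal `Closed` state inside the same monitor that `startProducer` uses, a `startProducer` call can no longer slip in between closing the producer and marking the replicator closed.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified replicator model; not Pulsar's AbstractReplicator.
   // The real startProducer() creates producers asynchronously.
   class ReplicatorSketch {
       enum State { Stopped, Starting, Started, Closed }

       private State state = State.Stopped;
       private final List<String> openProducers = new ArrayList<>();

       // Shares the object monitor with disconnect(), so the two never interleave.
       synchronized void startProducer() {
           if (state == State.Closed) {
               return; // a closed replicator must not create a new producer
           }
           state = State.Starting;
           openProducers.add("producer-" + openProducers.size());
           state = State.Started;
       }

       // Closes the producer and enters the terminal state in one critical section,
       // so there is no window in which the state is merely Stopped.
       synchronized void disconnect() {
           openProducers.clear();
           state = State.Closed;
       }

       synchronized int openProducerCount() {
           return openProducers.size();
       }

       synchronized State state() {
           return state;
       }
   }
   ```

   With this shape, a `startProducer` call that arrives after `disconnect` simply returns, so no orphan producer is left behind.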



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java:
##########
@@ -112,6 +112,70 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
         });
     }
 
+    @Test
+    public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception {
+        final String localCluster = "localCluster";
+        final String remoteCluster = "remoteCluster";
+        final String topicName = "remoteTopicName";
+        final String replicatorPrefix = "pulsar.repl";
+        final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+        // Mock services.
+        final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
+        final PulsarService pulsar = mock(PulsarService.class);
+        final BrokerService broker = mock(BrokerService.class);
+        final Topic localTopic = mock(Topic.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        when(localClient.getCnxPool()).thenReturn(connectionPool);
+        final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        when(remoteClient.getCnxPool()).thenReturn(connectionPool);
+        final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+        topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic)));
+        when(broker.executor()).thenReturn(eventLoopGroup);
+        when(broker.getTopics()).thenReturn(topics);
+        when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+        when(broker.pulsar()).thenReturn(pulsar);
+        when(pulsar.getClient()).thenReturn(localClient);
+        when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+        when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+        when(localTopic.getName()).thenReturn(topicName);
+        when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+        when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+        when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
+        when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        // Mock create producer fail.
+        when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));

Review Comment:
   1. Why do you make the producer creation always fail?
   2. After the cursor is closed, the method `readEntries` / `readMoreEntries` will get a `CursorAlreadyClosedException`, and the producer will then be closed. See https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L440
   
   ```java
   if (exception instanceof CursorAlreadyClosedException) {
       log.error("[{}] Error reading entries because replicator is"
                       + " already deleted and cursor is already closed {}, ({})",
               replicatorId, ctx, exception.getMessage(), exception);
       // replicator is already deleted and cursor is already closed so, producer should also be stopped
       closeProducerAsync();
       return;
   }
   ```
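   To illustrate the flow described in point 2, here is a self-contained simulation. `CursorSketch`, `ReaderSketch` and `CursorClosedSketchException` are hypothetical stand-ins for the managed cursor, `PersistentReplicator` and `CursorAlreadyClosedException`; the sketch only shows that closing the cursor makes the next read fail, and that this failure path is what closes the producer.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical stand-in for ManagedLedgerException.CursorAlreadyClosedException.
   class CursorClosedSketchException extends RuntimeException {
       CursorClosedSketchException(String msg) {
           super(msg);
       }
   }

   // Hypothetical cursor whose reads fail once it has been closed.
   class CursorSketch {
       private final AtomicBoolean closed = new AtomicBoolean(false);

       void close() {
           closed.set(true);
       }

       CompletableFuture<String> readEntries() {
           if (closed.get()) {
               CompletableFuture<String> failed = new CompletableFuture<>();
               failed.completeExceptionally(new CursorClosedSketchException("cursor already closed"));
               return failed;
           }
           return CompletableFuture.completedFuture("entry");
       }
   }

   // Hypothetical reader mirroring the quoted handler in PersistentReplicator.
   class ReaderSketch {
       final AtomicBoolean producerClosed = new AtomicBoolean(false);

       CompletableFuture<Void> readMoreEntries(CursorSketch cursor) {
           return cursor.readEntries().handle((entry, ex) -> {
               // Unwrap in case the failure arrives wrapped by a dependent stage.
               Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
               if (cause instanceof CursorClosedSketchException) {
                   producerClosed.set(true); // stands in for closeProducerAsync()
               }
               return null;
           });
       }
   }
   ```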



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -116,6 +118,11 @@ public String getRemoteCluster() {
     // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
     // the end result can be disconnect.
     public synchronized void startProducer() {
+        if (isClosed) {

Review Comment:
   Instead of adding a new field `isClosed`, we can use `cursor.isActive` to check whether the replication task was closed.
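   A sketch of deriving the closed status from the cursor instead of keeping a separate flag. `CursorView` and `CursorBackedReplicatorSketch` are hypothetical names, and the sketch assumes, as suggested above, that the cursor reports inactive once the replication task is closed. With a single source of truth, the cursor and an `isClosed` field cannot drift apart.

   ```java
   // Hypothetical minimal view of a cursor; Pulsar's ManagedCursor is much richer.
   interface CursorView {
       boolean isActive();
   }

   // Hypothetical replicator that consults the cursor rather than its own flag.
   class CursorBackedReplicatorSketch {
       private final CursorView cursor;
       private int producersStarted = 0;

       CursorBackedReplicatorSketch(CursorView cursor) {
           this.cursor = cursor;
       }

       synchronized void startProducer() {
           // Assumption: an inactive cursor means the replication task is closed.
           if (!cursor.isActive()) {
               return;
           }
           producersStarted++;
       }

       synchronized int producersStarted() {
           return producersStarted;
       }
   }
   ```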



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
