This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 283f7733d2d [fix][broker] Expect msgs after server initiated CloseProducer (#19446)
283f7733d2d is described below

commit 283f7733d2db0e98b0f4f42536f6a8fcc25ebccf
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 7 10:12:14 2023 -0600

    [fix][broker] Expect msgs after server initiated CloseProducer (#19446)
    
    (cherry picked from commit 524288cfbf0b83690b69e89344a809a001393228)
    (cherry picked from commit 4cbe68e778323ab4f3325be0ffe2766ff43f54c0)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  22 ++++
 .../pulsar/broker/service/ServerCnxTest.java       | 124 ++++++++++++++++++---
 2 files changed, 133 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1f08db86aaf..f5647ec6680 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -161,6 +162,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final BrokerService service;
     private final SchemaRegistryService schemaService;
     private final String listenerName;
+    private final HashMap<Long, Long> recentlyClosedProducers;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private final BrokerInterceptor brokerInterceptor;
@@ -256,6 +258,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 .expectedItems(8)
                 .concurrencyLevel(1)
                 .build();
+        this.recentlyClosedProducers = new HashMap<>();
         this.replicatorPrefix = conf.getReplicatorPrefix();
         this.maxNonPersistentPendingMessages = 
conf.getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = conf.getProxyRoles();
@@ -1482,6 +1485,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Producer> producerFuture = 
producers.get(send.getProducerId());
 
         if (producerFuture == null || !producerFuture.isDone() || 
producerFuture.isCompletedExceptionally()) {
+            if (recentlyClosedProducers.containsKey(send.getProducerId())) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Received message, but the producer was 
recently closed : {}. Ignoring message.",
+                            remoteAddress, send.getProducerId());
+                }
+                // We expect these messages because we recently closed the 
producer. Do not close the connection.
+                return;
+            }
             log.warn("[{}] Received message, but the producer is not ready : 
{}. Closing the connection.",
                     remoteAddress, send.getProducerId());
             close();
@@ -2536,6 +2547,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         safelyRemoveProducer(producer);
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
             
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
+            // The client does not necessarily know that the producer is 
closed, but the connection is still
+            // active, and there could be messages in flight already. We want 
to ignore these messages for a time
+            // because they are expected. Once the interval has passed, the 
client should have received the
+            // CloseProducer command and should not send any additional 
messages until it sends a create Producer
+            // command.
+            final long epoch = producer.getEpoch();
+            final long producerId = producer.getProducerId();
+            recentlyClosedProducers.put(producerId, epoch);
+            ctx.executor().schedule(() -> {
+                recentlyClosedProducers.remove(producerId, epoch);
+            }, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
         } else {
             close();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b4661cbe287..ee1789b322f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -94,6 +94,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -653,15 +654,7 @@ public class ServerCnxTest {
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
         // test SEND success
-        MessageMetadata messageMetadata = new MessageMetadata()
-                .setPublishTime(System.currentTimeMillis())
-                .setProducerName("prod-name")
-                .setSequenceId(0);
-        ByteBuf data = Unpooled.buffer(1024);
-
-        clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, 
ChecksumType.None, messageMetadata, data));
-        channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
-        clientCommand.release();
+        sendMessage();
 
         assertTrue(getResponse() instanceof CommandSendReceipt);
         channel.finish();
@@ -673,6 +666,115 @@ public class ServerCnxTest {
         setChannelConnected();
 
         // test SEND before producer is created
+        sendMessage();
+
+        // Then expect channel to close
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
!channel.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testSendCommandAfterBrokerClosedProducer() throws Exception {
+        resetChannel();
+        setChannelConnected();
+        setConnectionVersion(ProtocolVersion.v5.getValue());
+        serverCnx.cancelKeepAliveTask();
+
+        String producerName = "my-producer";
+
+        ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(clientCommand1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+        // Call disconnect method on producer to trigger activity similar to 
unloading
+        Producer producer = serverCnx.getProducers().get(1).get();
+        assertNotNull(producer);
+        producer.disconnect();
+        channel.runPendingTasks();
+        assertTrue(getResponse() instanceof CommandCloseProducer);
+
+        // Send message and expect no response
+        sendMessage();
+
+        // Move clock forward to trigger scheduled clean up task
+        channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(), 
TimeUnit.SECONDS);
+        channel.runScheduledPendingTasks();
+        assertTrue(channel.outboundMessages().isEmpty());
+        assertTrue(channel.isActive());
+
+        // Send message and expect closed connection
+        sendMessage();
+
+        // Then expect channel to close
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
!channel.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void 
testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws 
Exception {
+        resetChannel();
+        setChannelConnected();
+        setConnectionVersion(ProtocolVersion.v5.getValue());
+        serverCnx.cancelKeepAliveTask();
+
+        String producerName = "my-producer";
+
+        ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(clientCommand1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+        // Call disconnect method on producer to trigger activity similar to 
unloading
+        Producer producer = serverCnx.getProducers().get(1).get();
+        assertNotNull(producer);
+        producer.disconnect();
+        channel.runPendingTasks();
+        assertTrue(getResponse() instanceof CommandCloseProducer);
+
+        // Send message and expect no response
+        sendMessage();
+
+        assertTrue(channel.outboundMessages().isEmpty());
+
+        // Recreate the producer with the same producer id before the scheduled clean up task runs
+        ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer2);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+        // Send message and expect success
+        sendMessage();
+
+        assertTrue(getResponse() instanceof CommandSendReceipt);
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws 
Exception {
+        resetChannel();
+        setChannelConnected();
+        setConnectionVersion(ProtocolVersion.v5.getValue());
+        serverCnx.cancelKeepAliveTask();
+
+        String producerName = "my-producer";
+
+        ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(clientCommand1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+        ByteBuf closeProducer = Commands.newCloseProducer(1,2);
+        channel.writeInbound(closeProducer);
+        assertTrue(getResponse() instanceof CommandSuccess);
+
+        // Send message and get disconnected
+        sendMessage();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
!channel.isActive());
+        channel.finish();
+    }
+
+    private void sendMessage() {
         MessageMetadata messageMetadata = new MessageMetadata()
                 .setPublishTime(System.currentTimeMillis())
                 .setProducerName("prod-name")
@@ -683,10 +785,6 @@ public class ServerCnxTest {
                 ChecksumType.None, messageMetadata, data));
         channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
         clientCommand.release();
-
-        // Then expect channel to close
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
!channel.isActive());
-        channel.finish();
     }
 
     @Test(timeOut = 30000)

Reply via email to