This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba58095  [ServerCnx] Close connection after receiving unexpected SendCommand (#12780)
ba58095 is described below

commit ba5809553344f074c5dce15618a70f3b20d368c7
Author: Michael Marshall <[email protected]>
AuthorDate: Sat Nov 13 17:09:39 2021 -0600

    [ServerCnx] Close connection after receiving unexpected SendCommand (#12780)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +++-
 .../pulsar/broker/service/ServerCnxTest.java       | 22 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ba87469..90f0af0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1352,7 +1352,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
 
         if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
-            log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
+            log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
+                    remoteAddress, send.getProducerId());
+            close();
             return;
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2dc57b5..3d3a30e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -649,6 +649,28 @@ public class ServerCnxTest {
     }
 
     @Test(timeOut = 30000)
+    public void testSendCommandBeforeCreatingProducer() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        // test SEND before producer is created
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(System.currentTimeMillis())
+                .setProducerName("prod-name")
+                .setSequenceId(0);
+        ByteBuf data = Unpooled.buffer(1024);
+
+        ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
+                ChecksumType.None, messageMetadata, data));
+        channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
+        clientCommand.release();
+
+        // Then expect channel to close
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
     public void testUseSameProducerName() throws Exception {
         resetChannel();
         setChannelConnected();

Reply via email to