This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cad93713f7267b420784446a775c2caaaa036619 Author: Michael Marshall <[email protected]> AuthorDate: Sat Nov 13 17:09:39 2021 -0600 [ServerCnx] Close connection after receiving unexpected SendCommand (#12780) (cherry picked from commit ba5809553344f074c5dce15618a70f3b20d368c7) --- .../apache/pulsar/broker/service/ServerCnx.java | 4 +++- .../pulsar/broker/service/ServerCnxTest.java | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5be21cc..7762c22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1352,7 +1352,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId()); if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) { - log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId()); + log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.", + remoteAddress, send.getProducerId()); + close(); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2dc57b5..3d3a30e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -649,6 +649,28 @@ public class ServerCnxTest { } @Test(timeOut = 30000) + public void testSendCommandBeforeCreatingProducer() throws Exception { + resetChannel(); + setChannelConnected(); + + // test SEND before producer is created + MessageMetadata messageMetadata = new MessageMetadata() + .setPublishTime(System.currentTimeMillis()) + .setProducerName("prod-name") + .setSequenceId(0); + ByteBuf data = Unpooled.buffer(1024); + + ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, + ChecksumType.None, messageMetadata, data)); + channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); + clientCommand.release(); + + // Then expect channel to close + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) public void testUseSameProducerName() throws Exception { resetChannel(); setChannelConnected();
