This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 283f7733d2d [fix][broker] Expect msgs after server initiated
CloseProducer (#19446)
283f7733d2d is described below
commit 283f7733d2db0e98b0f4f42536f6a8fcc25ebccf
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 7 10:12:14 2023 -0600
[fix][broker] Expect msgs after server initiated CloseProducer (#19446)
(cherry picked from commit 524288cfbf0b83690b69e89344a809a001393228)
(cherry picked from commit 4cbe68e778323ab4f3325be0ffe2766ff43f54c0)
---
.../apache/pulsar/broker/service/ServerCnx.java | 22 ++++
.../pulsar/broker/service/ServerCnxTest.java | 124 ++++++++++++++++++---
2 files changed, 133 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1f08db86aaf..f5647ec6680 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -161,6 +162,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
+ private final HashMap<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final BrokerInterceptor brokerInterceptor;
@@ -256,6 +258,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.expectedItems(8)
.concurrencyLevel(1)
.build();
+ this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages =
conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
@@ -1482,6 +1485,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Producer> producerFuture =
producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() ||
producerFuture.isCompletedExceptionally()) {
+ if (recentlyClosedProducers.containsKey(send.getProducerId())) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received message, but the producer was
recently closed : {}. Ignoring message.",
+ remoteAddress, send.getProducerId());
+ }
+ // We expect these messages because we recently closed the
producer. Do not close the connection.
+ return;
+ }
log.warn("[{}] Received message, but the producer is not ready :
{}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
@@ -2536,6 +2547,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
+ // The client does not necessarily know that the producer is
closed, but the connection is still
+ // active, and there could be messages in flight already. We want
to ignore these messages for a time
+ // because they are expected. Once the interval has passed, the
client should have received the
+ // CloseProducer command and should not send any additional
messages until it sends a create Producer
+ // command.
+ final long epoch = producer.getEpoch();
+ final long producerId = producer.getProducerId();
+ recentlyClosedProducers.put(producerId, epoch);
+ ctx.executor().schedule(() -> {
+ recentlyClosedProducers.remove(producerId, epoch);
+ }, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
} else {
close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b4661cbe287..ee1789b322f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -94,6 +94,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -653,15 +654,7 @@ public class ServerCnxTest {
assertTrue(getResponse() instanceof CommandProducerSuccess);
// test SEND success
- MessageMetadata messageMetadata = new MessageMetadata()
- .setPublishTime(System.currentTimeMillis())
- .setProducerName("prod-name")
- .setSequenceId(0);
- ByteBuf data = Unpooled.buffer(1024);
-
- clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
ChecksumType.None, messageMetadata, data));
- channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
- clientCommand.release();
+ sendMessage();
assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
@@ -673,6 +666,115 @@ public class ServerCnxTest {
setChannelConnected();
// test SEND before producer is created
+ sendMessage();
+
+ // Then expect channel to close
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testSendCommandAfterBrokerClosedProducer() throws Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Call disconnect method on producer to trigger activity similar to
unloading
+ Producer producer = serverCnx.getProducers().get(1).get();
+ assertNotNull(producer);
+ producer.disconnect();
+ channel.runPendingTasks();
+ assertTrue(getResponse() instanceof CommandCloseProducer);
+
+ // Send message and expect no response
+ sendMessage();
+
+ // Move clock forward to trigger scheduled clean up task
+ channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(),
TimeUnit.SECONDS);
+ channel.runScheduledPendingTasks();
+ assertTrue(channel.outboundMessages().isEmpty());
+ assertTrue(channel.isActive());
+
+ // Send message and expect closed connection
+ sendMessage();
+
+ // Then expect channel to close
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void
testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws
Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Call disconnect method on producer to trigger activity similar to
unloading
+ Producer producer = serverCnx.getProducers().get(1).get();
+ assertNotNull(producer);
+ producer.disconnect();
+ channel.runPendingTasks();
+ assertTrue(getResponse() instanceof CommandCloseProducer);
+
+ // Send message and expect no response
+ sendMessage();
+
+ assertTrue(channel.outboundMessages().isEmpty());
+
+ // Client recreates the producer with the same producer id
+ ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer2);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Send message and expect success
+ sendMessage();
+
+ assertTrue(getResponse() instanceof CommandSendReceipt);
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws
Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ ByteBuf closeProducer = Commands.newCloseProducer(1,2);
+ channel.writeInbound(closeProducer);
+ assertTrue(getResponse() instanceof CommandSuccess);
+
+ // Send message and get disconnected
+ sendMessage();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ private void sendMessage() {
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
@@ -683,10 +785,6 @@ public class ServerCnxTest {
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
-
- // Then expect channel to close
- Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
- channel.finish();
}
@Test(timeOut = 30000)