This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 4cbe68e7783 [fix][broker] Expect msgs after server initiated
CloseProducer (#19446)
4cbe68e7783 is described below
commit 4cbe68e778323ab4f3325be0ffe2766ff43f54c0
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Feb 7 10:12:14 2023 -0600
[fix][broker] Expect msgs after server initiated CloseProducer (#19446)
(cherry picked from commit 524288cfbf0b83690b69e89344a809a001393228)
---
.../apache/pulsar/broker/service/ServerCnx.java | 22 ++++
.../pulsar/broker/service/ServerCnxTest.java | 124 ++++++++++++++++++---
2 files changed, 133 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0ed20998196..3912005db98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import io.prometheus.client.Gauge;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -164,6 +165,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
+ private final HashMap<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
@@ -261,6 +263,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.expectedItems(8)
.concurrencyLevel(1)
.build();
+ this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages =
conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
@@ -1537,6 +1540,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Producer> producerFuture =
producers.get(send.getProducerId());
if (producerFuture == null || !producerFuture.isDone() ||
producerFuture.isCompletedExceptionally()) {
+ if (recentlyClosedProducers.containsKey(send.getProducerId())) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received message, but the producer was
recently closed : {}. Ignoring message.",
+ remoteAddress, send.getProducerId());
+ }
+ // We expect these messages because we recently closed the
producer. Do not close the connection.
+ return;
+ }
log.warn("[{}] Received message, but the producer is not ready :
{}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
@@ -2660,6 +2671,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
+ // The client does not necessarily know that the producer is
closed, but the connection is still
+ // active, and there could be messages in flight already. We want
to ignore these messages for a time
+ // because they are expected. Once the interval has passed, the
client should have received the
+ // CloseProducer command and should not send any additional
messages until it sends a create Producer
+ // command.
+ final long epoch = producer.getEpoch();
+ final long producerId = producer.getProducerId();
+ recentlyClosedProducers.put(producerId, epoch);
+ ctx.executor().schedule(() -> {
+ recentlyClosedProducers.remove(producerId, epoch);
+ }, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
} else {
close();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 78e994568e8..0c916988662 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -678,15 +679,7 @@ public class ServerCnxTest {
assertTrue(getResponse() instanceof CommandProducerSuccess);
// test SEND success
- MessageMetadata messageMetadata = new MessageMetadata()
- .setPublishTime(System.currentTimeMillis())
- .setProducerName("prod-name")
- .setSequenceId(0);
- ByteBuf data = Unpooled.buffer(1024);
-
- clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
ChecksumType.None, messageMetadata, data));
- channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
- clientCommand.release();
+ sendMessage();
assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
@@ -698,6 +691,115 @@ public class ServerCnxTest {
setChannelConnected();
// test SEND before producer is created
+ sendMessage();
+
+ // Then expect channel to close
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testSendCommandAfterBrokerClosedProducer() throws Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Call disconnect method on producer to trigger activity similar to
unloading
+ Producer producer = serverCnx.getProducers().get(1).get();
+ assertNotNull(producer);
+ producer.disconnect();
+ channel.runPendingTasks();
+ assertTrue(getResponse() instanceof CommandCloseProducer);
+
+ // Send message and expect no response
+ sendMessage();
+
+ // Move clock forward to trigger scheduled clean up task
+ channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(),
TimeUnit.SECONDS);
+ channel.runScheduledPendingTasks();
+ assertTrue(channel.outboundMessages().isEmpty());
+ assertTrue(channel.isActive());
+
+ // Send message and expect closed connection
+ sendMessage();
+
+ // Then expect channel to close
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void
testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws
Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Call disconnect method on producer to trigger activity similar to
unloading
+ Producer producer = serverCnx.getProducers().get(1).get();
+ assertNotNull(producer);
+ producer.disconnect();
+ channel.runPendingTasks();
+ assertTrue(getResponse() instanceof CommandCloseProducer);
+
+ // Send message and expect no response
+ sendMessage();
+
+ assertTrue(channel.outboundMessages().isEmpty());
+
+ // Recreate the producer (same producer id) before the scheduled clean up task runs
+ ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer2);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ // Send message and expect success
+ sendMessage();
+
+ assertTrue(getResponse() instanceof CommandSendReceipt);
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws
Exception {
+ resetChannel();
+ setChannelConnected();
+ setConnectionVersion(ProtocolVersion.v5.getValue());
+ serverCnx.cancelKeepAliveTask();
+
+ String producerName = "my-producer";
+
+ ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(clientCommand1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+
+ ByteBuf closeProducer = Commands.newCloseProducer(1,2);
+ channel.writeInbound(closeProducer);
+ assertTrue(getResponse() instanceof CommandSuccess);
+
+ // Send message and get disconnected
+ sendMessage();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
+ channel.finish();
+ }
+
+ private void sendMessage() {
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
@@ -708,10 +810,6 @@ public class ServerCnxTest {
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
-
- // Then expect channel to close
- Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() ->
!channel.isActive());
- channel.finish();
}
@Test(timeOut = 30000)