This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 18da60c01f4626f245bab157071069728c72e00d Author: fengyubiao <[email protected]> AuthorDate: Wed Dec 24 17:31:45 2025 +0800 [fix][client]Producer stuck or geo-replication stuck due to wrong value of message.numMessagesInBatch (#25106) (cherry picked from commit ab65faa12ab7279a726411152af44d81b6a6704b) --- .../client/api/SimpleProducerConsumerTest.java | 64 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 9 ++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 662f03de2fb..c716ee23a00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -97,6 +97,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -112,6 +113,7 @@ import org.apache.pulsar.client.impl.schema.writer.AvroWriter; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; @@ -5082,4 +5084,66 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { 
.createAsync().get(5, TimeUnit.SECONDS); assertNotNull(reader); } + + /** + * The internal producer of replicator will resend messages after reconnecting. This test guarantees that the + * internal producer will continuously resend messages even though the client side encounters the following bugs. + * - The client side issue causes `message.metadata.numMessagesInBatch` to be `0`, such as + * https://github.com/streamnative/pulsar-rs/issues/376. + * - Before the fix, the resend mechanism relies on `message.metadata.numMessagesInBatch`, after the fix, the + * producer only cares about whether there are pending messages. + * See also https://github.com/apache/pulsar/pull/25106. + */ + @Test + public void testResendMessagesWhichNumMessagesInBatchIsZero() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + final String subscriptionName = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest); + + // Create a producer which can be paused to publish. + AtomicBoolean stuckProducerReconnection = new AtomicBoolean(false); + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> + new ClientCnx(conf, eventLoopGroup) { + protected void handleProducerSuccess(CommandProducerSuccess success) { + if (stuckProducerReconnection.get()) { + synchronized (stuckProducerReconnection) { + super.handleProducerSuccess(success); + } + } else { + super.handleProducerSuccess(success); + } + } + }); + ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client.newProducer().topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false).create(); + + // Trigger a resending by unloading topics.
+ AtomicReference<CompletableFuture<MessageId>> latestPublishing = new AtomicReference<>(); + synchronized (stuckProducerReconnection) { + stuckProducerReconnection.set(true); + admin.topics().unload(topic); + for (int i = 0; i < 10; i++) { + ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setUncompressedSize(1); + MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, messageMetadata, payload, + Optional.empty(), null, Schema.BYTES, 0, true, 0); + // Mock bugs, which publish messages with 0 numMessagesInBatch. + message1.getMessageBuilder().setNumMessagesInBatch(0); + latestPublishing.set(producer1.sendAsync(message1)); + } + stuckProducerReconnection.set(false); + } + + // Verify: no messages being stuck. + latestPublishing.get().get(10, TimeUnit.SECONDS); + + // cleanup. + producer1.close(); + client.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 87669b11ee6..c33ae1940a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -54,7 +54,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; @@ -1682,7 +1681,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne */ protected static class OpSendMsgQueue implements Iterable<OpSendMsg> { @VisibleForTesting - final Queue<OpSendMsg> delegate = new ArrayDeque<>(); + final ArrayDeque<OpSendMsg> delegate = new ArrayDeque<>(); private int forEachDepth = 0; private List<OpSendMsg> 
postponedOpSendMgs; private final AtomicInteger messagesCount = new AtomicInteger(0); @@ -1737,6 +1736,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return messagesCount.get(); } + public int size() { + return delegate.size(); + } + @Override public Iterator<OpSendMsg> iterator() { Iterator<OpSendMsg> delegateIterator = delegate.iterator(); @@ -2048,7 +2051,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } int messagesToResend = pendingMessages.messagesCount(); - if (messagesToResend == 0) { + if (pendingMessages.size() == 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend); }
