This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 64f574a706fb88c3fece21c3e30ee691ddce7505 Author: fengyubiao <[email protected]> AuthorDate: Sun Apr 27 17:56:31 2025 +0800 [fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete queue implementation (#24184) Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit a4a34091cbf58bfa080b42c101bd5094d82c41f3) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 29 +++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 33 ++++++++++++++++++++-- .../pulsar/client/impl/OpSendMsgQueueTest.java | 18 ++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 21999989603..8908b9e96c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Cleanup; @@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -73,6 +75,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -88,6 +91,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1445,4 +1449,29 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } } + + @Test + public void testPendingQueueSizeIfIncompatible() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns"); + admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp"); + admin.topics().createNonPartitionedTopic(topic); + + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) + .maxPendingMessages(50).enableBatching(false).topic(topic).create(); + producer.newMessage(Schema.STRING).value("msg").sendAsync(); + AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>(); + for (int i = 0; i < 100; i++) { + latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync()); + } + Awaitility.await().untilAsserted(() -> { + assertTrue(latestSend.get().isDone()); + assertEquals(producer.getPendingQueueSize(), 0); + }); + + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 3383acfded3..4faaf18dd78 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1645,7 +1645,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne * This queue is not thread safe. */ protected static class OpSendMsgQueue implements Iterable<OpSendMsg> { - private final Queue<OpSendMsg> delegate = new ArrayDeque<>(); + @VisibleForTesting + final Queue<OpSendMsg> delegate = new ArrayDeque<>(); private int forEachDepth = 0; private List<OpSendMsg> postponedOpSendMgs; private final AtomicInteger messagesCount = new AtomicInteger(0); @@ -1702,7 +1703,35 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public Iterator<OpSendMsg> iterator() { - return delegate.iterator(); + Iterator<OpSendMsg> delegateIterator = delegate.iterator(); + return new Iterator<OpSendMsg>() { + OpSendMsg currentOp; + + @Override + public boolean hasNext() { + return delegateIterator.hasNext(); + } + + @Override + public OpSendMsg next() { + currentOp = delegateIterator.next(); + return currentOp; + } + + @Override + public void remove() { + delegateIterator.remove(); + if (currentOp != null) { + messagesCount.addAndGet(-currentOp.numMessagesInBatch); + currentOp = null; + } + } + + @Override + public void forEachRemaining(Consumer<? super OpSendMsg> action) { + delegateIterator.forEachRemaining(action); + } + }; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java index 2db23782640..6dc25f42960 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import com.google.common.collect.Lists; import java.util.Arrays; +import java.util.Iterator; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -82,4 +83,21 @@ public class OpSendMsgQueueTest { // then assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2, opSendMsg3, opSendMsg4)); } + + @Test + public void testIteratorRemove() { + ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue(); + for (int i = 0; i < 10; i++) { + queue.add(createDummyOpSendMsg()); + } + + Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator(); + while (iterator.hasNext()) { + iterator.next(); + iterator.remove(); + } + // Verify: the result of "messagesCount()" is 0 after removed all items. + assertEquals(queue.delegate.size(), 0); + assertEquals(queue.messagesCount(), 0); + } }
