This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2c3cffc8a4de7c91ae10655631955a58d805d3dd Author: fengyubiao <[email protected]> AuthorDate: Sun Apr 27 17:56:31 2025 +0800 [fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete queue implementation (#24184) Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit a4a34091cbf58bfa080b42c101bd5094d82c41f3) --- .../java/org/apache/pulsar/schema/SchemaTest.java | 33 ++++++++++++++++++++-- .../apache/pulsar/client/impl/ProducerImpl.java | 33 ++++++++++++++++++++-- .../pulsar/client/impl/OpSendMsgQueueTest.java | 18 ++++++++++++ 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index c38580ec955..237f629e67a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Cleanup; @@ -65,6 +66,7 @@ import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -76,6 +78,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -94,6 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1452,9 +1456,9 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { } /** - * This test validates that consumer/producers should recover on topic whose + * This test validates that consumer/producers should recover on topic whose * schema ledgers are not able to open due to non-recoverable error. - * + * * @throws Exception */ @Test @@ -1521,4 +1525,29 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { consumer.close(); producer.close(); } + + @Test + public void testPendingQueueSizeIfIncompatible() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns"); + admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME)); + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp"); + admin.topics().createNonPartitionedTopic(topic); + + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) + .maxPendingMessages(50).enableBatching(false).topic(topic).create(); + producer.newMessage(Schema.STRING).value("msg").sendAsync(); + AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>(); + for (int i = 0; i < 100; i++) { + latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync()); + } + Awaitility.await().untilAsserted(() -> { + assertTrue(latestSend.get().isDone()); + assertEquals(producer.getPendingQueueSize(), 0); + }); + + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e311048a4a9..3119e5d535d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1717,7 +1717,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne * This queue is not thread safe. */ protected static class OpSendMsgQueue implements Iterable<OpSendMsg> { - private final Queue<OpSendMsg> delegate = new ArrayDeque<>(); + @VisibleForTesting + final Queue<OpSendMsg> delegate = new ArrayDeque<>(); private int forEachDepth = 0; private List<OpSendMsg> postponedOpSendMgs; private final AtomicInteger messagesCount = new AtomicInteger(0); @@ -1774,7 +1775,35 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public Iterator<OpSendMsg> iterator() { - return delegate.iterator(); + Iterator<OpSendMsg> delegateIterator = delegate.iterator(); + return new Iterator<OpSendMsg>() { + OpSendMsg currentOp; + + @Override + public boolean hasNext() { + return delegateIterator.hasNext(); + } + + @Override + public OpSendMsg next() { + currentOp = delegateIterator.next(); + return currentOp; + } + + @Override + public void remove() { + delegateIterator.remove(); + if (currentOp != null) { + messagesCount.addAndGet(-currentOp.numMessagesInBatch); + currentOp = null; + } + } + + @Override + public void forEachRemaining(Consumer<? super OpSendMsg> action) { + delegateIterator.forEachRemaining(action); + } + }; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java index efcc06bede3..28787568dd1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import com.google.common.collect.Lists; import java.util.Arrays; +import java.util.Iterator; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -83,4 +84,21 @@ public class OpSendMsgQueueTest { // then assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2, opSendMsg3, opSendMsg4)); } + + @Test + public void testIteratorRemove() { + ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue(); + for (int i = 0; i < 10; i++) { + queue.add(createDummyOpSendMsg()); + } + + Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator(); + while (iterator.hasNext()) { + iterator.next(); + iterator.remove(); + } + // Verify: the result of "messagesCount()" is 0 after removed all items. + assertEquals(queue.delegate.size(), 0); + assertEquals(queue.messagesCount(), 0); + } }
