This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c6bbfc6d59c83ee78688b4b7dd177277aa8e4c72 Author: hangc0276 <[email protected]> AuthorDate: Mon May 11 08:41:11 2020 +0800 fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize (#6862) Fix #6854 ### Bug description The consumer gets stuck because of the check in `hasEnoughMessagesForBatchReceive`: ``` protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); } ``` When maxNumMessages is greater than maxReceiverQueueSize, the incoming queue cannot accumulate maxNumMessages messages, so the message-count condition above is never satisfied. ### Changes When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force the batchReceivePolicy maxNumMessages to maxReceiverQueueSize to avoid the consumer getting stuck when checking `hasEnoughMessagesForBatchReceive` * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize * log a warning and add test case (cherry picked from commit 561868d4c441d654ab795a9386d8e9a5e28f03dd) --- .../client/api/ConsumerBatchReceiveTest.java | 124 ++++++++++++++++----- .../pulsar/client/api/BatchReceivePolicy.java | 2 +- .../apache/pulsar/client/impl/ConsumerBase.java | 29 ++++- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- 4 files changed, 128 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index 19ec983..54a6e52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -55,96 +55,166 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { return new Object[][] { // Default batch receive policy. 
- { BatchReceivePolicy.DEFAULT_POLICY, true }, + { BatchReceivePolicy.DEFAULT_POLICY, true, 1000}, // Only receive timeout limitation. { BatchReceivePolicy.builder() .timeout(50, TimeUnit.MILLISECONDS) - .build(), true + .build(), true, 1000 }, // Only number of messages in a single batch receive limitation. { BatchReceivePolicy.builder() .maxNumMessages(10) - .build(), true + .build(), true, 1000 }, // Number of messages and timeout limitation { BatchReceivePolicy.builder() .maxNumMessages(13) .timeout(50, TimeUnit.MILLISECONDS) - .build(), true + .build(), true, 1000 }, // Size of messages and timeout limitation { BatchReceivePolicy.builder() .maxNumBytes(64) .timeout(50, TimeUnit.MILLISECONDS) - .build(), true + .build(), true, 1000 }, // Default batch receive policy. - { BatchReceivePolicy.DEFAULT_POLICY, false }, + { BatchReceivePolicy.DEFAULT_POLICY, false, 1000 }, // Only receive timeout limitation. { BatchReceivePolicy.builder() .timeout(50, TimeUnit.MILLISECONDS) - .build(), false + .build(), false, 1000 }, // Only number of messages in a single batch receive limitation. 
{ BatchReceivePolicy.builder() .maxNumMessages(10) - .build(), false + .build(), false, 1000 }, // Number of messages and timeout limitation { BatchReceivePolicy.builder() .maxNumMessages(13) .timeout(50, TimeUnit.MILLISECONDS) - .build(), false + .build(), false, 1000 }, // Size of messages and timeout limitation { BatchReceivePolicy.builder() .maxNumBytes(64) .timeout(50, TimeUnit.MILLISECONDS) - .build(), false + .build(), false, 1000 + }, + // Number of message limitation exceed receiverQueue size + { + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), true, 50 + }, + // Number of message limitation exceed receiverQueue size and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 30 + }, + // Number of message limitation is negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 10 + }, + // Size of message limitation is negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 + }, + // Number of message limitation and size of message limitation are both negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 + }, + // Number of message limitation exceed receiverQueue size + { + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), false, 50 + }, + // Number of message limitation exceed receiverQueue size and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 + }, + // Number of message limitation is negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 + }, + // Size of message limitation is 
negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 + }, + // Number of message limitation and size of message limitation are both negative and timeout limitation + { + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 } }; } @Test(dataProvider = "batchReceivePolicy") - public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = "persistent://my-property/my-ns/batch-receive-non-partition-" + UUID.randomUUID(); - testBatchReceive(topic, batchReceivePolicy, batchProduce); + testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } @Test(dataProvider = "batchReceivePolicy") - public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); - testBatchReceive(topic, batchReceivePolicy, batchProduce); + testBatchReceive(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } @Test(dataProvider = "batchReceivePolicy") - public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testAsyncBatchReceiveNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = 
"persistent://my-property/my-ns/batch-receive-non-partition-async-" + UUID.randomUUID(); - testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce); + testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } @Test(dataProvider = "batchReceivePolicy") - public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testAsyncBatchReceivePartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = "persistent://my-property/my-ns/batch-receive-async-" + UUID.randomUUID(); admin.topics().createPartitionedTopic(topic, 3); - testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce); + testBatchReceiveAsync(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } @Test(dataProvider = "batchReceivePolicy") - public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testBatchReceiveAndRedeliveryNonPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-non-partition-" + UUID.randomUUID(); - testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce); + testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } @Test(dataProvider = "batchReceivePolicy") - public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + public void testBatchReceiveAndRedeliveryPartitionedTopic(BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { final String topic = "persistent://my-property/my-ns/batch-receive-and-redelivery-" + UUID.randomUUID(); 
admin.topics().createPartitionedTopic(topic, 3); - testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce); + testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize); } - private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic); if (!batchProduce) { producerBuilder.enableBatching(false); @@ -155,14 +225,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("s1") + .receiverQueueSize(receiverQueueSize) .batchReceivePolicy(batchReceivePolicy) .subscribe(); sendMessagesAsyncAndWait(producer, 100); batchReceiveAndCheck(consumer, 100); } - private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { - + private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { if (batchReceivePolicy.getTimeoutMs() <= 0) { return; } @@ -178,6 +248,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("s1") + .receiverQueueSize(receiverQueueSize) .batchReceivePolicy(batchReceivePolicy) .subscribe(); @@ -187,7 +258,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { latch.await(); } - private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce) throws Exception { + private void testBatchReceiveAndRedelivery(String topic, BatchReceivePolicy batchReceivePolicy, 
boolean batchProduce, int receiverQueueSize) throws Exception { ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic); if (!batchProduce) { producerBuilder.enableBatching(false); @@ -198,6 +269,7 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase { Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("s1") + .receiverQueueSize(receiverQueueSize) .batchReceivePolicy(batchReceivePolicy) .ackTimeout(1, TimeUnit.SECONDS) .subscribe(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java index f331b1b..3bce4c8 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java @@ -97,7 +97,7 @@ public class BatchReceivePolicy implements Serializable { return maxNumMessages; } - public long getMaxNumBytes() { + public int getMaxNumBytes() { return maxNumBytes; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 59d4041..e2b3329 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -50,6 +50,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> { @@ -95,10 +97,33 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T this.schema = schema; this.interceptors = interceptors; if (conf.getBatchReceivePolicy() != null) { - this.batchReceivePolicy = conf.getBatchReceivePolicy(); + BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy(); + if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) { + this.batchReceivePolicy = BatchReceivePolicy.builder() + .maxNumMessages(this.maxReceiverQueueSize) + .maxNumBytes(userBatchReceivePolicy.getMaxNumBytes()) + .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS) + .build(); + log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, " + + "reset to maxReceiverQueueSize. batchReceivePolicy: {}", + userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize, + this.batchReceivePolicy.toString()); + } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) { + this.batchReceivePolicy = BatchReceivePolicy.builder() + .maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages()) + .maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes()) + .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS) + .build(); + log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. " + + "Reset to DEFAULT_POLICY. 
batchReceivePolicy: {}", userBatchReceivePolicy.getMaxNumMessages(), + userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString()); + } else { + this.batchReceivePolicy = conf.getBatchReceivePolicy(); + } } else { this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY; } + if (batchReceivePolicy.getTimeoutMs() > 0) { batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS); } @@ -594,4 +619,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } protected abstract void completeOpBatchReceive(OpBatchReceive<T> op); + + private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 970e134..4972b99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -626,6 +626,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { @Override protected void completeOpBatchReceive(OpBatchReceive<T> op) { notifyPendingBatchReceivedCallBack(op); + resumeReceivingFromPausedConsumersIfNeeded(); } @Override @@ -642,7 +643,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { try { seekAsync(timestamp).get(); } catch (Exception e) { - throw PulsarClientException.unwrap(e); } }
