gaozhangmin opened a new issue #14212:
URL: https://github.com/apache/pulsar/issues/14212
Reproduce test:
```
@Test
public void testProducerExceptionAndThenUnblockSizeQuota() throws
Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
Maps.newHashMap());
admin.namespaces().setBacklogQuota("prop/quotahold",
BacklogQuota.builder()
.limitSize(10 * 1024)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
@Cleanup
final PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
final String topic1 = "persistent://prop/quotahold/exceptandunblock";
final String subName1 = "c1except";
boolean gotException = false;
Consumer<byte[]> consumer =
client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
byte[] content = new byte[1024];
Producer<byte[]> producer = createProducer(client, topic1);
for (int i = 0; i < 10; i++) {
producer.send(content);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
try {
// try to send over backlog quota and make sure it fails
producer.send(content);
producer.send(content);
fail("backlog quota did not exceed");
} catch (PulsarClientException ce) {
assertTrue(ce instanceof
PulsarClientException.ProducerBlockedQuotaExceededException
|| ce instanceof PulsarClientException.TimeoutException,
ce.getMessage());
gotException = true;
}
assertTrue(gotException, "backlog exceeded exception did not occur");
// now remove backlog and ensure that producer is unblocked;
TopicStats stats = admin.topics().getStats(topic1);
int backlog = (int)
stats.getSubscriptions().get(subName1).getMsgBacklog();
for (int i = 0; i < backlog; i++) {
Message<?> msg = consumer.receive();
consumer.acknowledge(msg);
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
producer.flush();
}
```
In my opinion, flush shouldn't fail because we had consumed all the backlog.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]