This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1ca58222424d222a6200845e190db415b84b38f4 Author: Rajan Dhabalia <[email protected]> AuthorDate: Thu Mar 5 18:55:40 2020 -0800 [pulsar-client] fix deadlock on send failure (#6488) (cherry picked from commit ad5415ab90fac123d00ed1ec55b696914645edb1) --- .../client/api/SimpleProducerConsumerTest.java | 33 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 20 +++++++------ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index e907197..612b610 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -3283,4 +3284,36 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + + /** + * It verifies that message failure successfully releases semaphore and client successfully receives + * InvalidMessageException. + * + * @throws Exception + */ + @Test + public void testReleaseSemaphoreOnFailMessages() throws Exception { + log.info("-- Starting {} test --", methodName); + + int maxPendingMessages = 10; + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().enableBatching(false) + .blockIfQueueFull(true).maxPendingMessages(maxPendingMessages) + .topic("persistent://my-property/my-ns/my-topic2"); + + Producer<byte[]> producer = producerBuilder.create(); + List<Future<MessageId>> futures = Lists.newArrayList(); + + // Asynchronously produce messages + byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1]; + for (int i = 0; i < maxPendingMessages + 10; i++) { + Future<MessageId> future = producer.sendAsync(message); + try { + future.get(); + fail("should fail with InvalidMessageException"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof PulsarClientException.InvalidMessageException); + } + } + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index eb05909..7e5cb3b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -351,7 +351,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds %d bytes", producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize())); - callback.sendComplete(invalidMessageException); + completeCallbackAndReleaseSemaphore(callback, invalidMessageException); return; } } @@ -360,7 +360,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s can not reuse the same message", producerName, topic)); - callback.sendComplete(invalidMessageException); + completeCallbackAndReleaseSemaphore(callback, invalidMessageException); compressedPayload.release(); return; } @@ -455,11 +455,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } } catch (PulsarClientException e) { - semaphore.release(); - callback.sendComplete(e); + completeCallbackAndReleaseSemaphore(callback, e); } catch (Throwable t) { - semaphore.release(); - callback.sendComplete(new PulsarClientException(t)); + completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t)); } } @@ -471,8 +469,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return true; } if (!isMultiSchemaEnabled(true)) { - callback.sendComplete(new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic))); + PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)); + completeCallbackAndReleaseSemaphore(callback, e); return false; } SchemaHash schemaHash = SchemaHash.of(msg.getSchema()); @@ -872,6 +871,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); } + private void completeCallbackAndReleaseSemaphore(SendCallback callback, Exception exception) { + semaphore.release(); + callback.sendComplete(exception); + } + /** * Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the * message header-payload again.
