This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1ca58222424d222a6200845e190db415b84b38f4
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Mar 5 18:55:40 2020 -0800

    [pulsar-client] fix deadlock on send failure (#6488)
    
    (cherry picked from commit ad5415ab90fac123d00ed1ec55b696914645edb1)
---
 .../client/api/SimpleProducerConsumerTest.java     | 33 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 20 +++++++------
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e907197..612b610 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -77,6 +77,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -3283,4 +3284,36 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    /**
+     * It verifies that message failure successfully releases semaphore and 
client successfully receives
+     * InvalidMessageException.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testReleaseSemaphoreOnFailMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int maxPendingMessages = 10;
+        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().enableBatching(false)
+                .blockIfQueueFull(true).maxPendingMessages(maxPendingMessages)
+                .topic("persistent://my-property/my-ns/my-topic2");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        List<Future<MessageId>> futures = Lists.newArrayList();
+
+        // Asynchronously produce messages
+        byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
+        for (int i = 0; i < maxPendingMessages + 10; i++) {
+            Future<MessageId> future = producer.sendAsync(message);
+            try {
+                future.get();
+                fail("should fail with InvalidMessageException");
+            } catch (Exception e) {
+                assertTrue(e.getCause() instanceof 
PulsarClientException.InvalidMessageException);
+            }
+        }
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index eb05909..7e5cb3b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -351,7 +351,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 PulsarClientException.InvalidMessageException 
invalidMessageException = new PulsarClientException.InvalidMessageException(
                     format("The producer %s of the topic %s sends a %s message 
with %d bytes that exceeds %d bytes",
                         producerName, topic, compressedStr, compressedSize, 
ClientCnx.getMaxMessageSize()));
-                callback.sendComplete(invalidMessageException);
+                completeCallbackAndReleaseSemaphore(callback, 
invalidMessageException);
                 return;
             }
         }
@@ -360,7 +360,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             PulsarClientException.InvalidMessageException 
invalidMessageException =
                 new PulsarClientException.InvalidMessageException(
                     format("The producer %s of the topic %s can not reuse the 
same message", producerName, topic));
-            callback.sendComplete(invalidMessageException);
+            completeCallbackAndReleaseSemaphore(callback, 
invalidMessageException);
             compressedPayload.release();
             return;
         }
@@ -455,11 +455,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
             }
         } catch (PulsarClientException e) {
-            semaphore.release();
-            callback.sendComplete(e);
+            completeCallbackAndReleaseSemaphore(callback, e);
         } catch (Throwable t) {
-            semaphore.release();
-            callback.sendComplete(new PulsarClientException(t));
+            completeCallbackAndReleaseSemaphore(callback, new 
PulsarClientException(t));
         }
     }
 
@@ -471,8 +469,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return true;
         }
         if (!isMultiSchemaEnabled(true)) {
-            callback.sendComplete(new 
PulsarClientException.InvalidMessageException(
-                format("The producer %s of the topic %s is disabled the 
`MultiSchema`", producerName, topic)));
+            PulsarClientException.InvalidMessageException e = new 
PulsarClientException.InvalidMessageException(
+                    format("The producer %s of the topic %s is disabled the 
`MultiSchema`", producerName, topic));
+            completeCallbackAndReleaseSemaphore(callback, e);
             return false;
         }
         SchemaHash schemaHash = SchemaHash.of(msg.getSchema());
@@ -872,6 +871,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 
1);
     }
 
+    private void completeCallbackAndReleaseSemaphore(SendCallback callback, 
Exception exception) {
+        semaphore.release();
+        callback.sendComplete(exception);
+    }
+
     /**
      * Checks message checksum to retry if message was corrupted while sending 
to broker. Recomputes checksum of the
      * message header-payload again.

Reply via email to