This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 38b93b2ec32 Revert "[fix][client]Fix client memory limit currentUsage 
leak and semaphore release duplicated in ProducerImpl (#16837)"
38b93b2ec32 is described below

commit 38b93b2ec32aa0361cb1009fd9068d616ac85ac1
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Aug 5 17:23:23 2022 +0800

    Revert "[fix][client]Fix client memory limit currentUsage leak and 
semaphore release duplicated in ProducerImpl (#16837)"
    
    Even after importing the mockito dependency, the
    testProducerBatchSendTimeoutMemoryRelease will still fail.
    
    This reverts commit 9610640e26b634df2061f9f6ff46c028ca75bd8e.
---
 .../client/impl/ProducerMemoryLimitTest.java       | 29 ------------------
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 35 ----------------------
 .../apache/pulsar/client/impl/ProducerImpl.java    |  5 ++--
 3 files changed, 3 insertions(+), 66 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index e49c4f3b70c..264ec306413 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -69,35 +69,6 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
 
     }
 
-    @Test(timeOut = 10_000)
-    public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
-        initClientWithMemoryLimit();
-        @Cleanup
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-                .topic("testProducerMemoryLimit")
-                .sendTimeout(5, TimeUnit.SECONDS)
-                .maxPendingMessages(0)
-                .enableBatching(true)
-                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
-                .batchingMaxBytes(12)
-                .create();
-        this.stopBroker();
-        try {
-            
producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
-            try {
-                
producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
-            } catch (Exception e) {
-                throw PulsarClientException.unwrap(e);
-            }
-
-            throw new IllegalStateException("can not reach here");
-        } catch (PulsarClientException.TimeoutException ex) {
-            PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
-            final MemoryLimitController memoryLimitController = 
clientImpl.getMemoryLimitController();
-            Assert.assertEquals(memoryLimitController.currentUsage(), 0);
-        }
-    }
-
     @Test(timeOut = 10_000)
     public void testProducerCloseMemoryRelease() throws Exception {
         initClientWithMemoryLimit();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 7ea63aa674f..d325ed67852 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.mockito.ArgumentMatchers.any;
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -210,35 +206,4 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize);
         Assert.assertFalse(producer.isErrorStat());
     }
-
-    @Test(timeOut = 10_000)
-    public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws 
Exception {
-        final int pendingQueueSize = 10;
-        @Cleanup
-        ProducerImpl<byte[]> producer =
-                (ProducerImpl<byte[]>) pulsarClient.newProducer()
-                        .topic("testProducerSemaphoreRelease")
-                        .sendTimeout(5, TimeUnit.SECONDS)
-                        .maxPendingMessages(pendingQueueSize)
-                        .enableBatching(true)
-                        .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
-                        .batchingMaxBytes(12)
-                        .create();
-        this.stopBroker();
-        try {
-            ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
-            Mockito.doThrow(new PulsarClientException.CryptoException("crypto 
error")).when(spyProducer)
-                    .encryptMessage(any(),any());
-
-            Field batchMessageContainerField = 
ProducerImpl.class.getDeclaredField("batchMessageContainer");
-            batchMessageContainerField.setAccessible(true);
-            BatchMessageContainerImpl batchMessageContainer = 
(BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
-            batchMessageContainer.setProducer(spyProducer);
-            
spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
-
-            throw new IllegalStateException("can not reach here");
-        } catch (PulsarClientException.TimeoutException ex) {
-            
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
-        }
-    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bf8fb97cb21..9dd2e01c375 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1787,10 +1787,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return;
         }
         final int numMessagesInBatch = 
batchMessageContainer.getNumMessagesInBatch();
-        final long currentBatchSize = 
batchMessageContainer.getCurrentBatchSize();
         batchMessageContainer.discard(ex);
         semaphoreRelease(numMessagesInBatch);
-        client.getMemoryLimitController().releaseMemory(currentBatchSize);
     }
 
     @Override
@@ -1832,7 +1830,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 for (OpSendMsg opSendMsg : opSendMsgs) {
                     processOpSendMsg(opSendMsg);
                 }
+            } catch (PulsarClientException e) {
+                
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
+                
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch 
message container", topic, producerName, t);
             }
         }

Reply via email to