This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 38b93b2ec32 Revert "[fix][client]Fix client memory limit currentUsage
leak and semaphore release duplicated in ProducerImpl (#16837)"
38b93b2ec32 is described below
commit 38b93b2ec32aa0361cb1009fd9068d616ac85ac1
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Aug 5 17:23:23 2022 +0800
Revert "[fix][client]Fix client memory limit currentUsage leak and
semaphore release duplicated in ProducerImpl (#16837)"
Even after importing the Mockito dependency,
testProducerBatchSendTimeoutMemoryRelease still fails.
This reverts commit 9610640e26b634df2061f9f6ff46c028ca75bd8e.
---
.../client/impl/ProducerMemoryLimitTest.java | 29 ------------------
.../pulsar/client/impl/ProducerSemaphoreTest.java | 35 ----------------------
.../apache/pulsar/client/impl/ProducerImpl.java | 5 ++--
3 files changed, 3 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index e49c4f3b70c..264ec306413 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -69,35 +69,6 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
}
- @Test(timeOut = 10_000)
- public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
- initClientWithMemoryLimit();
- @Cleanup
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
- .topic("testProducerMemoryLimit")
- .sendTimeout(5, TimeUnit.SECONDS)
- .maxPendingMessages(0)
- .enableBatching(true)
- .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
- .batchingMaxBytes(12)
- .create();
- this.stopBroker();
- try {
-
producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
- try {
-
producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
- } catch (Exception e) {
- throw PulsarClientException.unwrap(e);
- }
-
- throw new IllegalStateException("can not reach here");
- } catch (PulsarClientException.TimeoutException ex) {
- PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
- final MemoryLimitController memoryLimitController =
clientImpl.getMemoryLimitController();
- Assert.assertEquals(memoryLimitController.currentUsage(), 0);
- }
- }
-
@Test(timeOut = 10_000)
public void testProducerCloseMemoryRelease() throws Exception {
initClientWithMemoryLimit();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 7ea63aa674f..d325ed67852 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,10 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static org.mockito.ArgumentMatchers.any;
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -210,35 +206,4 @@ public class ProducerSemaphoreTest extends
ProducerConsumerBase {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
Assert.assertFalse(producer.isErrorStat());
}
-
- @Test(timeOut = 10_000)
- public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws
Exception {
- final int pendingQueueSize = 10;
- @Cleanup
- ProducerImpl<byte[]> producer =
- (ProducerImpl<byte[]>) pulsarClient.newProducer()
- .topic("testProducerSemaphoreRelease")
- .sendTimeout(5, TimeUnit.SECONDS)
- .maxPendingMessages(pendingQueueSize)
- .enableBatching(true)
- .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
- .batchingMaxBytes(12)
- .create();
- this.stopBroker();
- try {
- ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
- Mockito.doThrow(new PulsarClientException.CryptoException("crypto
error")).when(spyProducer)
- .encryptMessage(any(),any());
-
- Field batchMessageContainerField =
ProducerImpl.class.getDeclaredField("batchMessageContainer");
- batchMessageContainerField.setAccessible(true);
- BatchMessageContainerImpl batchMessageContainer =
(BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
- batchMessageContainer.setProducer(spyProducer);
-
spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
-
- throw new IllegalStateException("can not reach here");
- } catch (PulsarClientException.TimeoutException ex) {
-
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
- }
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index bf8fb97cb21..9dd2e01c375 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1787,10 +1787,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
final int numMessagesInBatch =
batchMessageContainer.getNumMessagesInBatch();
- final long currentBatchSize =
batchMessageContainer.getCurrentBatchSize();
batchMessageContainer.discard(ex);
semaphoreRelease(numMessagesInBatch);
- client.getMemoryLimitController().releaseMemory(currentBatchSize);
}
@Override
@@ -1832,7 +1830,10 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
for (OpSendMsg opSendMsg : opSendMsgs) {
processOpSendMsg(opSendMsg);
}
+ } catch (PulsarClientException e) {
+
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
} catch (Throwable t) {
+
semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch
message container", topic, producerName, t);
}
}