This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77877373a7a5801a48a1bcc169759430ad91f50c Author: tison <[email protected]> AuthorDate: Mon Aug 22 18:32:34 2022 +0800 [improve][test] force initialize field to avoid polluted by mocks (#17022) Master Issue: #16912 - [x] `doc-not-needed` cc @Shoothzj @eolivelli @nicoloboschi @Technoboy- Signed-off-by: tison <[email protected]> --- .../client/impl/BatchMessageContainerImpl.java | 18 ++++- .../client/impl/BatchMessageContainerImplTest.java | 84 +++++++++------------- .../PulsarByteBufAllocatorDefaultTest.java | 37 ++++------ ...ulsarByteBufAllocatorOomThrowExceptionTest.java | 35 ++++----- 4 files changed, 75 insertions(+), 99 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 7d95f0963ba..44fad489dac 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.util.ArrayList; @@ -61,7 +63,19 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { // keep track of callbacks for individual messages being published in a batch protected SendCallback firstCallback; + private final ByteBufAllocator allocator; + public BatchMessageContainerImpl() { + this(PulsarByteBufAllocator.DEFAULT); + } + + /** + * This constructor is for testing only. The global allocator is always + * {@link PulsarByteBufAllocator#DEFAULT}. + */ + @VisibleForTesting + BatchMessageContainerImpl(ByteBufAllocator allocator) { + this.allocator = allocator; } public BatchMessageContainerImpl(ProducerImpl<?> producer) { @@ -84,8 +98,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { messageMetadata.setSequenceId(msg.getSequenceId()); lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; - batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); + batchedMessageMetadataAndPayload = allocator.buffer( + Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) { currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java index 8fc018b3199..a4f0205c2cf 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java @@ -19,69 +19,51 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBufAllocator; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.mockito.MockedConstruction; -import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; import org.testng.annotations.Test; public class BatchMessageContainerImplTest { @Test - public void recoveryAfterOom() throws Exception { - AtomicBoolean called = new AtomicBoolean(); - try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class, - (mockAllocator, context) -> { - called.set(true); - doThrow(new OutOfMemoryError("test")).when(mockAllocator).buffer(anyInt(), anyInt()); - })) { - if (PulsarByteBufAllocator.DEFAULT != null && !called.get()) { - replaceByteBufAllocator(); - } - final ProducerImpl producer = Mockito.mock(ProducerImpl.class); - final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); - producerConfigurationData.setCompressionType(CompressionType.NONE); - when(producer.getConfiguration()).thenReturn(producerConfigurationData); - final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(); - batchMessageContainer.setProducer(producer); - MessageMetadata messageMetadata1 = new MessageMetadata(); - messageMetadata1.setSequenceId(1L); - messageMetadata1.setProducerName("producer1"); - messageMetadata1.setPublishTime(System.currentTimeMillis()); - ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8)); - final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null); - batchMessageContainer.add(message1, null); - MessageMetadata messageMetadata2 = new MessageMetadata(); - messageMetadata2.setSequenceId(1L); - messageMetadata2.setProducerName("producer1"); - messageMetadata2.setPublishTime(System.currentTimeMillis()); - ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8)); - final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null); - // after oom, our add can self-healing, won't throw exception - batchMessageContainer.add(message2, null); - } finally { - replaceByteBufAllocator(); - } - - } - - private void replaceByteBufAllocator() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { - Method createByteBufAllocatorMethod = PulsarByteBufAllocator.class.getDeclaredMethod("createByteBufAllocator"); - createByteBufAllocatorMethod.setAccessible(true); - Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT", - createByteBufAllocatorMethod.invoke(null)); + public void recoveryAfterOom() { + final AtomicBoolean called = new AtomicBoolean(); + final ProducerImpl<?> producer = mock(ProducerImpl.class); + final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); + producerConfigurationData.setCompressionType(CompressionType.NONE); + when(producer.getConfiguration()).thenReturn(producerConfigurationData); + final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class); + doAnswer((ignore) -> { + called.set(true); + throw new OutOfMemoryError("test"); + }).when(mockAllocator).buffer(anyInt()); + final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator); + batchMessageContainer.setProducer(producer); + MessageMetadata messageMetadata1 = new MessageMetadata(); + messageMetadata1.setSequenceId(1L); + messageMetadata1.setProducerName("producer1"); + messageMetadata1.setPublishTime(System.currentTimeMillis()); + ByteBuffer payload1 = ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8)); + final MessageImpl<byte[]> message1 = MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null); + batchMessageContainer.add(message1, null); + assertTrue(called.get()); + MessageMetadata messageMetadata2 = new MessageMetadata(); + messageMetadata2.setSequenceId(1L); + messageMetadata2.setProducerName("producer1"); + messageMetadata2.setPublishTime(System.currentTimeMillis()); + ByteBuffer payload2 = ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8)); + final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null); + // after oom, our add can self-healing, won't throw exception + batchMessageContainer.add(message2, null); } - } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java index fb81835c7c4..9c548ff25e4 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorDefaultTest.java @@ -18,30 +18,30 @@ */ package org.apache.pulsar.common.allocator; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBufAllocator; -import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; import org.mockito.MockedConstruction; import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; import org.testng.annotations.Test; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -@Slf4j public class PulsarByteBufAllocatorDefaultTest { @Test - public void testDefaultConfig() throws Exception { + public void testDefaultConfig() { + // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted. + assertNotNull(PulsarByteBufAllocator.DEFAULT); + AtomicBoolean called = new AtomicBoolean(); - try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class, + try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class, (mock, context) -> { called.set(true); final List<?> arguments = context.arguments(); @@ -49,21 +49,10 @@ public class PulsarByteBufAllocatorDefaultTest { assertEquals(arguments.get(2), PoolingPolicy.PooledDirect); assertEquals(arguments.get(4), OutOfMemoryPolicy.FallbackToHeap); assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced); - })) { - final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT; - // use the variable, in case the compiler optimization - log.trace("{}", byteBufAllocator); - - if (!called.get()) { - // maybe PulsarByteBufAllocator static initialization has already been called by a previous test - // let's rerun the same method - PulsarByteBufAllocator.createByteBufAllocator(); - } + assertFalse(called.get()); + PulsarByteBufAllocator.createByteBufAllocator(); assertTrue(called.get()); - } finally { - Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT", - PulsarByteBufAllocator.createByteBufAllocator()); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java index 20bbeb06590..e1a3176e5bd 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocatorOomThrowExceptionTest.java @@ -18,31 +18,31 @@ */ package org.apache.pulsar.common.allocator; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBufAllocator; -import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl; import org.mockito.MockedConstruction; import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; import org.testng.annotations.Test; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -@Slf4j public class PulsarByteBufAllocatorOomThrowExceptionTest { @Test - public void testDefaultConfig() throws Exception { + public void testDefaultConfig() { + // Force initialize PulsarByteBufAllocator.DEFAULT before mock the ctor so that it is not polluted. + assertNotNull(PulsarByteBufAllocator.DEFAULT); + AtomicBoolean called = new AtomicBoolean(); System.setProperty("pulsar.allocator.out_of_memory_policy", "ThrowException"); - try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class, + try (MockedConstruction<ByteBufAllocatorImpl> ignored = Mockito.mockConstruction(ByteBufAllocatorImpl.class, (mock, context) -> { called.set(true); final List<?> arguments = context.arguments(); @@ -50,21 +50,12 @@ public class PulsarByteBufAllocatorOomThrowExceptionTest { assertEquals(arguments.get(2), PoolingPolicy.PooledDirect); assertEquals(arguments.get(4), OutOfMemoryPolicy.ThrowException); assertEquals(arguments.get(6), LeakDetectionPolicy.Advanced); - })) { - final ByteBufAllocatorImpl byteBufAllocator = (ByteBufAllocatorImpl) PulsarByteBufAllocator.DEFAULT; - // use the variable, in case the compiler optimization - log.trace("{}", byteBufAllocator); - if (!called.get()) { - // maybe PulsarByteBufAllocator static initialization has already been called by a previous test - // let's rerun the same method - PulsarByteBufAllocator.createByteBufAllocator(); - } + assertFalse(called.get()); + PulsarByteBufAllocator.createByteBufAllocator(); assertTrue(called.get()); } finally { System.clearProperty("pulsar.allocator.out_of_memory_policy"); - Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT", - PulsarByteBufAllocator.createByteBufAllocator()); } }
