This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new d9da76c506a [fix][test] Fix multiple resource leaks in tests (#24218) d9da76c506a is described below commit d9da76c506a8b2826df85e1b4bbd3a0f658cc758 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Fri Apr 25 21:58:09 2025 +0300 [fix][test] Fix multiple resource leaks in tests (#24218) (cherry picked from commit b6afb168c834fd76c6c4921d326858984fff044c) --- .../BookieRackAffinityMappingTest.java | 3 +- .../delayed/AbstractDeliveryTrackerTest.java | 19 +++++++++---- .../delayed/DelayedDeliveryTrackerFactoryTest.java | 33 +++++++++++++++------- .../delayed/InMemoryDeliveryTrackerTest.java | 4 +-- .../bucket/BucketDelayedDeliveryTrackerTest.java | 2 ++ .../pulsar/broker/service/BrokerServiceTest.java | 1 + .../service/TopicPublishRateThrottleTest.java | 5 +++- .../pulsar/broker/stats/PrometheusMetricsTest.java | 1 + .../prometheus/NamespaceStatsAggregatorTest.java | 2 ++ .../pulsar/broker/zookeeper/ZKReconnectTest.java | 4 ++- .../client/impl/ProduceWithMessageIdTest.java | 2 +- .../org/apache/pulsar/client/impl/ReaderTest.java | 1 + .../pulsar/client/impl/PulsarClientImplTest.java | 2 +- .../pulsar/client/impl/TopicListWatcherTest.java | 2 ++ .../client/impl/UnAckedMessageTrackerTest.java | 8 ++---- .../jcloud/BlobStoreBackedInputStreamTest.java | 11 ++++++-- .../impl/BlobStoreBackedInputStreamTest.java | 3 +- .../impl/BlockAwareSegmentInputStreamTest.java | 5 ++++ .../offload/jcloud/impl/OffloadIndexV2Test.java | 6 ++-- 19 files changed, 79 insertions(+), 35 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index 9cd81604442..96e4d0ed264 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -257,6 +257,7 @@ public class BookieRackAffinityMappingTest { .stream().filter(Objects::nonNull).toList(); assertEquals(racks.size(), 0); + @Cleanup("stop") HashedWheelTimer timer = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(), bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, @@ -348,8 +349,6 @@ public class BookieRackAffinityMappingTest { assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0"); assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack"); assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); - - timer.stop(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java index 1d166a8db5c..7eb64531b6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java @@ -41,22 +41,29 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public abstract class AbstractDeliveryTrackerTest { - // Create a single shared timer for the test. - protected final Timer timer = - new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), - 500, TimeUnit.MILLISECONDS); + protected Timer timer; protected PersistentDispatcherMultipleConsumers dispatcher; protected Clock clock; protected AtomicLong clockTime; + @BeforeClass(alwaysRun = true) + public void createTimer() { + timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), + 500, TimeUnit.MILLISECONDS); + } + @AfterClass(alwaysRun = true) - public void cleanup() { - timer.stop(); + public void stopTimer() { + if (timer != null) { + timer.stop(); + timer = null; + } } @Test(dataProvider = "delayedTracker") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java index bb6ef9d3636..213f277e317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java @@ -18,6 +18,14 @@ */ package org.apache.pulsar.broker.delayed; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException; @@ -28,8 +36,13 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; @@ -37,15 +50,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.time.Duration; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase { @BeforeClass @Override @@ -92,6 +96,11 @@ public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase { // Mock dispatcher PersistentDispatcherMultipleConsumers dispatcher = Mockito.mock(PersistentDispatcherMultipleConsumers.class); Mockito.doReturn("test").when(dispatcher).getName(); + + @Cleanup + DelayedDeliveryTrackerFactory originalDelayedDeliveryTrackerFactory = + brokerService.getDelayedDeliveryTrackerFactory(); + // Mock BucketDelayedDeliveryTrackerFactory @Cleanup BucketDelayedDeliveryTrackerFactory factory = new BucketDelayedDeliveryTrackerFactory(); @@ -120,6 +129,10 @@ public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase { BrokerService brokerService = pair.getLeft(); PersistentDispatcherMultipleConsumers dispatcher = pair.getRight(); + @Cleanup + DelayedDeliveryTrackerFactory originalDelayedDeliveryTrackerFactory = + brokerService.getDelayedDeliveryTrackerFactory(); + // Mock InMemoryDelayedDeliveryTrackerFactory @Cleanup InMemoryDelayedDeliveryTrackerFactory factory = new InMemoryDelayedDeliveryTrackerFactory(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 6711aed924c..22715437333 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -38,6 +38,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import lombok.Cleanup; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -209,6 +210,7 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest { @Test public void testClose() throws Exception { + @Cleanup("stop") Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), 1, TimeUnit.MILLISECONDS); @@ -245,7 +247,5 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest { tracker.close(); assertNull(exceptions[0]); - - timer.stop(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index eef10e49cba..b137a725f09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -423,6 +423,8 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes PositionImpl position = scheduledMessages.pollFirst(); assertEquals(position, PositionImpl.get(i, i)); } + + tracker.close(); } @Test(dataProvider = "delayedTracker") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index d0b1e74084c..20a2f284cf6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -2014,6 +2014,7 @@ public class BrokerServiceTest extends BrokerTestBase { pulsar.getConfiguration().setMetadataSyncEventTopic(topicName); PulsarMetadataEventSynchronizer sync = new PulsarMetadataEventSynchronizer(pulsar, topicName); // set invalid client for retry + @Cleanup PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://invalidhost:8080") .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); sync.client = client; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java index e251e244a83..e580d478b1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java @@ -32,7 +32,6 @@ import org.testng.annotations.Test; @Test(groups = "broker") public class TopicPublishRateThrottleTest extends BrokerTestBase{ - @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { @@ -71,6 +70,8 @@ public class TopicPublishRateThrottleTest extends BrokerTestBase{ } catch (TimeoutException e) { // No-op } + // Close the PulsarClient gracefully to avoid ByteBuf leak + pulsarClient.close(); } @Test @@ -122,6 +123,8 @@ public class TopicPublishRateThrottleTest extends BrokerTestBase{ // No-op } Assert.assertNotNull(messageId); + // Close the PulsarClient gracefully to avoid ByteBuf leak + pulsarClient.close(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 4d0849a3406..cf590a2bb3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1894,6 +1894,7 @@ public class PrometheusMetricsTest extends BrokerTestBase { Clock clock = mock(); when(clock.millis()).thenAnswer(invocation -> currentTimeMillis.get()); + @Cleanup PrometheusMetricsGenerator prometheusMetricsGenerator = new PrometheusMetricsGenerator(pulsar, true, false, false, false, clock); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java index cf923df0411..328a297f3b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; import java.util.List; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.bookkeeper.mledger.util.StatsBuckets; @@ -106,6 +107,7 @@ public class NamespaceStatsAggregatorTest { PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); when(topic.getPersistentTopicMetrics()).thenReturn(persistentTopicMetrics); topicsMap.put("my-topic", topic); + @Cleanup("releaseAll") PrometheusMetricStreams metricStreams = Mockito.spy(new PrometheusMetricStreams()); // Populate subscriptions stats diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java index 7b9e4beec6b..78aa01f84ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java @@ -50,7 +50,9 @@ public class ZKReconnectTest extends MockedPulsarServiceBaseTest { @Test public void testGetPartitionMetadataFailAlsoCanProduceMessage() throws Exception { - + if (pulsarClient != null) { + pulsarClient.shutdown(); + } pulsarClient = PulsarClient.builder(). serviceUrl(pulsar.getBrokerServiceUrl()) .build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java index b8efdeb9969..ec7320f004a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java @@ -48,7 +48,7 @@ public class ProduceWithMessageIdTest { } @AfterClass(alwaysRun = true) - public void teardown() { + public void cleanup() throws Exception { if (mockBrokerService != null) { mockBrokerService.stop(); mockBrokerService = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index a6a3f83ebc3..105947825c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -954,6 +954,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { admin.namespaces().setRetention(ns, retention); String badUrl = "pulsar://bad-host:8080"; + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 103254a6b90..04048037c76 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -185,11 +185,11 @@ public class PulsarClientImplTest { ConnectionPool pool = Mockito.spy(new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop)); conf.setServiceUrl("pulsar://localhost:6650"); + @Cleanup("stop") HashedWheelTimer timer = new HashedWheelTimer(); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, timer); client.shutdown(); - client.timer().stop(); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java index 1d1a17d50eb..95ac3c834b3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.ChannelHandlerContext; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import lombok.Cleanup; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -62,6 +63,7 @@ public class TopicListWatcherTest { when(client.getConfiguration()).thenReturn(new ClientConfigurationData()); clientCnxFuture = new CompletableFuture<>(); when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture); + @Cleanup("stop") Timer timer = new HashedWheelTimer(); when(client.timer()).thenReturn(timer); String topic = "persistent://tenant/ns/topic\\d+"; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index b01fbcb879f..f8c64a6d099 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -32,7 +32,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Duration; import java.util.HashSet; import java.util.concurrent.TimeUnit; - +import lombok.Cleanup; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -49,6 +49,7 @@ public class UnAckedMessageTrackerTest { ConnectionPool connectionPool = mock(ConnectionPool.class); when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); when(client.getCnxPool()).thenReturn(connectionPool); + @Cleanup("stop") Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); when(client.timer()).thenReturn(timer); @@ -80,8 +81,6 @@ public class UnAckedMessageTrackerTest { assertTrue(tracker.remove(mid)); assertTrue(tracker.isEmpty()); assertEquals(tracker.size(), 0); - - timer.stop(); } @Test @@ -90,6 +89,7 @@ public class UnAckedMessageTrackerTest { ConnectionPool connectionPool = mock(ConnectionPool.class); when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); when(client.getCnxPool()).thenReturn(connectionPool); + @Cleanup("stop") Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); when(client.timer()).thenReturn(timer); @@ -127,8 +127,6 @@ public class UnAckedMessageTrackerTest { // Assert that all chunk message ID are removed from unAckedChunkedMessageIdSequenceMap assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0); - - timer.stop(); } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java index 3e5c4b609df..87a9804ec16 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java @@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; - import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Random; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl; import org.jclouds.blobstore.BlobStore; @@ -43,7 +43,7 @@ import org.testng.annotations.Test; @Slf4j public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { - + class RandomInputStream extends InputStream { final Random r; int bytesRemaining; @@ -115,6 +115,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { String ret = blobStore.putBlob(BUCKET, blob); log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -137,6 +138,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { String ret = blobStore.putBlob(BUCKET, blob); log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -145,6 +147,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { @Test(expectedExceptions = KeyNotFoundException.class) public void testNotFoundOnRead() throws Exception { + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist", (key, md) -> {}, 1234, 1000); @@ -176,6 +179,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { String ret = blobStore.putBlob(BUCKET, blob); log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -203,6 +207,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { //BlobStore spiedBlobStore = spy(blobStore); BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -250,6 +255,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { String ret = blobStore.putBlob(BUCKET, blob); log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + @Cleanup BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -286,6 +292,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { .contentLength(objectSize) .build(); String ret = blobStore.putBlob(BUCKET, blob); + @Cleanup BackedInputStream bis = new BlobStoreBackedInputStreamImpl( blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512); assertEquals(bis.available(), objectSize); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java index 951180e4e18..9c3a4fb8fe4 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java @@ -19,9 +19,9 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import static org.testng.Assert.assertEquals; - import java.io.IOException; import java.io.InputStream; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase; import org.testng.annotations.Test; @@ -29,6 +29,7 @@ public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { @Test public void testFillBuffer() throws Exception { + @Cleanup BlobStoreBackedInputStreamImpl bis = new BlobStoreBackedInputStreamImpl( blobStore, BUCKET, "testFillBuffer", (k, md) -> { }, 2048, 512); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5ca4d6da20b..06e90f53eed 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -39,6 +39,7 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import java.util.stream.IntStream; +import lombok.Cleanup; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -668,6 +669,7 @@ public class BlockAwareSegmentInputStreamTest { ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt()); int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2; + @Cleanup BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); int bytesRead = 0; @@ -692,6 +694,7 @@ public class BlockAwareSegmentInputStreamTest { ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt()); int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2; + @Cleanup BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); int bytesRead = 0; @@ -723,6 +726,7 @@ public class BlockAwareSegmentInputStreamTest { // set block size equals to (header + lac_entry) size. int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * (entrySize + 4 + 8); + @Cleanup BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize); int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8); @@ -804,6 +808,7 @@ public class BlockAwareSegmentInputStreamTest { public void testCloseReleaseResources() throws Exception { ReadHandle readHandle = new MockReadHandle(1, 10, 10); + @Cleanup BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); inputStream.read(); Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java index b82f20ede2c..9ec4585f14a 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java @@ -113,7 +113,7 @@ public class OffloadIndexV2Test { assertEquals(dataObjectLength, 1); assertEquals(dataHeaderLength, 23455); - wrapper.readBytes(segmentMetadataLength); + wrapper.skipBytes(segmentMetadataLength); log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}", magic, indexBlockLength, segmentMetadataLength, indexEntryCount); @@ -262,7 +262,7 @@ public class OffloadIndexV2Test { assertEquals(dataObjectLength, 1); assertEquals(dataHeaderLength, 23455); - wrapper.readBytes(segmentMetadataLength); + wrapper.skipBytes(segmentMetadataLength); log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}", magic, indexBlockLength, segmentMetadataLength, indexEntryCount); @@ -280,7 +280,7 @@ public class OffloadIndexV2Test { int indexEntryCount2 = wrapper.readInt(); assertEquals(indexEntryCount2, 2); int segmentMetadataLength2 = wrapper.readInt(); - wrapper.readBytes(segmentMetadataLength2); + wrapper.skipBytes(segmentMetadataLength2); OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong(), dataHeaderLength);