This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d9da76c506a [fix][test] Fix multiple resource leaks in tests (#24218)
d9da76c506a is described below

commit d9da76c506a8b2826df85e1b4bbd3a0f658cc758
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Apr 25 21:58:09 2025 +0300

    [fix][test] Fix multiple resource leaks in tests (#24218)
    
    (cherry picked from commit b6afb168c834fd76c6c4921d326858984fff044c)
---
 .../BookieRackAffinityMappingTest.java             |  3 +-
 .../delayed/AbstractDeliveryTrackerTest.java       | 19 +++++++++----
 .../delayed/DelayedDeliveryTrackerFactoryTest.java | 33 +++++++++++++++-------
 .../delayed/InMemoryDeliveryTrackerTest.java       |  4 +--
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  2 ++
 .../pulsar/broker/service/BrokerServiceTest.java   |  1 +
 .../service/TopicPublishRateThrottleTest.java      |  5 +++-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  1 +
 .../prometheus/NamespaceStatsAggregatorTest.java   |  2 ++
 .../pulsar/broker/zookeeper/ZKReconnectTest.java   |  4 ++-
 .../client/impl/ProduceWithMessageIdTest.java      |  2 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  1 +
 .../pulsar/client/impl/PulsarClientImplTest.java   |  2 +-
 .../pulsar/client/impl/TopicListWatcherTest.java   |  2 ++
 .../client/impl/UnAckedMessageTrackerTest.java     |  8 ++----
 .../jcloud/BlobStoreBackedInputStreamTest.java     | 11 ++++++--
 .../impl/BlobStoreBackedInputStreamTest.java       |  3 +-
 .../impl/BlockAwareSegmentInputStreamTest.java     |  5 ++++
 .../offload/jcloud/impl/OffloadIndexV2Test.java    |  6 ++--
 19 files changed, 79 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 9cd81604442..96e4d0ed264 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -257,6 +257,7 @@ public class BookieRackAffinityMappingTest {
                 .stream().filter(Objects::nonNull).toList();
         assertEquals(racks.size(), 0);
 
+        @Cleanup("stop")
         HashedWheelTimer timer = new HashedWheelTimer(
                 new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
                 bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
@@ -348,8 +349,6 @@ public class BookieRackAffinityMappingTest {
         
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), 
"/rack0");
         
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), 
"/default-rack");
         
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), 
"/default-rack");
-
-        timer.stop();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
index 1d166a8db5c..7eb64531b6b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java
@@ -41,22 +41,29 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public abstract class AbstractDeliveryTrackerTest {
 
-    // Create a single shared timer for the test.
-    protected final Timer timer =
-            new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
-                    500, TimeUnit.MILLISECONDS);
+    protected Timer timer;
     protected PersistentDispatcherMultipleConsumers dispatcher;
     protected Clock clock;
 
     protected AtomicLong clockTime;
 
+    @BeforeClass(alwaysRun = true)
+    public void createTimer() {
+        timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
+                        500, TimeUnit.MILLISECONDS);
+    }
+
     @AfterClass(alwaysRun = true)
-    public void cleanup() {
-        timer.stop();
+    public void stopTimer() {
+        if (timer != null) {
+            timer.stop();
+            timer = null;
+        }
     }
 
     @Test(dataProvider = "delayedTracker")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
index bb6ef9d3636..213f277e317 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.broker.delayed;
 
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
@@ -28,8 +36,13 @@ import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -37,15 +50,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 public class DelayedDeliveryTrackerFactoryTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
@@ -92,6 +96,11 @@ public class DelayedDeliveryTrackerFactoryTest extends 
ProducerConsumerBase {
         // Mock dispatcher
         PersistentDispatcherMultipleConsumers dispatcher = 
Mockito.mock(PersistentDispatcherMultipleConsumers.class);
         Mockito.doReturn("test").when(dispatcher).getName();
+
+        @Cleanup
+        DelayedDeliveryTrackerFactory originalDelayedDeliveryTrackerFactory =
+                brokerService.getDelayedDeliveryTrackerFactory();
+
         // Mock BucketDelayedDeliveryTrackerFactory
         @Cleanup
         BucketDelayedDeliveryTrackerFactory factory = new 
BucketDelayedDeliveryTrackerFactory();
@@ -120,6 +129,10 @@ public class DelayedDeliveryTrackerFactoryTest extends 
ProducerConsumerBase {
         BrokerService brokerService = pair.getLeft();
         PersistentDispatcherMultipleConsumers dispatcher = pair.getRight();
 
+        @Cleanup
+        DelayedDeliveryTrackerFactory originalDelayedDeliveryTrackerFactory =
+                brokerService.getDelayedDeliveryTrackerFactory();
+
         // Mock InMemoryDelayedDeliveryTrackerFactory
         @Cleanup
         InMemoryDelayedDeliveryTrackerFactory factory = new 
InMemoryDelayedDeliveryTrackerFactory();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 6711aed924c..22715437333 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -38,6 +38,7 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import lombok.Cleanup;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -209,6 +210,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
 
     @Test
     public void testClose() throws Exception {
+        @Cleanup("stop")
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"),
                 1, TimeUnit.MILLISECONDS);
 
@@ -245,7 +247,5 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
         tracker.close();
 
         assertNull(exceptions[0]);
-
-        timer.stop();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index eef10e49cba..b137a725f09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -423,6 +423,8 @@ public class BucketDelayedDeliveryTrackerTest extends 
AbstractDeliveryTrackerTes
             PositionImpl position = scheduledMessages.pollFirst();
             assertEquals(position, PositionImpl.get(i, i));
         }
+
+        tracker.close();
     }
 
     @Test(dataProvider = "delayedTracker")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index d0b1e74084c..20a2f284cf6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -2014,6 +2014,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         pulsar.getConfiguration().setMetadataSyncEventTopic(topicName);
         PulsarMetadataEventSynchronizer sync = new 
PulsarMetadataEventSynchronizer(pulsar, topicName);
         // set invalid client for retry
+        @Cleanup
         PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl("http://invalidhost:8080")
                 .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
         sync.client = client;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
index e251e244a83..e580d478b1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
@@ -32,7 +32,6 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class TopicPublishRateThrottleTest extends BrokerTestBase{
-
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -71,6 +70,8 @@ public class TopicPublishRateThrottleTest extends 
BrokerTestBase{
         } catch (TimeoutException e) {
             // No-op
         }
+        // Close the PulsarClient gracefully to avoid ByteBuf leak
+        pulsarClient.close();
     }
 
     @Test
@@ -122,6 +123,8 @@ public class TopicPublishRateThrottleTest extends 
BrokerTestBase{
             // No-op
         }
         Assert.assertNotNull(messageId);
+        // Close the PulsarClient gracefully to avoid ByteBuf leak
+        pulsarClient.close();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 4d0849a3406..cf590a2bb3f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1894,6 +1894,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         Clock clock = mock();
         when(clock.millis()).thenAnswer(invocation -> currentTimeMillis.get());
 
+        @Cleanup
         PrometheusMetricsGenerator prometheusMetricsGenerator =
                 new PrometheusMetricsGenerator(pulsar, true, false, false,
                         false, clock);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
index cf923df0411..328a297f3b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats.prometheus;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 import java.util.List;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
@@ -106,6 +107,7 @@ public class NamespaceStatsAggregatorTest {
         PersistentTopicMetrics persistentTopicMetrics = new 
PersistentTopicMetrics();
         
when(topic.getPersistentTopicMetrics()).thenReturn(persistentTopicMetrics);
         topicsMap.put("my-topic", topic);
+        @Cleanup("releaseAll")
         PrometheusMetricStreams metricStreams = Mockito.spy(new 
PrometheusMetricStreams());
 
         // Populate subscriptions stats
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java
index 7b9e4beec6b..78aa01f84ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZKReconnectTest.java
@@ -50,7 +50,9 @@ public class ZKReconnectTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetPartitionMetadataFailAlsoCanProduceMessage() throws 
Exception {
-
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
         pulsarClient = PulsarClient.builder().
                 serviceUrl(pulsar.getBrokerServiceUrl())
                 .build();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
index b8efdeb9969..ec7320f004a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
@@ -48,7 +48,7 @@ public class ProduceWithMessageIdTest {
     }
 
     @AfterClass(alwaysRun = true)
-    public void teardown() {
+    public void cleanup() throws Exception {
         if (mockBrokerService != null) {
             mockBrokerService.stop();
             mockBrokerService = null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index a6a3f83ebc3..105947825c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -954,6 +954,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
         admin.namespaces().setRetention(ns, retention);
         String badUrl = "pulsar://bad-host:8080";
 
+        @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(badUrl).build();
 
         ReaderBuilder<byte[]> readerBuilder = 
client.newReader().topic(topic).startMessageFromRollbackDuration(100,
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 103254a6b90..04048037c76 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -185,11 +185,11 @@ public class PulsarClientImplTest {
         ConnectionPool pool = Mockito.spy(new 
ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop));
         conf.setServiceUrl("pulsar://localhost:6650");
 
+        @Cleanup("stop")
         HashedWheelTimer timer = new HashedWheelTimer();
         PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool, 
timer);
 
         client.shutdown();
-        client.timer().stop();
     }
 
     @Test
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index 1d1a17d50eb..95ac3c834b3 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -62,6 +63,7 @@ public class TopicListWatcherTest {
         when(client.getConfiguration()).thenReturn(new 
ClientConfigurationData());
         clientCnxFuture = new CompletableFuture<>();
         when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
+        @Cleanup("stop")
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
         String topic = "persistent://tenant/ns/topic\\d+";
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index b01fbcb879f..f8c64a6d099 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -32,7 +32,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -49,6 +49,7 @@ public class UnAckedMessageTrackerTest  {
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
         when(client.getCnxPool()).thenReturn(connectionPool);
+        @Cleanup("stop")
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
                 1, TimeUnit.MILLISECONDS);
         when(client.timer()).thenReturn(timer);
@@ -80,8 +81,6 @@ public class UnAckedMessageTrackerTest  {
         assertTrue(tracker.remove(mid));
         assertTrue(tracker.isEmpty());
         assertEquals(tracker.size(), 0);
-
-        timer.stop();
     }
 
     @Test
@@ -90,6 +89,7 @@ public class UnAckedMessageTrackerTest  {
         ConnectionPool connectionPool = mock(ConnectionPool.class);
         when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
         when(client.getCnxPool()).thenReturn(connectionPool);
+        @Cleanup("stop")
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
                 1, TimeUnit.MILLISECONDS);
         when(client.timer()).thenReturn(timer);
@@ -127,8 +127,6 @@ public class UnAckedMessageTrackerTest  {
 
         // Assert that all chunk message ID are removed from 
unAckedChunkedMessageIdSequenceMap
         assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0);
-
-        timer.stop();
     }
 
 }
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index 3e5c4b609df..87a9804ec16 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedInputStreamImpl;
 import org.jclouds.blobstore.BlobStore;
@@ -43,7 +43,7 @@ import org.testng.annotations.Test;
 
 @Slf4j
 public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
-    
+
     class RandomInputStream extends InputStream {
         final Random r;
         int bytesRemaining;
@@ -115,6 +115,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         String ret = blobStore.putBlob(BUCKET, blob);
         log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
@@ -137,6 +138,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         String ret = blobStore.putBlob(BUCKET, blob);
         log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
@@ -145,6 +147,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
 
     @Test(expectedExceptions = KeyNotFoundException.class)
     public void testNotFoundOnRead() throws Exception {
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist",
                                                                  (key, md) -> 
{},
                                                                  1234, 1000);
@@ -176,6 +179,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         String ret = blobStore.putBlob(BUCKET, blob);
         log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
@@ -203,6 +207,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         //BlobStore spiedBlobStore = spy(blobStore);
         BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
 
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
@@ -250,6 +255,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         String ret = blobStore.putBlob(BUCKET, blob);
         log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
+        @Cleanup
         BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
@@ -286,6 +292,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
             .contentLength(objectSize)
             .build();
         String ret = blobStore.putBlob(BUCKET, blob);
+        @Cleanup
         BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
             blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
         assertEquals(bis.available(), objectSize);
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java
index 951180e4e18..9c3a4fb8fe4 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java
@@ -19,9 +19,9 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static org.testng.Assert.assertEquals;
-
 import java.io.IOException;
 import java.io.InputStream;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase;
 import org.testng.annotations.Test;
 
@@ -29,6 +29,7 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
 
     @Test
     public void testFillBuffer() throws Exception {
+        @Cleanup
         BlobStoreBackedInputStreamImpl bis = new 
BlobStoreBackedInputStreamImpl(
             blobStore, BUCKET, "testFillBuffer", (k, md) -> {
         }, 2048, 512);
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
index 5ca4d6da20b..06e90f53eed 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java
@@ -39,6 +39,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
+import lombok.Cleanup;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -668,6 +669,7 @@ public class BlockAwareSegmentInputStreamTest {
         ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, 
() -> (byte)r.nextInt());
 
         int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 
2;
+        @Cleanup
         BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
 
         int bytesRead = 0;
@@ -692,6 +694,7 @@ public class BlockAwareSegmentInputStreamTest {
         ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, 
() -> (byte)r.nextInt());
 
         int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 
2;
+        @Cleanup
         BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
 
         int bytesRead = 0;
@@ -723,6 +726,7 @@ public class BlockAwareSegmentInputStreamTest {
 
         // set block size equals to (header + lac_entry) size.
         int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * 
(entrySize + 4 + 8);
+        @Cleanup
         BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
         int expectedEntryCount = (blockSize - 
DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
 
@@ -804,6 +808,7 @@ public class BlockAwareSegmentInputStreamTest {
     public void testCloseReleaseResources() throws Exception {
         ReadHandle readHandle = new MockReadHandle(1, 10, 10);
 
+        @Cleanup
         BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024);
         inputStream.read();
         Field field = 
BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf");
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
index b82f20ede2c..9ec4585f14a 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
@@ -113,7 +113,7 @@ public class OffloadIndexV2Test {
         assertEquals(dataObjectLength, 1);
         assertEquals(dataHeaderLength, 23455);
 
-        wrapper.readBytes(segmentMetadataLength);
+        wrapper.skipBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: 
{}",
                 magic, indexBlockLength, segmentMetadataLength, 
indexEntryCount);
 
@@ -262,7 +262,7 @@ public class OffloadIndexV2Test {
         assertEquals(dataObjectLength, 1);
         assertEquals(dataHeaderLength, 23455);
 
-        wrapper.readBytes(segmentMetadataLength);
+        wrapper.skipBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: 
{}",
                 magic, indexBlockLength, segmentMetadataLength, 
indexEntryCount);
 
@@ -280,7 +280,7 @@ public class OffloadIndexV2Test {
         int indexEntryCount2 = wrapper.readInt();
         assertEquals(indexEntryCount2, 2);
         int segmentMetadataLength2 = wrapper.readInt();
-        wrapper.readBytes(segmentMetadataLength2);
+        wrapper.skipBytes(segmentMetadataLength2);
 
         OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(),
                 wrapper.readLong(), dataHeaderLength);

Reply via email to