This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 51202a68895 [fix][test] Fix multiple thread leaks in tests, part 3 (#21543)
51202a68895 is described below
commit 51202a6889582117bf790e9ff2325b9f3119510f
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 8 16:36:03 2023 +0200
[fix][test] Fix multiple thread leaks in tests, part 3 (#21543)
---
.../impl/ManagedLedgerFactoryShutdownTest.java | 2 +
.../authentication/AuthenticationProviderSasl.java | 4 +
.../authentication/SaslAuthenticateTest.java | 44 +--
.../jetty/tls/JettySslContextFactoryTest.java | 13 +-
.../JettySslContextFactoryWithKeyStoreTest.java | 13 +-
.../pulsar/broker/service/BatchMessageTest.java | 1 +
.../client/impl/auth/AuthenticationSasl.java | 6 +
.../pulsar/common/sasl/TGTRefreshThread.java | 2 +-
.../instance/JavaInstanceRunnableTest.java | 35 ++-
.../windowing/WaterMarkEventGeneratorTest.java | 2 +-
.../worker/FunctionRuntimeManagerTest.java | 14 +-
.../functions/worker/MembershipManagerTest.java | 5 +
.../functions/worker/SchedulerManagerTest.java | 1 +
.../bookkeeper/PulsarRegistrationClientTest.java | 1 +
.../pulsar/testclient/PerformanceTransaction.java | 303 +++++++++++----------
.../pulsar/testclient/PerformanceProducerTest.java | 8 +-
.../MLTransactionMetadataStoreTest.java | 53 ++--
.../pulsar/websocket/PingPongSupportTest.java | 11 +-
.../impl/FileSystemManagedLedgerOffloader.java | 4 +
.../offload/filesystem/FileStoreTestBase.java | 26 +-
.../impl/FileSystemManagedLedgerOffloaderTest.java | 16 ++
21 files changed, 345 insertions(+), 219 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index c223490f1c7..1c9fb29066b 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BookKeeper;
@@ -130,6 +131,7 @@ public class ManagedLedgerFactoryShutdownTest {
LedgerHandle ledgerHandle = mock(LedgerHandle.class);
LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
+ @Cleanup("shutdownNow")
OrderedExecutor executor =
OrderedExecutor.newBuilder().name("Test").build();
given(bookKeeper.getMainWorkerPool()).willReturn(executor);
doAnswer(inv -> {
diff --git
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index ed2d05ab722..2616f90c664 100644
---
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -125,6 +125,10 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
@Override
public void close() throws IOException {
+ if (jaasCredentialsContainer != null) {
+ jaasCredentialsContainer.close();
+ jaasCredentialsContainer = null;
+ }
}
@Override
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index f4a797d6a4a..ae282a49dc3 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.file.Files;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import javax.security.auth.login.Configuration;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -77,7 +79,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
private static Properties properties;
private static String localHostname = "localhost";
- private static Authentication authSasl;
+ private Authentication authSasl;
@BeforeClass
public static void startMiniKdc() throws Exception {
@@ -146,17 +148,11 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
System.setProperty("java.security.krb5.conf",
krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();
- // Client config
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
- clientSaslConfig.put("serverType", "broker");
- log.info("set client jaas section name: PulsarClient");
- authSasl =
AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig);
- log.info("created AuthenticationSasl");
+
}
@AfterClass(alwaysRun = true)
- public static void stopMiniKdc() {
+ public static void stopMiniKdc() throws IOException {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.conf");
if (kdc != null) {
@@ -175,6 +171,14 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
// use http lookup to verify HttpClient works well.
isTcpLookup = false;
+ // Client config
+ Map<String, String> clientSaslConfig = new HashMap<>();
+ clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+ clientSaslConfig.put("serverType", "broker");
+ log.info("set client jaas section name: PulsarClient");
+ authSasl =
AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig);
+ log.info("created AuthenticationSasl");
+
conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*");
@@ -187,9 +191,6 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" +
kdc.getRealm()));
- Map<String, String> clientSaslConfig = new HashMap<>();
- clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
- clientSaslConfig.put("serverType", "broker");
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
@@ -307,8 +308,11 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
@Test
public void testSaslOnlyAuthFirstStage() throws Exception {
- AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl)
pulsar.getBrokerService()
-
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
+ @Cleanup
+ AuthenticationProviderSasl saslServer = new
AuthenticationProviderSasl();
+ // The cache expiration time is set to 50ms. Residual auth info should be cleaned up
+ conf.setInflightSaslContextExpiryMs(50);
+ saslServer.initialize(conf);
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
doReturn("Init").when(servletRequest).getHeader("State");
@@ -325,9 +329,6 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
field.setAccessible(true);
Cache<Long, AuthenticationState> cache = (Cache<Long,
AuthenticationState>) field.get(saslServer);
assertEquals(cache.asMap().size(), 10);
- // The cache expiration time is set to 1ms. Residual auth info should be cleaned up
- conf.setInflightSaslContextExpiryMs(1);
- saslServer.initialize(conf);
// Add more auth info into memory
for (int i = 0; i < 10; i++) {
AuthenticationDataProvider dataProvider =
authSasl.getAuthData("localhost");
@@ -339,7 +340,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
}
long start = System.currentTimeMillis();
while (true) {
- if (System.currentTimeMillis() - start > 10_00) {
+ if (System.currentTimeMillis() - start > 1000) {
fail();
}
cache = (Cache<Long, AuthenticationState>) field.get(saslServer);
@@ -347,14 +348,14 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
if (CollectionUtils.hasElements(cache.asMap())) {
break;
}
- Thread.yield();
+ Thread.sleep(5);
}
}
@Test
public void testMaxInflightContext() throws Exception {
- AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl)
pulsar.getBrokerService()
-
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
+ @Cleanup
+ AuthenticationProviderSasl saslServer = new
AuthenticationProviderSasl();
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
doReturn("Init").when(servletRequest).getHeader("State");
conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
@@ -375,5 +376,4 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
//only 1 context was left in the memory
assertEquals(cache.asMap().size(), 1);
}
-
}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
index bf91dab14fe..2f0c8b627d5 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.RegistryBuilder;
@@ -47,6 +48,7 @@ public class JettySslContextFactoryTest {
@Test
public void testJettyTlsServerTls() throws Exception {
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory =
JettySslContextFactory.createServerSslContext(
@@ -72,15 +74,15 @@ public class JettySslContextFactoryTest {
new SSLConnectionSocketFactory(getClientSslContext(), new
NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory =
JettySslContextFactory.createServerSslContext(
@@ -110,15 +112,15 @@ public class JettySslContextFactoryTest {
new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidCipher() throws Exception {
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory =
JettySslContextFactory.createServerSslContext(
@@ -154,11 +156,10 @@ public class JettySslContextFactoryTest {
new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
private static SSLContext getClientSslContext() throws
GeneralSecurityException, IOException {
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
index 1d41cd36841..f08f62c480c 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
@@ -30,6 +30,7 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.TrustManagerFactory;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.RegistryBuilder;
@@ -62,6 +63,7 @@ public class JettySslContextFactoryWithKeyStoreTest {
@Test
public void testJettyTlsServerTls() throws Exception {
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory =
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -81,16 +83,16 @@ public class JettySslContextFactoryWithKeyStoreTest {
new SSLConnectionSocketFactory(getClientSslContext(), new
NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
Configurator.setRootLevel(Level.INFO);
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory =
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -114,15 +116,15 @@ public class JettySslContextFactoryWithKeyStoreTest {
new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
@Test(expectedExceptions = SSLHandshakeException.class)
public void testJettyTlsServerInvalidCipher() throws Exception {
+ @Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory =
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -151,11 +153,10 @@ public class JettySslContextFactoryWithKeyStoreTest {
new NoopHostnameVerifier()));
PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager(registryBuilder.build());
httpClientBuilder.setConnectionManager(cm);
+ @Cleanup
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpGet httpGet = new HttpGet("https://localhost:" +
connector.getLocalPort());
httpClient.execute(httpGet);
- httpClient.close();
- server.stop();
}
private static SSLContext getClientSslContext() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index a2d80b2ba60..c66eff2c8a1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -768,6 +768,7 @@ public class BatchMessageTest extends BrokerTestBase {
final Consumer<byte[]> myConsumer =
pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
// assertEquals(dispatcher.getTotalUnackedMessages(), 1);
+ @Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final CountDownLatch latch = new CountDownLatch(numMsgs);
diff --git
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index f51fb766f03..f7ec9b964c6 100644
---
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -165,6 +165,12 @@ public class AuthenticationSasl implements Authentication,
EncodedAuthentication
public void close() throws IOException {
if (client != null) {
client.close();
+ client = null;
+ }
+ if (jaasCredentialsContainer != null) {
+ jaasCredentialsContainer.close();
+ jaasCredentialsContainer = null;
+ initializedJAAS = false;
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
index aa4c8de4a9c..6a0a7448f75 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -97,7 +97,7 @@ public class TGTRefreshThread extends Thread {
@Override
public void run() {
log.info("TGT refresh thread started.");
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
// renewal thread's main loop. if it exits from here, thread will exit.
KerberosTicket tgt = getTGT();
long now = System.currentTimeMillis();
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 134e77a3b58..c83648132d4 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -60,10 +62,14 @@ import org.apache.pulsar.io.core.SourceContext;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
public class JavaInstanceRunnableTest {
+ private final List<AutoCloseable> closeables = new ArrayList<>();
static class IntegerSerDe implements SerDe<Integer> {
@Override
@@ -113,8 +119,10 @@ public class JavaInstanceRunnableTest {
.build();
InstanceConfig config = createInstanceConfig(functionDetails);
config.setClusterName("test-cluster");
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build();
+ registerCloseable(pulsarClient);
return new JavaInstanceRunnable(config, clientBuilder,
-
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null,
null, null, null, null,
+ pulsarClient, null, null, null, null, null,
Thread.currentThread().getContextClassLoader(), null);
}
@@ -493,4 +501,29 @@ public class JavaInstanceRunnableTest {
Assert.assertFalse((boolean)
getPrivateField(javaInstanceRunnable, "isInitialized"));
});
}
+
+ @AfterClass
+ public void cleanupInstanceCache() {
+ InstanceCache.shutdown();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void cleanupCloseables() {
+ callCloseables(closeables);
+ }
+
+ protected <T extends AutoCloseable> T registerCloseable(T closeable) {
+ closeables.add(closeable);
+ return closeable;
+ }
+
+ private static void callCloseables(List<AutoCloseable> closeables) {
+ for (int i = closeables.size() - 1; i >= 0; i--) {
+ try {
+ closeables.get(i).close();
+ } catch (Exception e) {
+ log.error("Failure in calling close method", e);
+ }
+ }
+ }
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
index ce3109b852e..162c9ad51ce 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
@@ -63,7 +63,7 @@ public class WaterMarkEventGeneratorTest {
@AfterMethod(alwaysRun = true)
public void tearDown() {
-// waterMarkEventGenerator.shutdown();
+ waterMarkEventGenerator.shutdown();
eventList.clear();
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index bc56b1766d3..c332b5e6461 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -45,6 +45,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.Functions;
@@ -113,6 +114,7 @@ public class FunctionRuntimeManagerTest {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment add functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -205,6 +207,7 @@ public class FunctionRuntimeManagerTest {
// test new assignment delete functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -305,6 +308,7 @@ public class FunctionRuntimeManagerTest {
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment update functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -444,6 +448,7 @@ public class FunctionRuntimeManagerTest {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment update functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -638,6 +643,7 @@ public class FunctionRuntimeManagerTest {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment add functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -729,6 +735,7 @@ public class FunctionRuntimeManagerTest {
// test new assignment update functions
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -856,7 +863,6 @@ public class FunctionRuntimeManagerTest {
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
-
fail();
} catch (Exception e) {
assertEquals(e.getMessage(), "A Function Runtime Factory needs to
be set");
@@ -933,6 +939,7 @@ public class FunctionRuntimeManagerTest {
mockRuntimeFactory(runtimeFactoryMockedStatic);
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
@@ -984,6 +991,7 @@ public class FunctionRuntimeManagerTest {
})) {
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = new
FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
@@ -1013,6 +1021,7 @@ public class FunctionRuntimeManagerTest {
workerConfig = new WorkerConfig();
workerConfig.setProcessContainerFactory(processContainerFactory);
+ functionRuntimeManager.close();
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
@@ -1041,6 +1050,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setThreadContainerFactory(threadContainerFactory);
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
+ functionRuntimeManager.close();
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
@@ -1112,6 +1122,7 @@ public class FunctionRuntimeManagerTest {
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -1201,6 +1212,7 @@ public class FunctionRuntimeManagerTest {
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index ac3176b3135..66b831f8788 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertNotNull;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import lombok.Cleanup;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -106,6 +107,7 @@ public class MembershipManagerTest {
doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -181,6 +183,7 @@ public class MembershipManagerTest {
doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -273,6 +276,7 @@ public class MembershipManagerTest {
doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
@@ -357,6 +361,7 @@ public class MembershipManagerTest {
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
+ @Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new
FunctionRuntimeManager(
workerConfig,
workerService,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 01d34d75945..6a8d15814f3 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -148,6 +148,7 @@ public class SchedulerManagerTest {
@AfterMethod(alwaysRun = true)
public void stop() {
+ schedulerManager.close();
this.executor.shutdownNow();
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 4dcbcda3d90..5660b3518f1 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -363,6 +363,7 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
final String zksConnectionString = zks.getConnectionString();
final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
// prepare registration manager
+ @Cleanup
ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
final ServerConfiguration serverConfiguration = new
ServerConfiguration();
serverConfiguration.setZkLedgersRootPath(ledgersRoot);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 3b422452d64..84aaba5fab6 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -227,27 +227,27 @@ public class PerformanceTransaction {
.memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(!arguments.isDisableTransaction);
- PulsarClient client = clientBuilder.build();
+ try (PulsarClient client = clientBuilder.build()) {
- ExecutorService executorService = new
ThreadPoolExecutor(arguments.numTestThreads,
- arguments.numTestThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>());
+ ExecutorService executorService = new
ThreadPoolExecutor(arguments.numTestThreads,
+ arguments.numTestThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>());
- long startTime = System.nanoTime();
- long testEndTime = startTime + (long) (arguments.testTime * 1e9);
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- if (!arguments.isDisableTransaction) {
- printTxnAggregatedThroughput(startTime);
- } else {
- printAggregatedThroughput(startTime);
- }
- printAggregatedStats();
- }));
+ long startTime = System.nanoTime();
+ long testEndTime = startTime + (long) (arguments.testTime * 1e9);
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (!arguments.isDisableTransaction) {
+ printTxnAggregatedThroughput(startTime);
+ } else {
+ printAggregatedThroughput(startTime);
+ }
+ printAggregatedStats();
+ }));
- // start perf test
- AtomicBoolean executing = new AtomicBoolean(true);
+ // start perf test
+ AtomicBoolean executing = new AtomicBoolean(true);
RateLimiter rateLimiter = arguments.openTxnRate > 0
? RateLimiter.create(arguments.openTxnRate)
@@ -304,97 +304,98 @@ public class PerformanceTransaction {
}
Transaction transaction = atomicReference.get();
for (List<Consumer<byte[]>> subscriptions : consumers)
{
- for (Consumer<byte[]> consumer :
subscriptions) {
- for (int j = 0; j <
arguments.numMessagesReceivedPerTransaction; j++) {
- Message<byte[]> message = null;
- try {
- message = consumer.receive();
- } catch (PulsarClientException e) {
- log.error("Receive message
failed", e);
- executorService.shutdownNow();
- PerfClientUtils.exit(1);
- }
- long receiveTime = System.nanoTime();
- if (!arguments.isDisableTransaction) {
-
consumer.acknowledgeAsync(message.getMessageId(), transaction)
- .thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
-
System.nanoTime() - receiveTime);
-
messageAckRecorder.recordValue(latencyMicros);
-
messageAckCumulativeRecorder.recordValue(latencyMicros);
-
numMessagesAckSuccess.increment();
- }).exceptionally(exception
-> {
- if (exception instanceof
InterruptedException && !executing.get()) {
- return null;
- }
- log.error(
- "Ack message failed
with transaction {} throw exception",
- transaction,
exception);
-
numMessagesAckFailed.increment();
- return null;
- });
- } else {
-
consumer.acknowledgeAsync(message).thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() -
receiveTime);
-
messageAckRecorder.recordValue(latencyMicros);
-
messageAckCumulativeRecorder.recordValue(latencyMicros);
-
numMessagesAckSuccess.increment();
- }).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ for (Consumer<byte[]> consumer : subscriptions) {
+ for (int j = 0; j <
arguments.numMessagesReceivedPerTransaction; j++) {
+ Message<byte[]> message = null;
+ try {
+ message = consumer.receive();
+ } catch (PulsarClientException e) {
+ log.error("Receive message failed", e);
+ executorService.shutdownNow();
+ PerfClientUtils.exit(1);
+ }
+ long receiveTime = System.nanoTime();
+ if (!arguments.isDisableTransaction) {
+
consumer.acknowledgeAsync(message.getMessageId(), transaction)
+ .thenRun(() -> {
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime()
- receiveTime);
+
messageAckRecorder.recordValue(latencyMicros);
+
messageAckCumulativeRecorder.recordValue(latencyMicros);
+
numMessagesAckSuccess.increment();
+ }).exceptionally(exception -> {
+ if (exception instanceof
InterruptedException && !executing.get()) {
+ return null;
+ }
+ log.error(
+ "Ack message
failed with transaction {} throw exception",
+ transaction,
exception);
+
numMessagesAckFailed.increment();
return null;
- }
- log.error(
- "Ack message failed
with transaction {} throw exception",
- transaction,
exception);
-
numMessagesAckFailed.increment();
+ });
+ } else {
+
consumer.acknowledgeAsync(message).thenRun(() -> {
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime() -
receiveTime);
+
messageAckRecorder.recordValue(latencyMicros);
+
messageAckCumulativeRecorder.recordValue(latencyMicros);
+ numMessagesAckSuccess.increment();
+ }).exceptionally(exception -> {
+ if (exception instanceof
InterruptedException && !executing.get()) {
return null;
- });
- }
+ }
+ log.error(
+ "Ack message failed with
transaction {} throw exception",
+ transaction, exception);
+ numMessagesAckFailed.increment();
+ return null;
+ });
+ }
}
}
}
- for (Producer<byte[]> producer : producers){
+ for (Producer<byte[]> producer : producers) {
for (int j = 0; j <
arguments.numMessagesProducedPerTransaction; j++) {
long sendTime = System.nanoTime();
if (!arguments.isDisableTransaction) {
producer.newMessage(transaction).value(payloadBytes)
.sendAsync().thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() - sendTime);
-
messageSendRecorder.recordValue(latencyMicros);
-
messageSendRCumulativeRecorder.recordValue(latencyMicros);
- numMessagesSendSuccess.increment();
- }).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
- return null;
- }
- log.error("Send transaction message
failed with exception : ", exception);
- numMessagesSendFailed.increment();
- return null;
- });
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime() -
sendTime);
+
messageSendRecorder.recordValue(latencyMicros);
+
messageSendRCumulativeRecorder.recordValue(latencyMicros);
+
numMessagesSendSuccess.increment();
+ }).exceptionally(exception -> {
+ if (exception instanceof
InterruptedException && !executing.get()) {
+ return null;
+ }
+ log.error("Send transaction
message failed with exception : ",
+ exception);
+
numMessagesSendFailed.increment();
+ return null;
+ });
} else {
producer.newMessage().value(payloadBytes)
.sendAsync().thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() - sendTime);
-
messageSendRecorder.recordValue(latencyMicros);
-
messageSendRCumulativeRecorder.recordValue(latencyMicros);
- numMessagesSendSuccess.increment();
- }).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
- return null;
- }
- log.error("Send message failed with
exception : ", exception);
- numMessagesSendFailed.increment();
- return null;
- });
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime() -
sendTime);
+
messageSendRecorder.recordValue(latencyMicros);
+
messageSendRCumulativeRecorder.recordValue(latencyMicros);
+
numMessagesSendSuccess.increment();
+ }).exceptionally(exception -> {
+ if (exception instanceof
InterruptedException && !executing.get()) {
+ return null;
+ }
+ log.error("Send message failed
with exception : ", exception);
+
numMessagesSendFailed.increment();
+ return null;
+ });
}
}
}
- if (rateLimiter != null){
+ if (rateLimiter != null) {
rateLimiter.tryAcquire();
}
if (!arguments.isDisableTransaction) {
@@ -437,13 +438,13 @@ public class PerformanceTransaction {
atomicReference.compareAndSet(transaction,
newTransaction);
totalNumTxnOpenTxnSuccess.increment();
break;
- } catch (Exception throwable){
- if (throwable instanceof
InterruptedException && !executing.get()) {
- break;
- }
- log.error("Failed to new transaction
with exception: ", throwable);
- totalNumTxnOpenTxnFail.increment();
+ } catch (Exception throwable) {
+ if (throwable instanceof
InterruptedException && !executing.get()) {
+ break;
}
+ log.error("Failed to new transaction with
exception: ", throwable);
+ totalNumTxnOpenTxnFail.increment();
+ }
}
} else {
totalNumTxnOpenTxnSuccess.increment();
@@ -455,68 +456,68 @@ public class PerformanceTransaction {
}
+ // Print report stats
+ long oldTime = System.nanoTime();
- // Print report stats
- long oldTime = System.nanoTime();
-
- Histogram reportSendHistogram = null;
- Histogram reportAckHistogram = null;
+ Histogram reportSendHistogram = null;
+ Histogram reportAckHistogram = null;
- String statsFileName = "perf-transaction-" +
System.currentTimeMillis() + ".hgrm";
- log.info("Dumping latency stats to {}", statsFileName);
+ String statsFileName = "perf-transaction-" +
System.currentTimeMillis() + ".hgrm";
+ log.info("Dumping latency stats to {}", statsFileName);
- PrintStream histogramLog = new PrintStream(new
FileOutputStream(statsFileName), false);
- HistogramLogWriter histogramLogWriter = new
HistogramLogWriter(histogramLog);
+ PrintStream histogramLog = new PrintStream(new
FileOutputStream(statsFileName), false);
+ HistogramLogWriter histogramLogWriter = new
HistogramLogWriter(histogramLog);
- // Some log header bits
- histogramLogWriter.outputLogFormatVersion();
- histogramLogWriter.outputLegend();
+ // Some log header bits
+ histogramLogWriter.outputLogFormatVersion();
+ histogramLogWriter.outputLegend();
- while (executing.get()) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- break;
+ while (executing.get()) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ long now = System.nanoTime();
+ double elapsed = (now - oldTime) / 1e9;
+ long total = totalNumEndTxnOpFailed.sum() +
totalNumTxnOpenTxnSuccess.sum();
+ double rate = numTxnOpSuccess.sumThenReset() / elapsed;
+ reportSendHistogram =
messageSendRecorder.getIntervalHistogram(reportSendHistogram);
+ reportAckHistogram =
messageAckRecorder.getIntervalHistogram(reportAckHistogram);
+ String txnOrTaskLog = !arguments.isDisableTransaction
+ ? "Throughput transaction: {} transaction executes ---
{} transaction/s"
+ : "Throughput task: {} task executes --- {} task/s";
+ log.info(
+ txnOrTaskLog + " --- send Latency: mean: {} ms - med:
{} "
+ + "- 95pct: {} - 99pct: {} - 99.9pct: {} -
99.99pct: {} - Max: {}"
+ + " --- ack Latency: "
+ + "mean: {} ms - med: {} - 95pct: {} - 99pct:
{} - 99.9pct: {} - 99.99pct: {} - Max: "
+ + "{}",
+ INTFORMAT.format(total),
+ DEC.format(rate),
+ DEC.format(reportSendHistogram.getMean() / 1000.0),
+
DEC.format(reportSendHistogram.getValueAtPercentile(50) / 1000.0),
+
DEC.format(reportSendHistogram.getValueAtPercentile(95) / 1000.0),
+
DEC.format(reportSendHistogram.getValueAtPercentile(99) / 1000.0),
+
DEC.format(reportSendHistogram.getValueAtPercentile(99.9) / 1000.0),
+
DEC.format(reportSendHistogram.getValueAtPercentile(99.99) / 1000.0),
+ DEC.format(reportSendHistogram.getMaxValue() / 1000.0),
+ DEC.format(reportAckHistogram.getMean() / 1000.0),
+ DEC.format(reportAckHistogram.getValueAtPercentile(50)
/ 1000.0),
+ DEC.format(reportAckHistogram.getValueAtPercentile(95)
/ 1000.0),
+ DEC.format(reportAckHistogram.getValueAtPercentile(99)
/ 1000.0),
+
DEC.format(reportAckHistogram.getValueAtPercentile(99.9) / 1000.0),
+
DEC.format(reportAckHistogram.getValueAtPercentile(99.99) / 1000.0),
+ DEC.format(reportAckHistogram.getMaxValue() / 1000.0));
+
+
histogramLogWriter.outputIntervalHistogram(reportSendHistogram);
+ histogramLogWriter.outputIntervalHistogram(reportAckHistogram);
+ reportSendHistogram.reset();
+ reportAckHistogram.reset();
+
+ oldTime = now;
}
- long now = System.nanoTime();
- double elapsed = (now - oldTime) / 1e9;
- long total = totalNumEndTxnOpFailed.sum() +
totalNumTxnOpenTxnSuccess.sum();
- double rate = numTxnOpSuccess.sumThenReset() / elapsed;
- reportSendHistogram =
messageSendRecorder.getIntervalHistogram(reportSendHistogram);
- reportAckHistogram =
messageAckRecorder.getIntervalHistogram(reportAckHistogram);
- String txnOrTaskLog = !arguments.isDisableTransaction
- ? "Throughput transaction: {} transaction executes --- {}
transaction/s"
- : "Throughput task: {} task executes --- {} task/s";
- log.info(
- txnOrTaskLog + " --- send Latency: mean: {} ms - med: {} "
- + "- 95pct: {} - 99pct: {} - 99.9pct: {} -
99.99pct: {} - Max: {}" + " --- ack Latency: "
- + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} -
99.9pct: {} - 99.99pct: {} - Max: {}",
- INTFORMAT.format(total),
- DEC.format(rate),
- DEC.format(reportSendHistogram.getMean() / 1000.0),
- DEC.format(reportSendHistogram.getValueAtPercentile(50) /
1000.0),
- DEC.format(reportSendHistogram.getValueAtPercentile(95) /
1000.0),
- DEC.format(reportSendHistogram.getValueAtPercentile(99) /
1000.0),
- DEC.format(reportSendHistogram.getValueAtPercentile(99.9)
/ 1000.0),
- DEC.format(reportSendHistogram.getValueAtPercentile(99.99)
/ 1000.0),
- DEC.format(reportSendHistogram.getMaxValue() / 1000.0),
- DEC.format(reportAckHistogram.getMean() / 1000.0),
- DEC.format(reportAckHistogram.getValueAtPercentile(50) /
1000.0),
- DEC.format(reportAckHistogram.getValueAtPercentile(95) /
1000.0),
- DEC.format(reportAckHistogram.getValueAtPercentile(99) /
1000.0),
- DEC.format(reportAckHistogram.getValueAtPercentile(99.9) /
1000.0),
- DEC.format(reportAckHistogram.getValueAtPercentile(99.99)
/ 1000.0),
- DEC.format(reportAckHistogram.getMaxValue() / 1000.0));
-
- histogramLogWriter.outputIntervalHistogram(reportSendHistogram);
- histogramLogWriter.outputIntervalHistogram(reportAckHistogram);
- reportSendHistogram.reset();
- reportAckHistogram.reset();
-
- oldTime = now;
}
-
-
}
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index bf2fade1c67..20679d83676 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -169,9 +170,9 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testBatchingDisabled() throws Exception {
PerformanceProducer.Arguments arguments = new
PerformanceProducer.Arguments();
-
+
int producerId = 0;
-
+
String topic = testTopic + UUID.randomUUID();
arguments.topics = List.of(topic);
arguments.msgRate = 10;
@@ -181,8 +182,9 @@ public class PerformanceProducerTest extends MockedPulsarServiceBaseTest {
ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(arguments)
.enableTransaction(arguments.isEnableTransaction);
+ @Cleanup
PulsarClient client = clientBuilder.build();
-
+
ProducerBuilderImpl<byte[]> builder = (ProducerBuilderImpl<byte[]>)
PerformanceProducer.createProducerBuilder(client, arguments, producerId);
Assert.assertFalse(builder.getConf().isBatchingEnabled());
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 3b831ad38ba..1e10db9753a 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -18,9 +18,20 @@
*/
package org.apache.pulsar.transaction.coordinator;
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
+import static
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -35,41 +46,34 @@ import org.apache.pulsar.common.util.FutureUtil;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
-import static
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
- private HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
- 1, TimeUnit.MILLISECONDS);
+ private HashedWheelTimer transactionTimer;
public MLTransactionMetadataStoreTest() {
super(3);
}
+ @BeforeClass
+ public void initTimer() {
+ transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ }
+
@AfterClass
- public void cleanup(){
+ public void cleanupTimer(){
transactionTimer.stop();
}
@@ -84,9 +88,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
@@ -172,9 +178,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
managedLedgerConfig.setMaxEntriesPerLedger(3);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, disabledBufferedWriter, transactionTimer,
DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -203,6 +211,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID,
factory,
managedLedgerConfig, disabledBufferedWriter, transactionTimer,
DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ transactionMetadataStore.closeAsync();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -229,10 +238,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
managedLedgerConfig.setMaxEntriesPerLedger(2);
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -284,6 +295,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
DISABLED_BUFFERED_WRITER_METRICS);
txnLog2.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStoreTest =
new
MLTransactionMetadataStore(transactionCoordinatorID,
txnLog2, new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -357,9 +369,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -435,9 +449,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -456,6 +472,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID,
factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ transactionMetadataStore.closeAsync();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
@@ -475,9 +492,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+ @Cleanup("closeAsync")
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+ @Cleanup("closeAsync")
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
index 8119c2f1f81..1ce858ec4a1 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
+import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -56,12 +57,13 @@ import org.testng.annotations.Test;
*/
public class PingPongSupportTest {
- private static Server server;
+ private Server server;
- private static final WebExecutorThreadPool executor = new
WebExecutorThreadPool(6, "pulsar-websocket-web-test");
+ private WebExecutorThreadPool executor;
@BeforeClass
- public static void setup() throws Exception {
+ public void setup() throws Exception {
+ executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test");
server = new Server(executor);
List<ServerConnector> connectors = new ArrayList<>();
ServerConnector connector = new ServerConnector(server);
@@ -90,7 +92,7 @@ public class PingPongSupportTest {
}
@AfterClass(alwaysRun = true)
- public static void tearDown() throws Exception {
+ public void tearDown() throws Exception {
if (server != null) {
server.stop();
}
@@ -108,6 +110,7 @@ public class PingPongSupportTest {
@Test(dataProvider = "endpoint")
public void testPingPong(String endpoint) throws Exception {
+ @Cleanup("stop")
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 67e5cfd3560..56612adc1ef 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static
org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Recycler;
import java.io.IOException;
import java.util.Iterator;
@@ -402,5 +403,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
log.error("FileSystemManagedLedgerOffloader close failed!", e);
}
}
+ if (assignmentScheduler != null) {
+ MoreExecutors.shutdownAndAwaitTermination(assignmentScheduler, 5,
TimeUnit.SECONDS);
+ }
}
}
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 477a03e2ca5..9609362e9b7 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.offload.filesystem;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import java.util.concurrent.Executors;
@@ -53,12 +54,15 @@ public abstract class FileStoreTestBase {
}
@AfterClass(alwaysRun = true)
- public final void afterClass() {
+ public final void afterClass() throws IOException {
cleanup();
}
- public void cleanup() {
- scheduler.shutdownNow();
+ public void cleanup() throws IOException {
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ scheduler = null;
+ }
}
@BeforeMethod(alwaysRun = true)
@@ -66,6 +70,7 @@ public abstract class FileStoreTestBase {
File baseDir =
Files.createTempDirectory(basePath).toFile().getAbsoluteFile();
Configuration conf = new Configuration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
baseDir.getAbsolutePath());
+ conf.set("dfs.namenode.gc.time.monitor.enable", "false");
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
hdfsCluster = builder.build();
@@ -80,9 +85,18 @@ public abstract class FileStoreTestBase {
@AfterMethod(alwaysRun = true)
public void tearDown() {
- hdfsCluster.shutdown(true, true);
- hdfsCluster.close();
- scheduledExecutorService.shutdownNow();
+ if (fileSystemManagedLedgerOffloader != null) {
+ fileSystemManagedLedgerOffloader.close();
+ fileSystemManagedLedgerOffloader = null;
+ }
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown(true, true);
+ hdfsCluster = null;
+ }
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ scheduledExecutorService = null;
+ }
}
public String getURI() {
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 7276be51217..71fe5ec7219 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
@@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -62,6 +64,14 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
map.put("ManagedLedgerName", managedLedgerName);
}
+ @Override
+ public void cleanup() throws IOException {
+ if (bk != null) {
+ bk.shutdown();
+ }
+ super.cleanup();
+ }
+
private ReadHandle buildReadHandle() throws Exception {
lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32,
"foobar".getBytes());
@@ -86,6 +96,12 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
super.start();
}
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void tearDown() {
+ super.tearDown();
+ }
+
@Test
public void testOffloadAndRead() throws Exception {
LedgerOffloader offloader = fileSystemManagedLedgerOffloader;