This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8682776 Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945) 8682776 is described below commit 8682776b2ae82353e05975c69a2fa7cd21f99a03 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Jun 8 17:35:48 2018 -0700 Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 1 + .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 36 +++++----------------- 2 files changed, 9 insertions(+), 28 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index cb4d2f9..2d90725 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -272,6 +272,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers.values()) { ManagedLedgerImpl ledger = ledgerFuture.getNow(null); if (ledger == null) { + latch.countDown(); continue; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 328d6b2..531f8ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -26,7 +26,6 @@ import java.io.File; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.MalformedURLException; -import java.net.URI; import java.net.URL; import java.util.HashMap; import java.util.HashSet; @@ -103,8 +102,6 @@ public class PulsarSinkE2ETest { final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; String primaryHost; - ExecutorService executor; - ExecutorService workerExecutor; private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); @@ -125,9 +122,6 @@ public class PulsarSinkE2ETest { log.info("--- Setting up method {} ---", method.getName()); - executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - workerExecutor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-worker-test")); - // Start local bookkeeper ensemble bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); bkEnsemble.start(); @@ -158,10 +152,7 @@ public class PulsarSinkE2ETest { functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); - boolean isFunctionWebServerRequired = method.getName() - .equals("testExternalReplicatorRedirectionToWorkerService"); - Optional<WorkerService> functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null) - : Optional.of(functionsWorkerService); + Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService); pulsar = new PulsarService(config, functionWorkerService); pulsar.start(); @@ -197,29 +188,17 @@ public class PulsarSinkE2ETest { TenantInfo propAdmin = new TenantInfo(); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); - - if (isFunctionWebServerRequired) { - URI dlogURI = Utils.initializeDlogNamespace(config.getZookeeperServers(), "/ledgers"); - functionsWorkerService.start(dlogURI); - functionsWorkerServer = new WorkerServer(functionsWorkerService); - workerExecutor.submit(functionsWorkerServer); - } + Thread.sleep(100); } @AfterMethod void shutdown() throws Exception { log.info("--- Shutting down ---"); - if (executor != null) { - executor.shutdown(); - } - if (workerExecutor != null) { - workerExecutor.shutdown(); - } pulsarClient.close(); admin.close(); - pulsar.close(); functionsWorkerService.stop(); + pulsar.close(); bkEnsemble.stop(); } @@ -260,7 +239,7 @@ public class PulsarSinkE2ETest { * * @throws Exception */ - @Test + @Test(timeOut = 20000) public void testE2EPulsarSink() throws Exception { final String namespacePortion = "myReplNs"; @@ -289,7 +268,8 @@ public class PulsarSinkE2ETest { // validate pulsar sink consumer has started on the topic Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); - for (int i = 0; i < 5; i++) { + int totalMsgs = 5; + for (int i = 0; i < totalMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); } @@ -304,8 +284,8 @@ public class PulsarSinkE2ETest { }, 5, 150); // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages // due to publish failure - Assert.assertEquals( - admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, 0); + Assert.assertNotEquals( + admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs); } -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.