This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8682776  Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945)
8682776 is described below

commit 8682776b2ae82353e05975c69a2fa7cd21f99a03
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Jun 8 17:35:48 2018 -0700

    Fix: managedledger factory shutdown stuck when any of ledger future-result is not completed (#1945)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  1 +
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 36 +++++-----------------
 2 files changed, 9 insertions(+), 28 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index cb4d2f9..2d90725 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -272,6 +272,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers.values()) {
             ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
             if (ledger == null) {
+                latch.countDown();
                 continue;
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 328d6b2..531f8ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -26,7 +26,6 @@ import java.io.File;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
-import java.net.URI;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -103,8 +102,6 @@ public class PulsarSinkE2ETest {
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
     String primaryHost;
-    ExecutorService executor;
-    ExecutorService workerExecutor;
 
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int brokerWebServicePort = PortManager.nextFreePort();
@@ -125,9 +122,6 @@ public class PulsarSinkE2ETest {
 
         log.info("--- Setting up method {} ---", method.getName());
 
-        executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        workerExecutor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-worker-test"));
-
         // Start local bookkeeper ensemble
         bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
         bkEnsemble.start();
@@ -158,10 +152,7 @@ public class PulsarSinkE2ETest {
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
-        boolean isFunctionWebServerRequired = method.getName()
-                .equals("testExternalReplicatorRedirectionToWorkerService");
-        Optional<WorkerService> functionWorkerService = isFunctionWebServerRequired ? Optional.ofNullable(null)
-                : Optional.of(functionsWorkerService);
+        Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
         pulsar = new PulsarService(config, functionWorkerService);
         pulsar.start();
 
@@ -197,29 +188,17 @@ public class PulsarSinkE2ETest {
         TenantInfo propAdmin = new TenantInfo();
         propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
         admin.tenants().updateTenant(tenant, propAdmin);
-
-        if (isFunctionWebServerRequired) {
-            URI dlogURI = Utils.initializeDlogNamespace(config.getZookeeperServers(), "/ledgers");
-            functionsWorkerService.start(dlogURI);
-            functionsWorkerServer = new WorkerServer(functionsWorkerService);
-            workerExecutor.submit(functionsWorkerServer);
-        }
+       
         Thread.sleep(100);
     }
 
     @AfterMethod
     void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        if (executor != null) {
-            executor.shutdown();
-        }
-        if (workerExecutor != null) {
-            workerExecutor.shutdown();
-        }
         pulsarClient.close();
         admin.close();
-        pulsar.close();
         functionsWorkerService.stop();
+        pulsar.close();
         bkEnsemble.stop();
     }
 
@@ -260,7 +239,7 @@ public class PulsarSinkE2ETest {
      * 
      * @throws Exception
      */
-    @Test
+    @Test(timeOut = 20000)
     public void testE2EPulsarSink() throws Exception {
 
         final String namespacePortion = "myReplNs";
@@ -289,7 +268,8 @@ public class PulsarSinkE2ETest {
         // validate pulsar sink consumer has started on the topic
         Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
 
-        for (int i = 0; i < 5; i++) {
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
@@ -304,8 +284,8 @@ public class PulsarSinkE2ETest {
         }, 5, 150);
         // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
         // due to publish failure
-        Assert.assertEquals(
-                admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, 0);
+        Assert.assertNotEquals(
+                admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs);
 
     }
 

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to