littleorca opened a new issue #11412:
URL: https://github.com/apache/pulsar/issues/11412
**Describe the bug**
For the moment that ManagedLedgerFactoryImpl.open() runs into
MetaStoreImpl.getManagedLedgerInfo() and waiting for the future returned by
store.get() to complete, if ManagedLedgerFactoryImpl.shutdown() is invoked
then, the executor will be shutdown, and thus the future will be unable to run
following stage after it's completion, as the executor will reject.
Unfortunately, the exceptionally() also requires the executor, and the failure
callback won't be called, open() will never return.
**To Reproduce**
Refer to the following test:
```java
public class ManagedLedgerTest {
@Test
public void openEncounteredShutdown() throws Exception {
final String ledgerName = UUID.randomUUID().toString();
final long version = 0;
final long createTimeMillis = System.currentTimeMillis();
MetadataStore metadataStore = mock(MetadataStore.class);
given(metadataStore.get(any())).willAnswer(inv -> {
String path = inv.getArgumentAt(0, String.class);
if (path == null) {
throw new IllegalArgumentException("Path is null.");
}
if (path.endsWith(ledgerName)) { // ledger
MLDataFormats.ManagedLedgerInfo.Builder mli =
MLDataFormats.ManagedLedgerInfo.newBuilder()
.addLedgerInfo(0,
MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder()
.setLedgerId(0)
.setEntries(0)
.setTimestamp(System.currentTimeMillis()));
Stat stat = new Stat(path, version, createTimeMillis,
createTimeMillis);
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("store.get return with ml");
return Optional.of(new
GetResult(mli.build().toByteArray(), stat));
});
} else if (path.contains(ledgerName)) { // cursor
MLDataFormats.ManagedCursorInfo.Builder mci =
MLDataFormats.ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(-1)
.setMarkDeleteLedgerId(0)
.setMarkDeleteLedgerId(-1);
Stat stat = new Stat(path, version, createTimeMillis,
createTimeMillis);
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("store.get return with mc");
return Optional.of(new
GetResult(mci.build().toByteArray(), stat));
});
} else {
throw new IllegalArgumentException("Invalid path: " + path);
}
});
given(metadataStore.put(anyString(), any(), any())).willAnswer(inv
-> {
Optional<Long> expectedVersion = inv.getArgumentAt(2,
Optional.class);
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Stat(inv.getArgumentAt(0, String.class),
expectedVersion.orElse(0L) + 1, createTimeMillis, System.currentTimeMillis());
});
});
given(metadataStore.getChildren(anyString())).willAnswer(inv -> {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Collections.singletonList("cursor");
});
});
BookKeeper bookKeeper = mock(BookKeeper.class);
LedgerHandle ledgerHandle = mock(LedgerHandle.class);
LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
OrderedExecutor executor =
OrderedExecutor.newBuilder().name("Test").build();
given(bookKeeper.getMainWorkerPool()).willReturn(executor);
doAnswer(inv -> {
Thread.sleep(100);
AsyncCallback.OpenCallback cb = inv.getArgumentAt(3,
AsyncCallback.OpenCallback.class);
cb.openComplete(0, ledgerHandle, inv.getArgumentAt(4,
Object.class));
return null;
}).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(),
any());
doAnswer(inv -> {
Thread.sleep(100);
AsyncCallback.CreateCallback cb = inv.getArgumentAt(5,
AsyncCallback.CreateCallback.class);
cb.createComplete(0, newLedgerHandle, inv.getArgumentAt(6,
Object.class));
return null;
}).when(bookKeeper).asyncCreateLedger(anyInt(), anyInt(), anyInt(),
any(), any(), any()/*callback*/, any(), any());
ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, bookKeeper);
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture.runAsync(() -> {
try {
ManagedLedger ml = factory.open(ledgerName);
} catch (Throwable e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
Thread.sleep(5000);
System.out.println("Shutdown...");
factory.shutdown();
if (!countDownLatch.await(60, TimeUnit.SECONDS)) {
fail("open() not returned in time.");
}
}
}
```
**Expected behavior**
ManagedLedgerFactoryImpl.open()/asyncOpen() should fail with error instead
of hang forever.
**Additional context**
VERSION: v2.8.0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]