lhotari commented on code in PR #18688: URL: https://github.com/apache/pulsar/pull/18688#discussion_r1189737621
########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/DuplicateManagedLedgerTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import io.netty.buffer.ByteBuf; +import java.nio.charset.Charset; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker") +@Slf4j +public class DuplicateManagedLedgerTest extends MockedBookKeeperTestCase { + + private void triggerLedgerRollover(ManagedLedger ledger, int maxEntriesPerLedger) { + new Thread(() -> { + int writeLedgerCount = 2; + for (int i = 0; i < writeLedgerCount; i++) { + for (int j = 0; j < maxEntriesPerLedger; j++) { + byte[] data = String.format("%s_%s", i, j).getBytes(Charset.defaultCharset()); + Object ctx = ""; + ledger.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + + } + }, ctx); + } + } + }).start(); + } + + @Test + public void testConcurrentCloseLedgerAndSwitchLedgerForReproduceIssue() throws Exception { + String managedLedgerName = "lg_" + UUID.randomUUID().toString().replaceAll("-", "_"); + int maxEntriesPerLedger = 5; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1); + config.setMaximumRolloverTime(Integer.MAX_VALUE, TimeUnit.SECONDS); + config.setMaxEntriesPerLedger(5); + final ProcessCoordinator processCoordinator = new ProcessCoordinator(); + + // call "switch ledger" and "managedLedger.close" concurrently. + final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) factory.open(managedLedgerName, config); + waitManagedLedgerStateEquals(managedLedger1, ManagedLedgerImpl.State.LedgerOpened); + processCoordinator.on(); + final ManagedLedgerImpl sequentiallyManagedLedger1 = + makeManagedLedgerWorksWithStrictlySequentially(managedLedger1, processCoordinator); + triggerLedgerRollover(sequentiallyManagedLedger1, maxEntriesPerLedger); + sequentiallyManagedLedger1.close(); + waitManagedLedgerStateNotEquals(managedLedger1, ManagedLedgerImpl.State.Closed); + + // create managedLedger2. + final ManagedLedgerImpl managedLedger2 = (ManagedLedgerImpl) factory.open(managedLedgerName, config); + Assert.assertEquals(factory.ledgers.size(), 1); + Assert.assertNotEquals(managedLedger1, managedLedger2); + waitManagedLedgerInFactoryEquals(managedLedger2); + processCoordinator.off(); + managedLedger1.close(); + waitManagedLedgerStateEquals(managedLedger1, ManagedLedgerImpl.State.Closed); + Assert.assertFalse(factory.ledgers.isEmpty()); + Assert.assertEquals(factory.ledgers.get(managedLedger2.getName()).join(), managedLedger2); + + // cleanup. + managedLedger2.close(); + } + + private ManagedLedgerImpl makeManagedLedgerWorksWithStrictlySequentially(ManagedLedgerImpl originalManagedLedger, + ProcessCoordinator processCoordinator) + throws Exception { + ManagedLedgerImpl sequentiallyManagedLedger = spy(originalManagedLedger); + // step-1. + doAnswer(invocation -> { + synchronized (originalManagedLedger) { + // step-3. + // Wait for `managedLedger.close`, then do task: "asyncCreateLedger()". + // Because the thread selector in "managedLedger.executor" is random logic, so it is possible to fail. + // Adding 1000 tasks to stuck the executor gives a high chance of success. + for (int i = 0; i < 1000; i++) { + originalManagedLedger.getExecutor().execute(() -> { + processCoordinator.waitPreviousAndSetStep(3); + }); + } + LedgerHandle lh = (LedgerHandle) invocation.getArguments()[0]; + processCoordinator.waitPreviousAndSetStep(1); + originalManagedLedger.ledgerClosed(lh); + } + return null; + }).when(sequentiallyManagedLedger).ledgerClosed(any(LedgerHandle.class)); + // step-2. + doAnswer(invocation -> { + processCoordinator.waitPreviousAndSetStep(2); + originalManagedLedger.close(); + return null; + }).when(sequentiallyManagedLedger).close(); + return sequentiallyManagedLedger; + } Review Comment: this looks like a hack, especially the loop of adding 1000 tasks to the executor. The "ProcessCoordinator" implementation looks like something that could be handled with [`java.util.concurrent.Phaser`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Phaser.html). It would be better to modify ManagedLedgerFactoryImpl so that it's possible to override a method that creates the ledger instance. That way it would be possible to have a way to override the method for tests and inject test logic without relying on Mockito, which isn't thread safe. That itself could cause issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
