This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b395ad47971 Revert "Change ``rollCurrentLedgerIfFull`` logic to follow
lazy creation of ledger (#14672)" (#16806)
b395ad47971 is described below
commit b395ad47971768826907e9f89e2ce92a2a7c1ca9
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Aug 4 15:04:17 2022 +0800
Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of
ledger (#14672)" (#16806)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 +
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 ++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 41 +++++++++-------------
.../broker/service/BrokerEntryMetadataE2ETest.java | 7 ++--
.../service/CurrentLedgerRolloverIfFullTest.java | 41 ++--------------------
.../MLTransactionMetadataStoreTest.java | 6 ++--
6 files changed, 31 insertions(+), 77 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4da04009de7..29c78dd7bbb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1706,6 +1706,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
ledgerClosed(lh);
+ createLedgerAfterClosed();
}
}, System.nanoTime());
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4fb51885b9e..cea743c79ff 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2367,13 +2367,15 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger.addEntry("fourth".getBytes(Encoding));
Position last = ledger.addEntry("last-expired".getBytes(Encoding));
- ledger.getConfig().setMaxEntriesPerLedger(1);
// roll a new ledger
+ int numLedgersBefore = ledger.getLedgersInfo().size();
+ ledger.getConfig().setMaxEntriesPerLedger(1);
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
- Assert.assertEquals(ledger.getState(),
ManagedLedgerImpl.State.ClosedLedger);
- });
+ Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> ledger.getLedgersInfo().size() >
numLedgersBefore);
// the algorithm looks for "expired" messages
// starting from the first, then it moves to the last message
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index dff1176c86d..d556ff8eb63 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
@@ -69,7 +68,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
@@ -2106,8 +2104,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.addEntry("data".getBytes());
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.getLedgersInfoAsList().size(), 1);
- assertEquals(ledger.getState(),
ManagedLedgerImpl.State.ClosedLedger);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 2);
});
}
@@ -2254,7 +2251,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(0);
- config.setMaxEntriesPerLedger(2);
+ config.setMaxEntriesPerLedger(1);
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
@@ -2264,25 +2261,19 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
- long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size()
-1).getLedgerId();
- ml.pendingAddEntries.add(OpAddEntry.
- createNoRetainBuffer(ml,
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
+ // let current ledger close
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
- AtomicLong currentLedgerId = new AtomicLong(-1);
- // create a new ledger
- Awaitility.await().untilAsserted(() -> {
-
currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size()
-1).getLedgerId());
- assertNotEquals(preLedgerId, currentLedgerId.get());
- });
// let retention expire
Thread.sleep(1500);
// delete the expired ledger
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
// the closed and expired ledger should be deleted
- assertEquals(ml.getLedgersInfoAsList().size(), 1);
- assertEquals(currentLedgerId.get(),
- ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size()
- 1).getLedgerId());
+ assertTrue(ml.getLedgersInfoAsList().size() <= 1);
+ assertEquals(ml.getTotalSize(), 0);
ml.close();
}
@@ -2547,12 +2538,10 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
- assertEquals(managedLedger.getLedgersInfo().size(), 2);
- assertEquals(managedLedger.getState(),
ManagedLedgerImpl.State.ClosedLedger);
- });
+ Awaitility.await().untilAsserted(() ->
assertEquals(managedLedger.getLedgersInfo().size(), 3));
assertEquals(5,
managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5,
managedLedger.getLedgersInfoAsList().get(1).getEntries());
+ assertEquals(0,
managedLedger.getLedgersInfoAsList().get(2).getEntries());
log.info("### ledgers {}", managedLedger.getLedgersInfo());
long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -2597,8 +2586,8 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> {
- assertEquals(managedLedger.getLedgersInfo().size(), 2);
- assertEquals(managedLedger.getState(),
ManagedLedgerImpl.State.ClosedLedger);
+ assertEquals(managedLedger.getLedgersInfo().size(), 3);
+ assertEquals(managedLedger.getState(),
ManagedLedgerImpl.State.LedgerOpened);
});
assertEquals(5,
managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5,
managedLedger.getLedgersInfoAsList().get(1).getEntries());
@@ -3446,10 +3435,12 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
// all the messages have been acknowledged
// and all the ledgers have been removed except the last ledger
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
- Awaitility.await().untilAsserted(() ->
- Assert.assertEquals(ledger.getState(),
ManagedLedgerImpl.State.ClosedLedger));
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(ledger.getTotalSize(), 0));
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 3356f0c5178..49b4742b71d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,13 +391,10 @@ public class BrokerEntryMetadataE2ETest extends
BrokerTestBase {
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(Duration.ofSeconds(3))
- .untilAsserted(() -> {
- Assert.assertEquals(managedLedger.getLedgersInfo().size(),
1);
- Assert.assertEquals(managedLedger.getState(),
ManagedLedgerImpl.State.ClosedLedger);
- });
+ .until(() -> managedLedger.getLedgersInfo().size() > 1);
final List<LedgerInfo> ledgerInfoList =
managedLedger.getLedgersInfoAsList();
- Assert.assertEquals(ledgerInfoList.size(), 1);
+ Assert.assertEquals(ledgerInfoList.size(), 2);
Assert.assertEquals(ledgerInfoList.get(0).getSize(),
managedLedger.getTotalSize());
cursor.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index f1d7a609a79..b05abf3be52 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,14 +20,10 @@ package org.apache.pulsar.broker.service;
import java.lang.reflect.Field;
import java.time.Duration;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.OpAddEntry;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -93,10 +89,8 @@ public class CurrentLedgerRolloverIfFullTest extends
BrokerTestBase {
consumer.acknowledge(msg);
}
- MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
-
managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size()
- 1);
// all the messages have been acknowledged
- // and all the ledgers have been removed except the last ledger
+ // and all the ledgers have been removed except the last ledger
Awaitility.await()
.pollInterval(Duration.ofMillis(500L))
.untilAsserted(() -> {
@@ -110,41 +104,12 @@ public class CurrentLedgerRolloverIfFullTest extends
BrokerTestBase {
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- // If there are no pending write messages, the last ledger will be
closed and still held.
+ // the last ledger will be closed and removed, leaving a single
empty ledger
Awaitility.await()
.pollInterval(Duration.ofMillis(1000L))
.untilAsserted(() -> {
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertEquals(lastLh.getLedgerId(),
-
managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
- });
- producer.send(new byte[1024 * 1024]);
- Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertNotNull(msg);
- consumer.acknowledge(msg);
- // Assert that we got a new ledger and all but the current ledger are
deleted
- Awaitility.await()
- .untilAsserted(()-> {
-
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertNotEquals(lastLh.getLedgerId(),
-
managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
- });
- long lastLhIdAfterRolloverAndSendAgain =
managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
-
- // Mock pendingAddEntries
- OpAddEntry op = OpAddEntry.
- createNoRetainBuffer(managedLedger,
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
- Field pendingAddEntries =
managedLedger.getClass().getDeclaredField("pendingAddEntries");
- pendingAddEntries.setAccessible(true);
- ConcurrentLinkedQueue<OpAddEntry> queue =
(ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
- queue.add(op);
- // When ml has pending write messages, ml will create a new ledger and
close and delete the previous ledger
- Awaitility.await()
- .untilAsserted(()-> {
- managedLedger.rollCurrentLedgerIfFull();
-
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-
Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
- lastLhIdAfterRolloverAndSendAgain);
+ Assert.assertEquals(managedLedger.getTotalSize(), 0);
});
}
}
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 740fa095ac3..41a2552d04f 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -53,7 +53,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -196,9 +195,8 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger,
ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
-
Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
- Assert.assertEquals(managedLedger.getState(), ClosedLedger);
+ Awaitility.await().until(() -> {
+ return !managedLedger.ledgerExists(position.getLedgerId());
});
}
mlTransactionLog.closeAsync().get(2, TimeUnit.SECONDS);