This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ef4ce1385f AutoRecovery supports batch read (#4211)
ef4ce1385f is described below
commit ef4ce1385f8f59fe81cb9f47d72777356a99969d
Author: Hang Chen <[email protected]>
AuthorDate: Wed Feb 21 02:11:12 2024 +0800
AutoRecovery supports batch read (#4211)
* AutoRecovery supports batch read
* Fix check style
* address comments
---
.../client/LedgerFragmentReplicator.java | 142 +++++++++++++++++++--
.../bookkeeper/conf/ClientConfiguration.java | 23 ++++
.../replication/TestReplicationWorker.java | 63 +++++++++
3 files changed, 216 insertions(+), 12 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 5cc22362ac..9f6c90d29e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -171,29 +171,41 @@ public class LedgerFragmentReplicator {
return;
}
- /*
- * Add all the entries to entriesToReplicate list from
- * firstStoredEntryId to lastStoredEntryID.
- */
- List<Long> entriesToReplicate = new LinkedList<Long>();
- long lastStoredEntryId = lf.getLastStoredEntryId();
- for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++)
{
- entriesToReplicate.add(i);
- }
/*
* Now asynchronously replicate all of the entries for the ledger
* fragment that were on the dead bookie.
*/
+ int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
- entriesToReplicate.size(), ledgerFragmentMcb, null,
BKException.Code.OK,
+ entriesToReplicateCnt, ledgerFragmentMcb, null,
BKException.Code.OK,
BKException.Code.LedgerRecoveryException);
if (this.replicationThrottle != null) {
this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
}
- for (final Long entryId : entriesToReplicate) {
- recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
+
+ if (conf.isRecoveryBatchReadEnabled()
+ && conf.getUseV2WireProtocol()
+ && conf.isBatchReadEnabled()
+ && lh.getLedgerMetadata().getEnsembleSize() ==
lh.getLedgerMetadata().getWriteQuorumSize()) {
+ batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh,
ledgerFragmentEntryMcb,
newBookies, onReadEntryFailureCallback);
+
+ } else {
+ /*
+ * Add all the entries to entriesToReplicate list from
+ * firstStoredEntryId to lastStoredEntryID.
+ */
+ List<Long> entriesToReplicate = new LinkedList<Long>();
+ long lastStoredEntryId = lf.getLastStoredEntryId();
+ for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId;
i++) {
+ entriesToReplicate.add(i);
+ }
+ for (final Long entryId : entriesToReplicate) {
+ recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
+ newBookies, onReadEntryFailureCallback);
+ }
}
+
}
/**
@@ -433,6 +445,112 @@ public class LedgerFragmentReplicator {
}, null);
}
+ void batchRecoverLedgerFragmentEntry(final long startEntryId,
+ final long endEntryId,
+ final LedgerHandle lh,
+ final AsyncCallback.VoidCallback
ledgerFragmentMcb,
+ final Set<BookieId> newBookies,
+ final BiConsumer<Long, Long>
onReadEntryFailureCallback)
+ throws InterruptedException {
+ int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
+ int maxBytesToReplicate = conf.getReplicationRateByBytes();
+ if (replicationThrottle != null) {
+ if (maxBytesToReplicate != -1 && maxBytesToReplicate >
averageEntrySize.get() * entriesToReplicateCnt) {
+ maxBytesToReplicate = averageEntrySize.get() *
entriesToReplicateCnt;
+ }
+ replicationThrottle.acquire(maxBytesToReplicate);
+ }
+
+ lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt,
maxBytesToReplicate,
+ new ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error reading ledger entries: {} - {}",
+ startEntryId, endEntryId,
BKException.create(rc));
+ onReadEntryFailureCallback.accept(lh.getId(),
startEntryId);
+ for (int i = 0; i < entriesToReplicateCnt; i++) {
+ ledgerFragmentMcb.processResult(rc, null, null);
+ }
+ return;
+ }
+ long lastEntryId = startEntryId;
+ while (seq.hasMoreElements()) {
+ LedgerEntry entry = seq.nextElement();
+ lastEntryId = entry.getEntryId();
+ byte[] data = entry.getEntry();
+ final long dataLength = data.length;
+ numEntriesRead.inc();
+ numBytesRead.registerSuccessfulValue(dataLength);
+
+ ReferenceCounted toSend = lh.getDigestManager()
+
.computeDigestAndPackageForSending(entry.getEntryId(),
+ lh.getLastAddConfirmed(),
entry.getLength(),
+ Unpooled.wrappedBuffer(data, 0,
data.length),
+ lh.getLedgerKey(),
+ BookieProtocol.FLAG_RECOVERY_ADD);
+ if (replicationThrottle != null) {
+ if (toSend instanceof ByteBuf) {
+ updateAverageEntrySize(((ByteBuf)
toSend).readableBytes());
+ } else if (toSend instanceof ByteBufList) {
+ updateAverageEntrySize(((ByteBufList)
toSend).readableBytes());
+ }
+ }
+ AtomicInteger numCompleted = new AtomicInteger(0);
+ AtomicBoolean completed = new AtomicBoolean(false);
+
+ WriteCallback multiWriteCallback = new WriteCallback()
{
+ @Override
+ public void writeComplete(int rc, long ledgerId,
long entryId, BookieId addr, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error writing entry for
ledgerId: {}, entryId: {}, bookie: {}",
+ ledgerId, entryId, addr,
BKException.create(rc));
+ if (completed.compareAndSet(false, true)) {
+ ledgerFragmentMcb.processResult(rc,
null, null);
+ }
+ } else {
+ numEntriesWritten.inc();
+ if (ctx instanceof Long) {
+
numBytesWritten.registerSuccessfulValue((Long) ctx);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Success writing ledger id
{}, entry id {} to a new bookie {}!",
+ ledgerId, entryId, addr);
+ }
+ if (numCompleted.incrementAndGet() ==
newBookies.size()
+ && completed.compareAndSet(false,
true)) {
+ ledgerFragmentMcb.processResult(rc,
null, null);
+ }
+ }
+ }
+ };
+
+ for (BookieId newBookie : newBookies) {
+ long startWriteEntryTime = MathUtils.nowInNano();
+ bkc.getBookieClient().addEntry(newBookie,
lh.getId(),
+ lh.getLedgerKey(), entry.getEntryId(),
toSend,
+ multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD,
+ false, WriteFlag.NONE);
+ writeDataLatency.registerSuccessfulEvent(
+
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
+ }
+ toSend.release();
+ }
+ if (lastEntryId != endEntryId) {
+ try {
+ batchRecoverLedgerFragmentEntry(lastEntryId + 1,
endEntryId, lh,
+ ledgerFragmentMcb, newBookies,
onReadEntryFailureCallback);
+ } catch (InterruptedException e) {
+ int remainingEntries = (int) (endEntryId -
lastEntryId);
+ for (int i = 0; i < remainingEntries; i++) {
+
ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null,
null);
+ }
+ }
+ }
+ }
+ }, null);
+ }
+
private void updateAverageEntrySize(int toSendSize) {
averageEntrySize.updateAndGet(value -> (int) (value *
AVERAGE_ENTRY_SIZE_RATIO
+ (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 924dee4ada..03eb6d1abd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -115,6 +115,7 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
protected static final String RECOVERY_READ_BATCH_SIZE =
"recoveryReadBatchSize";
protected static final String REORDER_READ_SEQUENCE_ENABLED =
"reorderReadSequenceEnabled";
protected static final String STICKY_READS_ENABLED = "stickyReadSEnabled";
+ protected static final String RECOVERY_BATCH_READ_ENABLED =
"recoveryBatchReadEnabled";
// Add Parameters
protected static final String OPPORTUNISTIC_STRIPING =
"opportunisticStriping";
protected static final String DELAY_ENSEMBLE_CHANGE =
"delayEnsembleChange";
@@ -1203,6 +1204,23 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return this;
}
+    /**
+     * Returns whether recovery batch read is enabled.
+     * @return
+ public boolean isRecoveryBatchReadEnabled() {
+ return getBoolean(RECOVERY_BATCH_READ_ENABLED, false);
+ }
+
+ /**
+ * Enable/disable recovery batch read.
+ * @param enabled
+ * @return
+ */
+ public ClientConfiguration setRecoveryBatchReadEnabled(boolean enabled) {
+ setProperty(RECOVERY_BATCH_READ_ENABLED, enabled);
+ return this;
+ }
/**
* Get Ensemble Placement Policy Class.
*
@@ -2084,6 +2102,11 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return getBoolean(BATCH_READ_ENABLED, true);
}
+ public ClientConfiguration setBatchReadEnabled(boolean enabled) {
+ setProperty(BATCH_READ_ENABLED, enabled);
+ return this;
+ }
+
@Override
protected ClientConfiguration getThis() {
return this;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index a991423389..f4a9245c76 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -437,6 +437,69 @@ public class TestReplicationWorker extends
BookKeeperClusterTestCase {
}
+ @Test
+ public void testMultipleLedgerReplicationWithReplicationWorkerBatchRead()
throws Exception {
+ LedgerHandle lh1 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
TESTPASSWD);
+ for (int i = 0; i < 200; ++i) {
+ lh1.addEntry(data);
+ }
+ BookieId replicaToKillFromFirstLedger =
lh1.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+
+ LedgerHandle lh2 = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
TESTPASSWD);
+ for (int i = 0; i < 200; ++i) {
+ lh2.addEntry(data);
+ }
+
+ BookieId replicaToKillFromSecondLedger =
lh2.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
+
+ LOG.info("Killing Bookie : {}", replicaToKillFromFirstLedger);
+ killBookie(replicaToKillFromFirstLedger);
+ lh1.close();
+
+ LOG.info("Killing Bookie : {}", replicaToKillFromSecondLedger);
+ killBookie(replicaToKillFromSecondLedger);
+ lh2.close();
+
+ BookieId newBkAddr = startNewBookieAndReturnBookieId();
+ LOG.info("New Bookie addr : {}", newBkAddr);
+
+ if (replicaToKillFromFirstLedger != replicaToKillFromSecondLedger) {
+ BookieId newBkAddr2 = startNewBookieAndReturnBookieId();
+ LOG.info("New Bookie addr : {}", newBkAddr2);
+ }
+
+ ClientConfiguration clientConfiguration = new
ClientConfiguration(baseClientConf);
+ clientConfiguration.setUseV2WireProtocol(true);
+ clientConfiguration.setRecoveryBatchReadEnabled(true);
+ clientConfiguration.setBatchReadEnabled(true);
+ clientConfiguration.setRereplicationEntryBatchSize(100);
+ clientConfiguration.setReplicationRateByBytes(3 * 1024);
+ ReplicationWorker rw = new ReplicationWorker(new
ServerConfiguration(clientConfiguration));
+
+ rw.start();
+ try {
+ // Mark ledger1 and ledger2 as underreplicated
+ underReplicationManager.markLedgerUnderreplicated(lh1.getId(),
replicaToKillFromFirstLedger.toString());
+ underReplicationManager.markLedgerUnderreplicated(lh2.getId(),
replicaToKillFromSecondLedger.toString());
+
+ while (ReplicationTestUtil.isLedgerInUnderReplication(zkc,
lh1.getId(), basePath)) {
+ Thread.sleep(100);
+ }
+
+ while (ReplicationTestUtil.isLedgerInUnderReplication(zkc,
lh2.getId(), basePath)) {
+ Thread.sleep(100);
+ }
+
+ killAllBookies(lh1, newBkAddr);
+
+            // Should be able to read the entries from 0-199
+ verifyRecoveredLedgers(lh1, 0, 199);
+ verifyRecoveredLedgers(lh2, 0, 199);
+ } finally {
+ rw.shutdown();
+ }
+ }
+
/**
* Tests that ReplicationWorker should fence the ledger and release ledger
* lock after timeout. Then replication should happen normally.