hangc0276 commented on code in PR #4211:
URL: https://github.com/apache/bookkeeper/pull/4211#discussion_r1495202016
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java:
##########
@@ -433,6 +445,109 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}
+ void batchRecoverLedgerFragmentEntry(final long startEntryId,
+ final long endEntryId,
+ final LedgerHandle lh,
+ final AsyncCallback.VoidCallback
ledgerFragmentMcb,
+ final Set<BookieId> newBookies,
+ final BiConsumer<Long, Long>
onReadEntryFailureCallback)
+ throws InterruptedException {
+ int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
+ int maxBytesToReplicate = conf.getReplicationRateByBytes();
+ if (replicationThrottle != null) {
+ if (maxBytesToReplicate != -1 && maxBytesToReplicate >
averageEntrySize.get() * entriesToReplicateCnt) {
+ maxBytesToReplicate = averageEntrySize.get() *
entriesToReplicateCnt;
+ }
+ replicationThrottle.acquire(maxBytesToReplicate);
+ }
+
+ lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt,
maxBytesToReplicate,
+ new ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error reading ledger entries: {} - {}",
+ startEntryId, endEntryId,
BKException.create(rc));
+ onReadEntryFailureCallback.accept(lh.getId(),
startEntryId);
+ ledgerFragmentMcb.processResult(rc, null, null);
+ return;
+ }
+ long lastEntryId = startEntryId;
+ while (seq.hasMoreElements()) {
+ LedgerEntry entry = seq.nextElement();
+ lastEntryId = entry.getEntryId();
+ byte[] data = entry.getEntry();
+ final long dataLength = data.length;
+ numEntriesRead.inc();
+ numBytesRead.registerSuccessfulValue(dataLength);
+
+ ReferenceCounted toSend = lh.getDigestManager()
+
.computeDigestAndPackageForSending(entry.getEntryId(),
+ lh.getLastAddConfirmed(),
entry.getLength(),
+ Unpooled.wrappedBuffer(data, 0,
data.length),
+ lh.getLedgerKey(),
+ BookieProtocol.FLAG_RECOVERY_ADD);
+ if (replicationThrottle != null) {
+ if (toSend instanceof ByteBuf) {
+ updateAverageEntrySize(((ByteBuf)
toSend).readableBytes());
+ } else if (toSend instanceof ByteBufList) {
+ updateAverageEntrySize(((ByteBufList)
toSend).readableBytes());
+ }
+ }
+ AtomicInteger numCompleted = new AtomicInteger(0);
+ AtomicBoolean completed = new AtomicBoolean(false);
+
+ WriteCallback multiWriteCallback = new WriteCallback()
{
+ @Override
+ public void writeComplete(int rc, long ledgerId,
long entryId, BookieId addr, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error writing entry for
ledgerId: {}, entryId: {}, bookie: {}",
+ ledgerId, entryId, addr,
BKException.create(rc));
+ if (completed.compareAndSet(false, true)) {
+ ledgerFragmentMcb.processResult(rc,
null, null);
+ }
+ } else {
+ numEntriesWritten.inc();
+ if (ctx instanceof Long) {
+
numBytesWritten.registerSuccessfulValue((Long) ctx);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Success writing ledger id
{}, entry id {} to a new bookie {}!",
+ ledgerId, entryId, addr);
+ }
+ if (numCompleted.incrementAndGet() ==
newBookies.size()
+ && completed.compareAndSet(false,
true)) {
+ ledgerFragmentMcb.processResult(rc,
null, null);
+ }
+ }
+ }
+ };
+
+ for (BookieId newBookie : newBookies) {
+ long startWriteEntryTime = MathUtils.nowInNano();
+ bkc.getBookieClient().addEntry(newBookie,
lh.getId(),
+ lh.getLedgerKey(), entry.getEntryId(),
toSend,
+ multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD,
+ false, WriteFlag.NONE);
+ writeDataLatency.registerSuccessfulEvent(
+
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
+ }
+ toSend.release();
+ }
+ if (lastEntryId == endEntryId) {
Review Comment:
Good point! Updated.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java:
##########
@@ -433,6 +445,109 @@ public void readComplete(int rc, LedgerHandle lh,
}, null);
}
+ void batchRecoverLedgerFragmentEntry(final long startEntryId,
+ final long endEntryId,
+ final LedgerHandle lh,
+ final AsyncCallback.VoidCallback
ledgerFragmentMcb,
+ final Set<BookieId> newBookies,
+ final BiConsumer<Long, Long>
onReadEntryFailureCallback)
+ throws InterruptedException {
+ int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
+ int maxBytesToReplicate = conf.getReplicationRateByBytes();
+ if (replicationThrottle != null) {
+ if (maxBytesToReplicate != -1 && maxBytesToReplicate >
averageEntrySize.get() * entriesToReplicateCnt) {
+ maxBytesToReplicate = averageEntrySize.get() *
entriesToReplicateCnt;
+ }
+ replicationThrottle.acquire(maxBytesToReplicate);
+ }
+
+ lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt,
maxBytesToReplicate,
+ new ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error reading ledger entries: {} - {}",
+ startEntryId, endEntryId,
BKException.create(rc));
+ onReadEntryFailureCallback.accept(lh.getId(),
startEntryId);
+ ledgerFragmentMcb.processResult(rc, null, null);
Review Comment:
Good point. Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]