eolivelli commented on code in PR #3846:
URL: https://github.com/apache/bookkeeper/pull/3846#discussion_r1136657665
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1096,80 @@ public void addEntry(ByteBuf entry, boolean
ackBeforeSync, WriteCallback cb, Obj
}
}
+ public void addEntryList(List<ParsedAddRequest> requests, boolean
ackBeforeSync,
+ WriteCallback cb, Object ctx, RequestStats
requestStats) throws InterruptedException {
+ long requestNans = MathUtils.nowInNano();
+ boolean hasFailedRequests = false;
+ Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+ ListIterator<ParsedAddRequest> iter = requests.listIterator();
+ while (iter.hasNext()) {
+ ParsedAddRequest request = iter.next();
+ int rc = BookieProtocol.EOK;
+ try {
+ Pair<Long, byte[]> ledgerIdMasterKey =
Pair.of(request.getLedgerId(), request.getMasterKey());
+ LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+ if (handle == null) {
+ handle = getLedgerForEntry(request.getData(),
request.getMasterKey());
+ handleMap.put(ledgerIdMasterKey, handle);
+ }
+
+ synchronized (handle) {
+ if (handle.isFenced()) {
+ throw
BookieException.create(BookieException.Code.LedgerFencedException);
+ }
+
+ addEntryInternal(handle, request.getData(), ackBeforeSync,
+ cb, ctx, request.getMasterKey(), false);
+ }
+ } catch (BookieException.OperationRejectedException e) {
+ requestStats.getAddEntryRejectedCounter().inc();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation rejected while writing {} ", request,
e);
+ }
+ rc = BookieProtocol.ETOOMANYREQUESTS;
+ } catch (IOException e) {
+ LOG.error("Error writing {}", request, e);
+ rc = BookieProtocol.EIO;
+ } catch (BookieException.LedgerFencedException lfe) {
+ LOG.error("Attempt to write to fenced ledger ", lfe);
+ rc = BookieProtocol.EFENCED;
+ } catch (BookieException e) {
+ LOG.error("Unauthorized access to ledger {}",
request.getLedgerId(), e);
+ rc = BookieProtocol.EUA;
+ } catch (Throwable t) {
+ LOG.error("Unexpected exception while writing {}@{} : {} ",
+ request.getLedgerId(), request.getEntryId(),
t.getMessage(), t);
+ rc = BookieProtocol.EBADREQ;
+ }
+
+ if (rc != BookieProtocol.EOK) {
+ hasFailedRequests = true;
+ requestStats.getAddEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(requestNans),
TimeUnit.NANOSECONDS);
+ cb.writeComplete(rc, request.getLedgerId(),
request.getEntryId(), null, ctx);
+ iter.remove();
+ request.release();
+ request.recycle();
+ }
+ }
+ handleMap.clear();
+
+ if (hasFailedRequests && requestProcessor != null) {
+ requestProcessor.flushPendingResponses();
+ }
+
+ if (writeDataToJournal && !requests.isEmpty()) {
+ List<ByteBuf> entries = requests.stream()
+ .map(ParsedAddRequest::getData).collect(Collectors.toList());
+ getJournal(requests.get(0).getLedgerId()).logAddEntry(entries,
ackBeforeSync, cb, ctx);
+ }
+
+ requests.forEach(t -> {
Review Comment:
put this into a "finally" block ?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -885,6 +887,26 @@ public void logAddEntry(ByteBuf entry, boolean
ackBeforeSync, WriteCallback cb,
logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
}
+ public void logAddEntry(List<ByteBuf> entries, boolean ackBeforeSync,
WriteCallback cb, Object ctx)
Review Comment:
nit: logAddEntries
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1096,80 @@ public void addEntry(ByteBuf entry, boolean
ackBeforeSync, WriteCallback cb, Obj
}
}
+ public void addEntryList(List<ParsedAddRequest> requests, boolean
ackBeforeSync,
+ WriteCallback cb, Object ctx, RequestStats
requestStats) throws InterruptedException {
+ long requestNans = MathUtils.nowInNano();
+ boolean hasFailedRequests = false;
+ Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+ ListIterator<ParsedAddRequest> iter = requests.listIterator();
+ while (iter.hasNext()) {
+ ParsedAddRequest request = iter.next();
+ int rc = BookieProtocol.EOK;
+ try {
+ Pair<Long, byte[]> ledgerIdMasterKey =
Pair.of(request.getLedgerId(), request.getMasterKey());
+ LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+ if (handle == null) {
+ handle = getLedgerForEntry(request.getData(),
request.getMasterKey());
+ handleMap.put(ledgerIdMasterKey, handle);
+ }
+
+ synchronized (handle) {
+ if (handle.isFenced()) {
+ throw
BookieException.create(BookieException.Code.LedgerFencedException);
+ }
+
+ addEntryInternal(handle, request.getData(), ackBeforeSync,
+ cb, ctx, request.getMasterKey(), false);
+ }
+ } catch (BookieException.OperationRejectedException e) {
+ requestStats.getAddEntryRejectedCounter().inc();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Operation rejected while writing {} ", request,
e);
+ }
+ rc = BookieProtocol.ETOOMANYREQUESTS;
+ } catch (IOException e) {
+ LOG.error("Error writing {}", request, e);
+ rc = BookieProtocol.EIO;
+ } catch (BookieException.LedgerFencedException lfe) {
+ LOG.error("Attempt to write to fenced ledger ", lfe);
+ rc = BookieProtocol.EFENCED;
+ } catch (BookieException e) {
+ LOG.error("Unauthorized access to ledger {}",
request.getLedgerId(), e);
+ rc = BookieProtocol.EUA;
+ } catch (Throwable t) {
+ LOG.error("Unexpected exception while writing {}@{} : {} ",
+ request.getLedgerId(), request.getEntryId(),
t.getMessage(), t);
+ rc = BookieProtocol.EBADREQ;
+ }
+
+ if (rc != BookieProtocol.EOK) {
+ hasFailedRequests = true;
+ requestStats.getAddEntryStats()
+ .registerFailedEvent(MathUtils.elapsedNanos(requestNans),
TimeUnit.NANOSECONDS);
+ cb.writeComplete(rc, request.getLedgerId(),
request.getEntryId(), null, ctx);
+ iter.remove();
+ request.release();
+ request.recycle();
+ }
+ }
+ handleMap.clear();
+
+ if (hasFailedRequests && requestProcessor != null) {
+ requestProcessor.flushPendingResponses();
+ }
+
+ if (writeDataToJournal && !requests.isEmpty()) {
+ List<ByteBuf> entries = requests.stream()
+ .map(ParsedAddRequest::getData).collect(Collectors.toList());
+ getJournal(requests.get(0).getLedgerId()).logAddEntry(entries,
ackBeforeSync, cb, ctx);
Review Comment:
here we are assuming that all the entries are for the same ledger.
Can we add some assertions ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]