hangc0276 commented on code in PR #3846:
URL: https://github.com/apache/bookkeeper/pull/3846#discussion_r1137992803


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1096,80 @@ public void addEntry(ByteBuf entry, boolean 
ackBeforeSync, WriteCallback cb, Obj
         }
     }
 
+    public void addEntryList(List<ParsedAddRequest> requests, boolean 
ackBeforeSync,
+                         WriteCallback cb, Object ctx, RequestStats 
requestStats) throws InterruptedException {
+        long requestNans = MathUtils.nowInNano();
+        boolean hasFailedRequests = false;
+        Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+        ListIterator<ParsedAddRequest> iter = requests.listIterator();
+        while (iter.hasNext()) {
+            ParsedAddRequest request = iter.next();
+            int rc = BookieProtocol.EOK;
+            try {
+                Pair<Long, byte[]> ledgerIdMasterKey = 
Pair.of(request.getLedgerId(), request.getMasterKey());
+                LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+                if (handle == null) {
+                    handle = getLedgerForEntry(request.getData(), 
request.getMasterKey());
+                    handleMap.put(ledgerIdMasterKey, handle);
+                }
+
+                synchronized (handle) {
+                    if (handle.isFenced()) {
+                        throw 
BookieException.create(BookieException.Code.LedgerFencedException);
+                    }
+
+                    addEntryInternal(handle, request.getData(), ackBeforeSync,
+                        cb, ctx, request.getMasterKey(), false);
+                }
+            } catch (BookieException.OperationRejectedException e) {
+                requestStats.getAddEntryRejectedCounter().inc();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation rejected while writing {} ", request, 
e);
+                }
+                rc = BookieProtocol.ETOOMANYREQUESTS;
+            } catch (IOException e) {
+                LOG.error("Error writing {}", request, e);
+                rc = BookieProtocol.EIO;
+            } catch (BookieException.LedgerFencedException lfe) {
+                LOG.error("Attempt to write to fenced ledger ", lfe);
+                rc = BookieProtocol.EFENCED;
+            } catch (BookieException e) {
+                LOG.error("Unauthorized access to ledger {}", 
request.getLedgerId(), e);
+                rc = BookieProtocol.EUA;
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while writing {}@{} : {} ",
+                    request.getLedgerId(), request.getEntryId(), 
t.getMessage(), t);
+                rc = BookieProtocol.EBADREQ;
+            }
+
+            if (rc != BookieProtocol.EOK) {
+                hasFailedRequests = true;
+                requestStats.getAddEntryStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), 
TimeUnit.NANOSECONDS);
+                cb.writeComplete(rc, request.getLedgerId(), 
request.getEntryId(), null, ctx);
+                iter.remove();
+                request.release();
+                request.recycle();
+            }
+        }
+        handleMap.clear();
+
+        if (hasFailedRequests && requestProcessor != null) {
+            requestProcessor.flushPendingResponses();
+        }
+
+        if (writeDataToJournal && !requests.isEmpty()) {
+            List<ByteBuf> entries = requests.stream()
+                .map(ParsedAddRequest::getData).collect(Collectors.toList());
+            getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, 
ackBeforeSync, cb, ctx);

Review Comment:
   We don't assume that all the entries belong to the same ledger. When we write 
entries to the journal, we don't need to write all of a ledger's data into the 
same journal directory. The purpose of `getJournal(requests.get(0).getLedgerId())` 
is to balance write throughput across the journals.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to