zymap commented on code in PR #3846:
URL: https://github.com/apache/bookkeeper/pull/3846#discussion_r1132115816


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java:
##########
@@ -47,4 +49,11 @@ public interface RequestProcessor extends AutoCloseable {
      * Flush any pending response staged on all the client connections.
      */
     void flushPendingResponses();
+
+    /**
+     * Process a list of ParsedAddRequests.
+     * @param r
+     * @param channel
+     */
+    void processAddRequest(List<BookieProtocol.ParsedAddRequest> r, 
BookieRequestHandler channel);

Review Comment:
   Looks like `void processRequest(Object r, BookieRequestHandler channel);` is 
the same as the new one. Because the argument is an object, we can reuse that 
method



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java:
##########
@@ -1087,6 +1093,114 @@ public void addEntry(ByteBuf entry, boolean 
ackBeforeSync, WriteCallback cb, Obj
         }
     }
 
+    public void addEntryList(List<ParsedAddRequest> requests, boolean 
ackBeforeSync,
+                         WriteCallback cb, Object ctx, RequestStats 
requestStats) throws InterruptedException {
+        long requestNans = MathUtils.nowInNano();
+        boolean hasFailedRequests = false;
+        Map<Pair<Long, byte[]>, LedgerDescriptor> handleMap = new HashMap<>();
+        ListIterator<ParsedAddRequest> iter = requests.listIterator();
+        while (iter.hasNext()) {
+            ParsedAddRequest request = iter.next();
+            int rc = BookieProtocol.EOK;
+            try {
+                Pair<Long, byte[]> ledgerIdMasterKey = 
Pair.of(request.getLedgerId(), request.getMasterKey());
+                LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
+                if (handle == null) {
+                    handle = getLedgerForEntry(request.getData(), 
request.getMasterKey());
+                    handleMap.put(ledgerIdMasterKey, handle);
+                }
+
+                synchronized (handle) {
+                    if (handle.isFenced()) {
+                        throw 
BookieException.create(BookieException.Code.LedgerFencedException);
+                    }
+
+                    addEntryInternalWithoutJournal(handle, request.getData(), 
ackBeforeSync,
+                        cb, ctx, request.getMasterKey());
+                }
+            } catch (BookieException.OperationRejectedException e) {
+                requestStats.getAddEntryRejectedCounter().inc();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Operation rejected while writing {} ", request, 
e);
+                }
+                rc = BookieProtocol.ETOOMANYREQUESTS;
+            } catch (IOException e) {
+                LOG.error("Error writing {}", request, e);
+                rc = BookieProtocol.EIO;
+            } catch (BookieException.LedgerFencedException lfe) {
+                LOG.error("Attempt to write to fenced ledger ", lfe);
+                rc = BookieProtocol.EFENCED;
+            } catch (BookieException e) {
+                LOG.error("Unauthorized access to ledger {}", 
request.getLedgerId(), e);
+                rc = BookieProtocol.EUA;
+            } catch (Throwable t) {
+                LOG.error("Unexpected exception while writing {}@{} : {} ",
+                    request.getLedgerId(), request.getEntryId(), 
t.getMessage(), t);
+                rc = BookieProtocol.EBADREQ;
+            }
+
+            if (rc != BookieProtocol.EOK) {
+                hasFailedRequests = true;
+                requestStats.getAddEntryStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), 
TimeUnit.NANOSECONDS);
+                cb.writeComplete(rc, request.getLedgerId(), 
request.getEntryId(), null, ctx);
+                iter.remove();
+                request.release();
+                request.recycle();
+            }
+        }
+        handleMap.clear();
+
+        if (hasFailedRequests && requestProcessor != null) {
+            requestProcessor.flushPendingResponses();
+        }
+
+        if (writeDataToJournal && !requests.isEmpty()) {
+            List<ByteBuf> entries = requests.stream()
+                .map(ParsedAddRequest::getData).collect(Collectors.toList());
+            getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, 
ackBeforeSync, cb, ctx);
+
+            requests.forEach(t -> {
+                t.release();
+                t.recycle();
+            });
+        }
+    }
+
+    private void addEntryInternalWithoutJournal(LedgerDescriptor handle, 
ByteBuf entry,

Review Comment:
   We can easily add a flag at the original logic to avoid unnecessary 
duplicated code.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -885,6 +888,25 @@ public void logAddEntry(ByteBuf entry, boolean 
ackBeforeSync, WriteCallback cb,
         logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
     }
 
+    public void logAddEntry(List<ByteBuf> entries, boolean ackBeforeSync, 
WriteCallback cb, Object ctx)
+        throws InterruptedException {
+        AtomicLong reserveMemory = new AtomicLong();
+        QueueEntry[] queueEntries = new QueueEntry[entries.size()];
+        for (int i = 0; i < entries.size(); ++i) {
+            ByteBuf entry = entries.get(i);
+            long ledgerId = entry.getLong(entry.readerIndex());
+            long entryId = entry.getLong(entry.readerIndex() + 8);
+            entry.retain();
+            reserveMemory.addAndGet(entry.readableBytes());
+            queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, 
ledgerId, entryId, cb, ctx,
+                MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), 
callbackTime);
+        }
+
+        memoryLimitController.releaseMemory(reserveMemory.get());

Review Comment:
   reserveMemory?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java:
##########
@@ -229,6 +230,16 @@ protected void onAddRequestFinish() {
         }
     }
 
+    protected void onAddRequestFinishWithoutUnTrack() {
+        if (addsSemaphore != null) {
+            addsSemaphore.release();
+        }
+    }
+
+    protected void onAddRequestUnTrack() {
+        requestStats.untrackAddRequest();
+    }

Review Comment:
   You need to track N add requests at onAddRequestStart, and untrack it when 
onAddRequestFinish.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java:
##########
@@ -87,7 +97,32 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
             ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(msg, this);
+
+        if (msg instanceof BookieProtocol.ParsedAddRequest
+            && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode()
+            && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority()
+            && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == 
BookieProtocol.CURRENT_PROTOCOL_VERSION
+            && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) {
+            msgs.put((BookieProtocol.ParsedAddRequest) msg);

Review Comment:
   Put is a blocking operation. We should check the size first and then put the 
message into the queue.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java:
##########
@@ -87,7 +97,32 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
             ctx.fireChannelRead(msg);
             return;
         }
-        requestProcessor.processRequest(msg, this);
+
+        if (msg instanceof BookieProtocol.ParsedAddRequest
+            && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode()
+            && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority()
+            && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == 
BookieProtocol.CURRENT_PROTOCOL_VERSION
+            && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) {
+            msgs.put((BookieProtocol.ParsedAddRequest) msg);
+            if (msgs.size() >= maxCapacity) {
+                int count = msgs.size();
+                List<BookieProtocol.ParsedAddRequest> c = new 
ArrayList<>(count);
+                msgs.drainTo(c, count);
+                requestProcessor.processAddRequest(c, this);
+            }

Review Comment:
   ```suggestion
               if (!msgs.offer(msg)) {
                   ctx.fireChannelReadComplete();
               }
               msgs.put(msg); 
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java:
##########
@@ -729,4 +740,28 @@ public boolean isBlacklisted(Channel channel) {
     public void handleNonWritableChannel(Channel channel) {
         onResponseTimeout.accept(channel);
     }
+
+    @Override
+    public void processAddRequest(List<BookieProtocol.ParsedAddRequest> msgs, 
BookieRequestHandler requestHandler) {
+        WriteBatchEntryProcessor write = WriteBatchEntryProcessor.create(msgs, 
requestHandler, this);

Review Comment:
   You can use[ the original 
method](https://github.com/apache/bookkeeper/pull/3846/files#diff-380ca68ed5ce21fb226c59b8f9c9bd7c7be70da37091ea2c55db89cc4125e578R318)
 and when the Object msg is `List<BookieProtocol.ParsedAddRequest>` then call 
here. They do the same thing just in different ways, so let's reuse the 
existing method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to