poorbarcode opened a new issue, #15370:
URL: https://github.com/apache/pulsar/issues/15370

   ## Motivation
   
   `ManagedLedger` provides APIs for operating on the Bookie and currently has only one implementation: `ManagedLedgerImpl`. When each `Entry` contains only a single message, performance is far from ideal. We should provide a feature that accumulates entries and processes them in batches, like this:
   
   ````java
    public void addEntry(ByteBuf buffer) {
        list.add(buffer);
        // flush once enough entries have accumulated
        if (list.size() > 512) {
            doFlush();
        }
    }
   ````
   
   With this feature, the Broker could receive many requests in a short time but write to the Bookie only once, which would greatly improve performance.
   
   
   Users need this feature in some cases:
   - A user has many clients that send messages at regular intervals, for example statistics collection or device data collection.
   - In transaction mode, the Broker receives many transaction requests in a short time, including `ADD_PARTITION_TO_TXN`, `ADD_SUBSCRIPTION_TO_TXN`, `ACK_RESPONSE` and more.
   
   ## Goal
   
   1. Provide a batched implementation of `ManagedLedger` for writes.
   
   **org.apache.bookkeeper.mledger.impl.BatchManagedLedgerImpl**
   ````java
   public class BatchManagedLedgerImpl extends ManagedLedgerImpl {
   
        /** Flush when the number of buffered items reaches this threshold. Default: 512. */
        private final int batchThresholdRecordCount;
        /** Flush when the buffered data size reaches this threshold. Default: 4 MB. */
        private final int batchThresholdByteSize;
        /** Interval for scheduled flushes, performed whether or not a threshold is reached. */
        private final int batchIntervalMillis;
   
        @Override
        public void asyncAddEntry(final ByteBuf buffer, final AsyncCallbacks.AddEntryCallback callback,
                                  final Object context) {
            // batch write
        }

        @Override
        public void asyncAddEntry(final ByteBuf buffer, final int numberOfMessages,
                                  final AsyncCallbacks.AddEntryCallback callback, final Object context) {
            // batch write
        }
   }
   ````
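
   The buffering and threshold logic behind these fields could be expressed as a small, self-contained accumulator. The sketch below is illustrative only: `BatchAccumulator` and its members are hypothetical names, it buffers plain byte arrays instead of `ByteBuf`, and a real implementation would also need the timer-driven flush and per-request callbacks:

    ````java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical sketch of the count/size-triggered flush described above. */
    public class BatchAccumulator {

        private final int batchThresholdRecordCount;   // e.g. 512
        private final long batchThresholdByteSize;     // e.g. 4 MB
        private final List<byte[]> pending = new ArrayList<>();
        private long pendingBytes = 0;
        private int flushCount = 0;                    // counts flushes, for illustration

        public BatchAccumulator(int batchThresholdRecordCount, long batchThresholdByteSize) {
            this.batchThresholdRecordCount = batchThresholdRecordCount;
            this.batchThresholdByteSize = batchThresholdByteSize;
        }

        /** Buffer one payload; flush when either threshold is reached. */
        public synchronized void add(byte[] payload) {
            pending.add(payload);
            pendingBytes += payload.length;
            if (pending.size() >= batchThresholdRecordCount
                    || pendingBytes >= batchThresholdByteSize) {
                doFlush();
            }
        }

        /** In a real implementation, this would write one combined entry to the Bookie. */
        public synchronized void doFlush() {
            if (pending.isEmpty()) {
                return;
            }
            pending.clear();
            pendingBytes = 0;
            flushCount++;
        }

        public synchronized int getFlushCount() {
            return flushCount;
        }

        public static void main(String[] args) {
            BatchAccumulator acc = new BatchAccumulator(512, 4L * 1024 * 1024);
            for (int i = 0; i < 1024; i++) {
                acc.add(new byte[16]);   // 1024 small payloads
            }
            System.out.println(acc.getFlushCount());   // 1024 / 512 = 2 flushes
        }
    }
    ````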
   
   2. Provide a batched implementation of `ManagedCursor` for reads.
   
   **org.apache.bookkeeper.mledger.impl.BatchManagedCursorImpl**
   ````java
    public class BatchManagedCursorImpl extends ManagedCursorImpl {
   
        @Override
        public void asyncReadEntries(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback,
                                     Object ctx, PositionImpl maxPosition) {
            // batch read
        }
   }
   ````
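
   On the read path, a batch entry has to be split back into its inner entries before they are handed out. A minimal sketch of that unpacking, assuming the proposed `elementEndIndexArray` stores the exclusive end offset of each inner entry within the batch payload (the exact encoding is not fixed by this proposal, and `BatchEntrySplitter` is a hypothetical name):

    ````java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    /** Hypothetical sketch: split a batch payload using elementEndIndexArray. */
    public class BatchEntrySplitter {

        /** Each value in elementEndIndexArray is the exclusive end offset of one inner entry. */
        public static List<byte[]> split(byte[] batchPayload, int[] elementEndIndexArray) {
            List<byte[]> inner = new ArrayList<>();
            int start = 0;
            for (int end : elementEndIndexArray) {
                inner.add(Arrays.copyOfRange(batchPayload, start, end));
                start = end;   // the next inner entry begins where this one ended
            }
            return inner;
        }

        public static void main(String[] args) {
            byte[] payload = {1, 2, 3, 4, 5, 6};
            // End offsets [2, 5, 6] -> inner entries [1,2], [3,4,5], [6]
            List<byte[]> parts = split(payload, new int[]{2, 5, 6});
            System.out.println(parts.size());                    // 3
            System.out.println(Arrays.toString(parts.get(1)));   // [3, 4, 5]
        }
    }
    ````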
   
   
   ## API Changes
   
   No API changes.
   
   ## Implementation
   
   **How to cache requests and trigger a flush?**

   - Define an array field in `ManagedLedgerImpl` to buffer incoming requests.
   - Three flush strategies:
     - when the request count reaches the threshold
     - when the total request byte size reaches the threshold
     - on a fixed schedule, regardless of the thresholds
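
   The time-based strategy can be sketched as a scheduled task that forces a flush regardless of the count and size thresholds, so a slow trickle of requests is not held in the buffer indefinitely. The class and method names here are hypothetical, not Pulsar code:

    ````java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical sketch of the fixed-rate flush strategy. */
    public class TimedFlusher {

        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        /** Run doFlush every batchIntervalMillis, whether or not a threshold was reached. */
        public void start(Runnable doFlush, int batchIntervalMillis) {
            scheduler.scheduleAtFixedRate(
                    doFlush, batchIntervalMillis, batchIntervalMillis, TimeUnit.MILLISECONDS);
        }

        public void stop() {
            scheduler.shutdown();
        }

        public static void main(String[] args) throws InterruptedException {
            TimedFlusher flusher = new TimedFlusher();
            CountDownLatch flushed = new CountDownLatch(3);
            flusher.start(flushed::countDown, 50);   // flush every 50 ms
            flushed.await();                         // observe three timed flushes
            flusher.stop();
            System.out.println("done");
        }
    }
    ````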
   
   **How to mark an Entry as a batch Entry?**

   Add a field to `BrokerEntryMetadata`: `elementEndIndexArray`. This field has two purposes:

   - Mark the entry as a batch entry.
   - Record each inner entry's range (start and end offsets).
   
   Provide another `ManagedLedgerInterceptor` implementation to set the `elementEndIndexArray` value.
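
   Producing the index array at write time is the inverse of the read-side split: a running sum over the buffered payload sizes. A hypothetical helper, sketching what such an interceptor might compute when the batch entry is assembled (`EndIndexBuilder` is an illustrative name, not part of the proposal):

    ````java
    import java.util.Arrays;

    /** Hypothetical helper: derive elementEndIndexArray from inner payload lengths. */
    public class EndIndexBuilder {

        /** Returns the exclusive end offset of each inner entry in the batch payload. */
        public static int[] endIndexes(int[] payloadLengths) {
            int[] ends = new int[payloadLengths.length];
            int offset = 0;
            for (int i = 0; i < payloadLengths.length; i++) {
                offset += payloadLengths[i];   // running sum of payload sizes
                ends[i] = offset;
            }
            return ends;
        }

        public static void main(String[] args) {
            // Payloads of 2, 3 and 1 bytes -> end offsets [2, 5, 6]
            System.out.println(Arrays.toString(endIndexes(new int[]{2, 3, 1})));
        }
    }
    ````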
   
   **After the batched managed ledger is implemented, what else needs to be done?**
   - Make `MLTransactionLogImpl` submit in batches.
   - Make `MLPendingAckStore` submit in batches.
   - Make `Producer.publishMessage()` submit in batches (optional for users).
   
   
   ## Reject Alternatives
   
   None.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
