asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r942431297


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter}, helps users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When batch feature is turned off, no data is logged at this. In this scenario,users can see the
+ *    {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even if enable batch feature, if a single record is too big, it still directly write to Bookie without batch,
+ *    property {@link #pulsarBatchedLogTriggeringCountByForce} can indicate this case. But this case will not affect
+ *    other metrics, because it would obscure the real situation. E.g. there has two record:
+ *    [{recordsCount=512, triggerByMaxRecordCount}, {recordCount=1, triggerByTooLarge}], we should not tell the users
+ *    that the average batch records is 256, if the users knows that there are only 256 records per batch, then users
+ *    will try to increase {@link TxnLogBufferedWriter#batchedWriteMaxDelayInMillis} so that there is more data per
+ *    batch to improve throughput, but that does not work.
+ */
+public class TxnLogBufferedWriterMetricsStats implements Closeable {
+
+    /**
+     * Key is the name we used to create {@link TxnLogBufferedWriterMetricsStats}, and now there are two kinds:
+     * ["pulsar_txn_tc_log", "pulsar_txn_tc_batched_log"]. There can be multiple labels in each
+     * {@link TxnLogBufferedWriterMetricsStats}, such as The Transaction Coordinator using coordinatorId as label and
+     * The Transaction Pending Ack Store using subscriptionName as label.
+     */
+    private static final HashMap<String, Collector> COLLECTOR_CACHE = new HashMap<>();

Review Comment:
   Ok. I can live with that, but just to be on the safe side I also asked @tjiuming for advice here.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter}, helps users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When batch feature is turned off, no data is logged at this. In this scenario,users can see the
+ *   {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even if enable batch feature. A batch has numerous triggers. The metrics in this class count each type of
+ *   trigger to allow you to diagnose what mostly causing a batch flush. The metric also includes a histogram for delay
+ *   of batch since 1st record entered, the size of batch in bytes number of records in batch. This will help you to
+ *   tune the parameters that control some of the batch flush triggers: maxDelay, maxRecords, maxSize.
+ *   Note that the 4th trigger - a single record larger than batch size - triggers a flush of the current batch, but
+ *   the big record itself is not written in batch hence is not included in the batch metrics written above (batch
+ *   size, batch delay, etc). The trigger is of course counter as other trigger types.

Review Comment:
   I know I wrote it, but: `counter` --> `counted`



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -210,9 +210,15 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
             return;
         }
         singleThreadExecutorForWrite.execute(() -> {
+            int recordsCountBeforeAdd = dataArray.size();
             try {
                 internalAsyncAddData(data, callback, ctx);
             } catch (Exception e){
+                // Avoid missing callback, do failed callback when error occur before add data to the array.
+                int recordsCountAfter = dataArray.size();
+                if (recordsCountAfter == recordsCountBeforeAdd){
+                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);

Review Comment:
   1. import static `ManagedLedgerException` to shorten the expression.
   2. Can you please explain why you chose the Fenced exception for this specific error, whose nature you don't know?
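   
   For point 1, a minimal sketch of what I mean, assuming the Fenced exception stays for now (`import static` works here because `ManagedLedgerFencedException` is a static nested class):
   
   ```java
   import static org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
   
   // ... inside the catch block, the expression then shortens to:
   callback.addFailed(new ManagedLedgerFencedException(e), ctx);
   ```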
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -210,9 +210,15 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
             return;
         }
         singleThreadExecutorForWrite.execute(() -> {
+            int recordsCountBeforeAdd = dataArray.size();
             try {
                 internalAsyncAddData(data, callback, ctx);
             } catch (Exception e){
+                // Avoid missing callback, do failed callback when error occur before add data to the array.
+                int recordsCountAfter = dataArray.size();
+                if (recordsCountAfter == recordsCountBeforeAdd){

Review Comment:
   `dataArray` is subject to modification by other threads: other calling threads might add records to it while you're inside `internalAsyncAddData()`, so you can't really expect the counts to be equal, right?
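   
   A sketch of one alternative, just to make the intent concrete (the extra parameter and the `AtomicBoolean` are hypothetical, not in the PR): let `internalAsyncAddData` record the enqueue itself, instead of inferring it from a size comparison:
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical: internalAsyncAddData sets `enqueued` immediately after
   // dataArray.add(data), so the catch block no longer depends on
   // dataArray.size() staying stable across the call.
   AtomicBoolean enqueued = new AtomicBoolean(false);
   try {
       internalAsyncAddData(data, callback, ctx, enqueued);
   } catch (Exception e) {
       if (!enqueued.get()) {
           callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
       }
   }
   ```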



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);
+            }
+        });
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK,  this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
-        int len = dataSerializer.getSerializedSize(data);
-        if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
+        int dataLength = dataSerializer.getSerializedSize(data);
+        if (dataLength >= batchedWriteMaxSize){
+            trigFlushByLargeSingleData();
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        // Add data.
-        this.dataArray.add(data);
-        // Add callback info.
+        // Append data to the data-array.
+        dataArray.add(data);
+        // Append callback to the flushContext.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
-        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
-        // Calculate bytes-size.
-        this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes size.
+        bytesSize += dataLength;
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                if (flushContext.asyncAddArgsList.isEmpty()) {
+                    return;
+                }
+                if (metrics != null) {
+                    metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
+                            System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+                }
+                doFlush();
+            } catch (Exception e){
+                log.error("Trig flush by timing task fail.", e);

Review Comment:
   We need to close this with @codelipenghui 



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -546,10 +552,9 @@ public void addCallback(AddDataCallback callback, Object ctx){
         }
     }
 
-    /** Callback for batch write BK. **/
-    private final BufferedAddEntryCallback bufferedAddEntryCallback = new BufferedAddEntryCallback();
+    private final BookKeeperBatchedWriteCallback bookKeeperBatchedWriteCallback = new BookKeeperBatchedWriteCallback();
 
-    private class BufferedAddEntryCallback implements AsyncCallbacks.AddEntryCallback{
+    private class BookKeeperBatchedWriteCallback implements AsyncCallbacks.AddEntryCallback{

Review Comment:
   If your callback doesn't need to access any variables located in `TxnLogBufferedWriter`, then perhaps it's better to declare it as a `private static class`.
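   
   Something like this sketch; it only works if the callback truly needs no outer state, since a static nested class cannot capture the enclosing `TxnLogBufferedWriter` instance:
   
   ```java
   private static class BookKeeperBatchedWriteCallback implements AsyncCallbacks.AddEntryCallback {
   
       @Override
       public void addComplete(Position position, ByteBuf entryData, Object ctx) {
           // Everything needed (e.g. the FlushContext) must travel through ctx;
           // no fields of TxnLogBufferedWriter are reachable from here.
       }
   
       @Override
       public void addFailed(ManagedLedgerException exception, Object ctx) {
           // Likewise: only ctx is available.
       }
   }
   ```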
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -541,8 +545,52 @@ public void recycle(){
             this.asyncAddArgsList.clear();
             this.handle.recycle(this);
         }
+
+        public void addCallback(AddDataCallback callback, Object ctx){
+            AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
+            asyncAddArgsList.add(asyncAddArgs);
+        }
     }
 
+    private final BookKeeperBatchedWriteCallback bookKeeperBatchedWriteCallback = new BookKeeperBatchedWriteCallback();

Review Comment:
   As I wrote [here](https://github.com/apache/pulsar/pull/16758#discussion_r939988061), this line should be at the beginning of `TxnLogBufferedWriter`, where all the variables are placed.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);
+            }
+        });
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK,  this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
-        int len = dataSerializer.getSerializedSize(data);
-        if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
+        int dataLength = dataSerializer.getSerializedSize(data);
+        if (dataLength >= batchedWriteMaxSize){
+            trigFlushByLargeSingleData();
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        // Add data.
-        this.dataArray.add(data);
-        // Add callback info.
+        // Append data to the data-array.
+        dataArray.add(data);
+        // Append callback to the flushContext.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
-        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
-        // Calculate bytes-size.
-        this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes size.
+        bytesSize += dataLength;
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                if (flushContext.asyncAddArgsList.isEmpty()) {
+                    return;
+                }
+                if (metrics != null) {
+                    metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
+                            System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+                }
+                doFlush();
+            } catch (Exception e){
+                log.error("Trig flush by timing task fail.", e);
+            } finally {
+                // Start the next timing task.
+                nextTimingTrigger();
+            }
+        });
+    }
+
+    /**
+     * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush.
+     */
+    private void trigFlushIfReachMaxRecordsOrMaxSize(){
+        if (flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {

Review Comment:
   Look:
   You have two threads accessing `asyncAddArgsList`:
   1. `singleThreadExecutorForWrite`, which accesses `asyncAddArgsList` (checks if it's empty, iterates over it, ...).
   2. `addComplete` and `addFailed`: both access `asyncAddArgsList`, but from the orderedExecutor threads which are inside `ManagedLedgerImpl`.
   
   Thus you have concurrent access to `asyncAddArgsList`, which is not protected against concurrent access. See the sketch after this list.
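   
   A self-contained sketch of the confinement pattern I have in mind (generic names, not the PR's): every touch of the list hops onto the single owning thread, so no lock is needed:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.Consumer;
   
   class ThreadConfinedList<T> {
       private final ExecutorService owner = Executors.newSingleThreadExecutor();
       private final List<T> list = new ArrayList<>(); // only ever touched on `owner`
   
       void add(T item) {
           owner.execute(() -> list.add(item)); // mutation confined to one thread
       }
   
       void drain(Consumer<List<T>> consumer) {
           owner.execute(() -> {
               consumer.accept(new ArrayList<>(list)); // hand out a snapshot
               list.clear();                           // the original never escapes
           });
       }
   }
   ```
   
   In `TxnLogBufferedWriter` terms: `addComplete`/`addFailed` would re-submit their work to `singleThreadExecutorForWrite` instead of touching `asyncAddArgsList` from the orderedExecutor threads.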
            



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);

Review Comment:
   Completely lost you here.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,7 +143,7 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
                                 DataSerializer<T> dataSerializer,
                                 int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
                                 boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){
-        this.batchEnabled = batchEnabled;
+        this.batchEnabled = batchEnabled && batchedWriteMaxRecords > 1;

Review Comment:
   If you're disabling it due to misconfiguration, let the user know with a warning log line.
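   
   A sketch of what I mean (the exact message wording is up to you):
   
   ```java
   this.batchEnabled = batchEnabled && batchedWriteMaxRecords > 1;
   if (batchEnabled && !this.batchEnabled) {
       // The caller asked for batching, but the configuration makes it pointless.
       log.warn("Batched writes are disabled because batchedWriteMaxRecords is {} (must be > 1)",
               batchedWriteMaxRecords);
   }
   ```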



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);
+            }
+        });
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK,  this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
-        int len = dataSerializer.getSerializedSize(data);
-        if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
+        int dataLength = dataSerializer.getSerializedSize(data);
+        if (dataLength >= batchedWriteMaxSize){
+            trigFlushByLargeSingleData();
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        // Add data.
-        this.dataArray.add(data);
-        // Add callback info.
+        // Append data to the data-array.
+        dataArray.add(data);
+        // Append callback to the flushContext.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
-        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
-        // Calculate bytes-size.
-        this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes size.
+        bytesSize += dataLength;
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                if (flushContext.asyncAddArgsList.isEmpty()) {
+                    return;
+                }
+                if (metrics != null) {
+                    metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
+                            System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+                }
+                doFlush();
+            } catch (Exception e){
+                log.error("Trig flush by timing task fail.", e);
+            } finally {
+                // Start the next timing task.
+                nextTimingTrigger();

Review Comment:
   But @poorbarcode, here is the javadoc of `scheduleWithFixedDelay`:
   > Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
   
   You can clearly see it will also avoid a large number of fixed-time flush tasks.
   In fact, it is *exactly* the functionality you wrote using Timer.
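   
   A sketch of what the replacement could look like (field and scheduler names are mine); note the task body must keep swallowing exceptions, since one escaping exception suppresses all subsequent runs:
   
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
   
   // One registration replaces the manual re-arm in the finally block: the next
   // run is scheduled only after the previous one terminates, so timed flush
   // tasks can never pile up.
   ScheduledFuture<?> timingFlush = scheduler.scheduleWithFixedDelay(
           this::trigFlushByTimingTask,
           batchedWriteMaxDelayInMillis,
           batchedWriteMaxDelayInMillis,
           TimeUnit.MILLISECONDS);
   
   // On close: timingFlush.cancel(false);
   ```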
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
 
     }
 
-    /**
-     * Trigger write to bookie once, If the conditions are not met, nothing will be done.
-     */
-    public void trigFlush(final boolean force, boolean byScheduleThreads){
-        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, byScheduleThreads));
-    }
-
-    private void doTrigFlush(boolean force, boolean byScheduleThreads){
-        try {
-            if (flushContext.asyncAddArgsList.isEmpty()) {
-                return;
-            }
-            if (force) {
-                doFlush();
-                return;
-            }
-            if (byScheduleThreads) {
-                doFlush();
-                return;
-            }
-            AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
-            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis) {
-                doFlush();
-                return;
-            }
-            if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {
-                doFlush();
-                return;
-            }
-            if (this.bytesSize >= batchedWriteMaxSize) {
-                doFlush();
-            }
-        } finally {
-            if (byScheduleThreads) {
-                nextTimingTrigger();
-            }
-        }
-    }
-
     private void doFlush(){
-        // Combine data.
-        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
-        ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
-        ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, actualContent);
+        // Combine data cached by flushContext, and write to BK.
+        ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+        prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+        prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+        ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+        ByteBuf wholeByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);

Review Comment:
   Ok, you are correct. The name confused me. This FixedCompositeByteBuf allocates nothing: it is just an instance that presents an array of ByteBufs as one ByteBuf.
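   
   For the record, a tiny self-contained demo of that behavior (the constants are illustrative, not the real magic number/version):
   
   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.Unpooled;
   
   public class CompositeDemo {
       public static void main(String[] args) {
           ByteBuf prefix = Unpooled.buffer(4);
           prefix.writeShort(0x0e01); // stand-in for the magic number
           prefix.writeShort(1);      // stand-in for the version
           ByteBuf content = Unpooled.wrappedBuffer("payload".getBytes());
   
           // No bytes are copied here: the result is a read-only composite view
           // over the two components' existing memory.
           ByteBuf whole = Unpooled.wrappedUnmodifiableBuffer(prefix, content);
           System.out.println(whole.readableBytes()); // 4 + 7 = 11
       }
   }
   ```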
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
