asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r937388882


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.

Review Comment:
   Instead of writing this comment, just write the code to look like the comment:
   ```
   int dataLength = dataSerializer.getSerializedSize(data);
   if (dataLength >= batchedWriteMaxSize){
   ```
   
   Whenever you stop to read a comment, you break your concentration. 
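   To make the suggestion concrete, here is a minimal standalone sketch (class, method, and constant names are illustrative, not from the PR) of how the descriptive name removes the need for the comment:

   ```java
   // Minimal sketch: the descriptive name "dataLength" plus a named predicate
   // make the "// If param-data is too large." comment unnecessary.
   public class SelfDocumentingNameSketch {

       // Assumed threshold; in the PR this is the batchedWriteMaxSize field.
       static final int BATCHED_WRITE_MAX_SIZE = 1024;

       // Stand-in for dataSerializer.getSerializedSize(data).
       static int getSerializedSize(String data) {
           return data.length();
       }

       static boolean exceedsBatchedWriteMaxSize(String data) {
           int dataLength = getSerializedSize(data);
           // The condition now reads like the comment it replaces.
           return dataLength >= BATCHED_WRITE_MAX_SIZE;
       }

       public static void main(String[] args) {
           System.out.println(exceedsBatchedWriteMaxSize("x".repeat(2048))); // prints "true"
           System.out.println(exceedsBatchedWriteMaxSize("tiny"));           // prints "false"
       }
   }
   ```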



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
-            ByteBuf byteBuf = dataSerializer.serialize(data);
-            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
-                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            trigFlushByLargeSingleData(data, callback, ctx);
             return;
         }
-        // Add data.
+        // Append data to queue.
         this.dataArray.add(data);
         // Add callback info.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
         this.flushContext.asyncAddArgsList.add(asyncAddArgs);
         // Calculate bytes-size.
         this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        // trig flush by max records or max size.
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            if (flushContext.asyncAddArgsList.isEmpty()) {
+                return;
+            }
+            if (metrics != null) {
+                metrics.triggerFlushByByMaxDelay(this.flushContext.asyncAddArgsList.size(), this.bytesSize,
+                        System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+            // Start the next timing task.
+            nextTimingTrigger();
+        });
+    }
+
+    /**
+     * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush.
+     */
+    private void trigFlushIfReachMaxRecordsOrMaxSize(){
+        if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {
+            if (metrics != null) {
+                metrics.triggerFlushByRecordsCount(this.flushContext.asyncAddArgsList.size(), this.bytesSize,
+                        System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+            return;
+        }
+        if (this.bytesSize >= batchedWriteMaxSize) {
+            if (metrics != null) {
+                metrics.triggerFlushByBytesSize(this.flushContext.asyncAddArgsList.size(), this.bytesSize,
+                        System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+        }
+    }
+
+    /**

Review Comment:
   You already wrote that comment above. Only add information that hasn't already been stated there.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
-            ByteBuf byteBuf = dataSerializer.serialize(data);
-            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
-                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            trigFlushByLargeSingleData(data, callback, ctx);
             return;
         }
-        // Add data.
+        // Append data to queue.
         this.dataArray.add(data);
         // Add callback info.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
         this.flushContext.asyncAddArgsList.add(asyncAddArgs);
         // Calculate bytes-size.
         this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        // trig flush by max records or max size.
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            if (flushContext.asyncAddArgsList.isEmpty()) {
+                return;
+            }
+            if (metrics != null) {
+                metrics.triggerFlushByByMaxDelay(this.flushContext.asyncAddArgsList.size(), this.bytesSize,
+                        System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+            // Start the next timing task.
+            nextTimingTrigger();
+        });
+    }
+
+    /**
+     * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush.
+     */
+    private void trigFlushIfReachMaxRecordsOrMaxSize(){
+        if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {

Review Comment:
   Why do you write `this.`?



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".

Review Comment:
   Comment not needed - the method name already says it.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                new SumStrDataSerializer(), 2, 1024,
+                1000 * 3600, true, metricsStats);
+        // Add some data.
+        int writeCount = 100;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        /** Assert metrics stats correct. **/
+        verifyTheCounterMetrics(writeCount / 2,0,0,0);
+        verifyTheHistogramMetrics(batchFlushCounter.get(), writeCount, writeCount * 4);

Review Comment:
   Now I need to look below to figure out what each parameter is named.
   What is 4? Why * 4? Tie these values together with the params of the buffered writer.
   
   ```
   verifyTriggeredByMaxSizeMetricEqualsTo(mockAddDataCallback.flushCounter.get())
   ```
   Do this kind of call for each metric you verify.
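   A sketch of what such per-metric helpers might look like (helper and field names here are hypothetical, not from the PR), so each assertion names the metric it checks instead of passing positional ints:

   ```java
   // Hypothetical sketch: one named assertion helper per metric, so the test
   // reads as intent ("triggered by max records") rather than bare numbers.
   public class NamedMetricAssertionsSketch {

       // Stand-ins for the Prometheus counters the real test would sample.
       static long triggeredByMaxRecords;
       static long triggeredByMaxSize;

       static void verifyTriggeredByMaxRecordsMetricEqualsTo(long expected) {
           if (triggeredByMaxRecords != expected) {
               throw new AssertionError("triggeredByMaxRecords expected " + expected
                       + " but was " + triggeredByMaxRecords);
           }
       }

       static void verifyTriggeredByMaxSizeMetricEqualsTo(long expected) {
           if (triggeredByMaxSize != expected) {
               throw new AssertionError("triggeredByMaxSize expected " + expected
                       + " but was " + triggeredByMaxSize);
           }
       }

       public static void main(String[] args) {
           // e.g. writeCount / 2 flushes when batchedWriteMaxRecords = 2.
           triggeredByMaxRecords = 50;
           verifyTriggeredByMaxRecordsMetricEqualsTo(50);
           verifyTriggeredByMaxSizeMetricEqualsTo(0);
           System.out.println("ok"); // prints "ok"
       }
   }
   ```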



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * After the test, when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the random-size baseline
+         *   was set as 9, and there was maximum probability that all three thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                new SumStrDataSerializer(), 2, 1024,
+                1000 * 3600, true, metricsStats);
+        // Add some data.
+        int writeCount = 100;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);

Review Comment:
   Import the assert statically.
   I also wonder why we don't use AssertJ, as in `assertThat(addDataCallbackFailureCount.get()).isEqualTo(0)`



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -116,6 +116,16 @@
             AtomicReferenceFieldUpdater
                     .newUpdater(TxnLogBufferedWriter.class, TxnLogBufferedWriter.State.class, "state");
 
+    /** Metrics. **/
+    private final TxnLogBufferedWriterMetricsStats metricsStats;
+
+    public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor, Timer timer,

Review Comment:
   Ok. So the `if metrics != null` will go away as well, right?
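   One common way to make those null checks go away is a no-op (Null Object) metrics instance; here is a minimal self-contained sketch under assumed names (the real class is `TxnLogBufferedWriterMetricsStats`):

   ```java
   // Minimal Null Object sketch (names assumed): a DISABLED instance whose
   // methods do nothing, so the writer can call metrics unconditionally.
   public class NoOpMetricsSketch {

       interface WriterMetrics {
           void triggerFlushByRecordsCount(int records, long bytes, long delayMillis);

           // Shared no-op instance; a functional interface allows a lambda here.
           WriterMetrics DISABLED = (records, bytes, delayMillis) -> { };
       }

       static final class CountingMetrics implements WriterMetrics {
           int flushes;

           @Override
           public void triggerFlushByRecordsCount(int records, long bytes, long delayMillis) {
               flushes++;
           }
       }

       private final WriterMetrics metrics;

       NoOpMetricsSketch(WriterMetrics metrics) {
           // A null argument is normalized once, at construction time.
           this.metrics = metrics == null ? WriterMetrics.DISABLED : metrics;
       }

       void doFlush(int records, long bytes) {
           // No "if (metrics != null)" guard needed at any call site.
           metrics.triggerFlushByRecordsCount(records, bytes, 0L);
       }

       public static void main(String[] args) {
           CountingMetrics counting = new CountingMetrics();
           new NoOpMetricsSketch(counting).doFlush(2, 128L);
           System.out.println(counting.flushes); // prints "1"
           new NoOpMetricsSketch(null).doFlush(2, 128L); // silently no-ops
       }
   }
   ```

   The null check then lives in exactly one place (the constructor) instead of around every metrics call.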



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
-            ByteBuf byteBuf = dataSerializer.serialize(data);
-            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
-                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            trigFlushByLargeSingleData(data, callback, ctx);
             return;
         }
-        // Add data.
+        // Append data to queue.

Review Comment:
   Change the code below to look like the comment
   ```
   queue.add(data);
   ```
   
   This suggestion applies to all comments in this class.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
-            ByteBuf byteBuf = dataSerializer.serialize(data);
-            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
-                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            trigFlushByLargeSingleData(data, callback, ctx);
             return;
         }
-        // Add data.
+        // Append data to queue.
         this.dataArray.add(data);
         // Add callback info.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
         this.flushContext.asyncAddArgsList.add(asyncAddArgs);
         // Calculate bytes-size.
         this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        // trig flush by max records or max size.

Review Comment:
   Your method name looks exactly like your comment, just with spaces - it's redundant.
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -201,30 +213,93 @@ public void asyncAddData(T data, AddDataCallback 
callback, Object ctx){
         singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, 
callback, ctx));
     }
 
+    /**
+     * Appends data to the queue; if {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize} is reached, a
+     * flush is performed. If a request's {@code data} is too large (larger than {@link #batchedWriteMaxSize}), two
+     * flushes are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Write the large data directly to BK.
+     * This preserves the order of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object 
ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
+        // If param-data is too large.
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
-            ByteBuf byteBuf = dataSerializer.serialize(data);
-            managedLedger.asyncAddEntry(byteBuf, 
DisabledBatchCallback.INSTANCE,
-                    AsyncAddArgs.newInstance(callback, ctx, 
System.currentTimeMillis(), byteBuf));
+            trigFlushByLargeSingleData(data, callback, ctx);
             return;
         }
-        // Add data.
+        // Append data to queue.
         this.dataArray.add(data);
         // Add callback info.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, 
System.currentTimeMillis());
         this.flushContext.asyncAddArgsList.add(asyncAddArgs);
         // Calculate bytes-size.
         this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        // trig flush by max records or max size.
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link 
#timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            if (flushContext.asyncAddArgsList.isEmpty()) {
+                return;
+            }
+            if (metrics != null) {
+                
metrics.triggerFlushByByMaxDelay(this.flushContext.asyncAddArgsList.size(), 
this.bytesSize,
+                        System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+            // Start the next timing task.
+            nextTimingTrigger();
+        });
+    }
+
+    /**
+     * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link 
#batchedWriteMaxSize}, do flush.
+     */
+    private void trigFlushIfReachMaxRecordsOrMaxSize(){
+        if (this.flushContext.asyncAddArgsList.size() >= 
batchedWriteMaxRecords) {
+            if (metrics != null) {
+                
metrics.triggerFlushByRecordsCount(this.flushContext.asyncAddArgsList.size(), 
this.bytesSize,
+                        System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+            return;
+        }
+        if (this.bytesSize >= batchedWriteMaxSize) {
+            if (metrics != null) {
+                
metrics.triggerFlushByBytesSize(this.flushContext.asyncAddArgsList.size(), 
this.bytesSize,
+                        System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+        }
+    }
+
+    /**
+     * If method {@link #asyncAddData(Object, AddDataCallback, Object)} accepts a request whose {@code data} is too
+     * large (larger than {@link #batchedWriteMaxSize}), two flushes are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Write the large data directly to BK; this flush event is not recorded in the metrics.
+     * This preserves the order of multiple writes to BK.
+     */
+    private void trigFlushByLargeSingleData(T data, AddDataCallback callback, 
Object ctx){
+        if (!flushContext.asyncAddArgsList.isEmpty()) {
+            if (metrics != null) {
+                
metrics.triggerFlushByLargeSingleData(this.flushContext.asyncAddArgsList.size(),
 this.bytesSize,
+                        System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
+            }
+            doFlush();
+        }
+        ByteBuf byteBuf = dataSerializer.serialize(data);

Review Comment:
   I would put those two lines back in `internalAsyncAddData` where you called 
`trigFlushByLargeSingleData`, since this method does one thing: trigger a flush 
as the name suggests.
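
   A minimal, runnable sketch of that suggestion (all type names below are
simplified stand-ins, not the real Pulsar API): serialization and the direct
write stay in `internalAsyncAddData`, and the trigger method does only what
its name says, namely flush.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: stand-ins for ManagedLedger / DataSerializer behavior.
public class FlushSplitExample {
    static final int BATCHED_WRITE_MAX_SIZE = 8;

    final List<byte[]> bookieWrites = new ArrayList<>(); // stands in for BK entries
    final List<Integer> queue = new ArrayList<>();       // buffered records

    byte[] serialize(int data) {
        return new byte[data]; // pretend a record's serialized size equals its value
    }

    void internalAsyncAddData(int data) {
        int dataLength = data; // stand-in for dataSerializer.getSerializedSize(data)
        if (dataLength >= BATCHED_WRITE_MAX_SIZE) {
            trigFlushByLargeSingleData();
            // The serialize + direct-write lines live here, next to the caller.
            bookieWrites.add(serialize(data));
            return;
        }
        queue.add(data);
    }

    // Does exactly one thing: flush whatever is buffered.
    void trigFlushByLargeSingleData() {
        if (!queue.isEmpty()) {
            bookieWrites.add(new byte[queue.size()]); // queued batch as one entry
            queue.clear();
        }
    }

    public static void main(String[] args) {
        FlushSplitExample writer = new FlushSplitExample();
        writer.internalAsyncAddData(3);  // buffered
        writer.internalAsyncAddData(10); // large: flush the batch, then write directly
        System.out.println(writer.bookieWrites.size());
    }
}
```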



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends 
SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new 
RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws 
Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.

Review Comment:
   Convert this comment into code:
   ```java
   MockManagedLedger mockManagedLedger = createMockManagedLedgerWithCounters();
   ```
   
   where it's a simple data class
   ```java
   class MockManagedLedger {
       ManagedLedger managedLedger;
       AtomicInteger batchFlushCounter;
   }
   ```      
   
   Then you can delete your comment and delete
   ```java
           ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
           AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
   ```
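
   A self-contained sketch of that idea (the `ManagedLedger` type and the
factory name are stand-ins here, the real test would mock the actual Pulsar
interface):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MockManagedLedgerExample {
    // Stand-in for org.apache.bookkeeper.mledger.ManagedLedger (illustrative only).
    interface ManagedLedger {
        String getName();
    }

    /** Simple data class replacing Pair<ManagedLedger, AtomicInteger>. */
    static class MockManagedLedger {
        final ManagedLedger managedLedger;
        final AtomicInteger batchFlushCounter = new AtomicInteger();

        MockManagedLedger(ManagedLedger managedLedger) {
            this.managedLedger = managedLedger;
        }
    }

    // The "Mock managed ledger and write counter" comment becomes the method name.
    static MockManagedLedger createMockManagedLedgerWithCounters(String mlName) {
        return new MockManagedLedger(() -> mlName);
    }

    public static void main(String[] args) {
        MockManagedLedger mock = createMockManagedLedgerWithCounters("tx_test_ledger");
        mock.batchFlushCounter.incrementAndGet();
        System.out.println(mock.managedLedger.getName() + ":" + mock.batchFlushCounter.get());
    }
}
```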



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter} and helps users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When the batch feature is turned off, no data is recorded here. In that scenario, users can consult
+ *    {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even with the batch feature enabled, a single record that is too large is still written directly to
+ *    Bookie without batching; the property {@link #pulsarBatchedLogTriggeringCountByForce} indicates this case.
+ *    Such a write does not affect the other metrics, because it would obscure the real situation. E.g. there
+ *    are two records:

Review Comment:
   I don't understand this entire explanation. Why does a metric indicating you 
flushed due to accepting a single large record obscure anything?



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -116,6 +116,16 @@
             AtomicReferenceFieldUpdater
                     .newUpdater(TxnLogBufferedWriter.class, 
TxnLogBufferedWriter.State.class, "state");
 
+    /** Metrics. **/

Review Comment:
   I still see the comment. 



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends 
SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new 
RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws 
Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = 
mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =

Review Comment:
   Same here
   ```
   class MockAddDataCallback {
       addDataCallback
       finishCount
       failureCount
   }
   ```



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends 
SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new 
RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws 
Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = 
mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                new SumStrDataSerializer(), 2, 1024,
+                1000 * 3600, true, metricsStats);
+        // Add some data.
+        int writeCount = 100;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        /** Assert metrics stats correct. **/

Review Comment:
   Comment not needed



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends 
SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * Testing showed that when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new 
RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws 
Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = 
mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                new SumStrDataSerializer(), 2, 1024,
+                1000 * 3600, true, metricsStats);
+        // Add some data.
+        int writeCount = 100;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        /** Assert metrics stats correct. **/
+        verifyTheCounterMetrics(writeCount / 2,0,0,0);

Review Comment:
   Make it explicit: What is 2? Why 2?
   ```
   writeCount / maxRecordPerBatch
   ```
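
   For example (constant names here are illustrative, not the test's actual
variables):

```java
public class ExpectedFlushCountExample {
    public static void main(String[] args) {
        // Name the constants instead of writing the bare literal 2.
        final int maxRecordsPerBatch = 2; // the batchedWriteMaxRecords passed to the writer
        final int writeCount = 100;

        int expectedFlushCount = writeCount / maxRecordsPerBatch;
        System.out.println(expectedFlushCount);
        // verifyTheCounterMetrics(expectedFlushCount, 0, 0, 0) then reads naturally.
    }
}
```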



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Determined experimentally: when {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024,
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three flush thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * Returns {@code len1} and {@code len2} alternately for successive records.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when the batch feature is disabled.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data writes to finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.

Review Comment:
   Push this into a method, since it's not really interesting - the parameters are the interesting ones, the special ones.
   
   ```
   maxBatchSize=2
   ...
   createBufferedWriter(maxBatchSize, ...)
   ```
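The extraction suggested here might look like the sketch below. It deliberately uses a simplified stand-in `WriterConfig` record instead of the real `TxnLogBufferedWriter` constructor, so every name in it is illustrative only:

```java
public class WriterFactoryDemo {

    // Simplified stand-in for the buffered writer's constructor parameters.
    record WriterConfig(int maxRecords, int maxSize, int maxDelayMillis, boolean batchEnabled) {}

    // The helper hides the uninteresting defaults; only the parameter under
    // test appears at the call site.
    static WriterConfig createBufferedWriter(int maxRecords) {
        return new WriterConfig(maxRecords, 1024, 3_600_000, true);
    }

    public static void main(String[] args) {
        // "2" now clearly reads as the special parameter of this test.
        WriterConfig writer = createBufferedWriter(2);
        System.out.println(writer.maxRecords()); // prints 2
    }
}
```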



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter} and helps users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When the batch feature is turned off, no data is recorded here. In this scenario, users can rely on
+ *    {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even when the batch feature is enabled, a single record that is too large is still written directly to
+ *    Bookie without batching; the property {@link #pulsarBatchedLogTriggeringCountByForce} counts this case. Such
+ *    writes are excluded from the other metrics, because including them would obscure the real situation. E.g.
+ *    given two records [{recordsCount=512, triggerByMaxRecordCount}, {recordCount=1, triggerByTooLarge}], we
+ *    should not tell users that the average batch holds 256 records: users who believe only 256 records fit per
+ *    batch would try to increase {@link TxnLogBufferedWriter#batchedWriteMaxDelayInMillis} to get more data per
+ *    batch and improve throughput, but that would not work.
+ */
+public class TxnLogBufferedWriterMetricsStats implements Closeable {
+
+    /**
+     * Key is the name used to create {@link TxnLogBufferedWriterMetricsStats}; currently there are two kinds:
+     * ["pulsar_txn_tc_log", "pulsar_txn_tc_batched_log"]. Each {@link TxnLogBufferedWriterMetricsStats} can carry
+     * multiple labels, e.g. the Transaction Coordinator uses coordinatorId as a label and the Transaction Pending
+     * Ack Store uses subscriptionName as a label.
+     */
+    private static final HashMap<String, Collector> COLLECTOR_CACHE = new HashMap<>();

Review Comment:
   >But the Transaction Log Provider will hold all the Collector of Txn Buffered Writer; this is confusing
   
   Can you explain this further? I didn't understand how the provider would hold all the Collectors (metrics). It should only be a single instance of the metric class.
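For context on the cache being questioned: the Prometheus Java client's `CollectorRegistry` rejects registering two collectors with the same metric name, which is presumably why instances that share a name must share one `Collector` and differ only by label values. A stdlib-only sketch of that pattern (the Prometheus client itself is not used here, and `getOrCreate` is a hypothetical name):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CollectorCacheDemo {

    // Stand-in for a Prometheus Collector: one object per metric name.
    static final Map<String, AtomicInteger> COLLECTOR_CACHE = new HashMap<>();

    static AtomicInteger getOrCreate(String metricName) {
        // Later callers with the same name get the cached instance, mirroring
        // how two TxnLogBufferedWriterMetricsStats instances with the same
        // prefix would have to share one Collector.
        return COLLECTOR_CACHE.computeIfAbsent(metricName, n -> new AtomicInteger());
    }

    public static void main(String[] args) {
        AtomicInteger a = getOrCreate("pulsar_txn_tc_log");
        AtomicInteger b = getOrCreate("pulsar_txn_tc_log");
        System.out.println(a == b); // prints true: one Collector shared per name
    }
}
```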



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
