poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r937560276


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +608,402 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends 
SumStrDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * After the test, when {@link 
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 
1024,
+         *   and {@link 
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the 
random-size baseline
+         *   was set as 9, and there was maximum probability that all three 
thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
+    private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+        private final int len1;
+
+        private final int len2;
+
+        private AtomicBoolean useLen2 = new AtomicBoolean();
+
+        public TwoLenSumDataSerializer(int len1, int len2){
+            this.len1 = len1;
+            this.len2 = len2;
+        }
+
+        /**
+         * After the test, when {@link 
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+         *   and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 
1024,
+         *   and {@link 
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the 
random-size baseline
+         *   was set as 9, and there was maximum probability that all three 
thresholds could be hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            boolean b = useLen2.get();
+            useLen2.set(!b);
+            return b ? len2 : len1;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new 
RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        verifyTheHistogramMetrics(0, 0, 0);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * For metrics scenario "max records count".
+     */
+    @Test
+    public void testMetricsStatsThatTriggeredByMaxRecordCount() throws 
Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new 
TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, 
CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger and write counter.
+        Pair<ManagedLedger, AtomicInteger> mlAndWriteCounter = 
mockManagedLedgerWithWriteCounter(mlName);
+        ManagedLedger managedLedger = mlAndWriteCounter.getLeft();
+        AtomicInteger batchFlushCounter = mlAndWriteCounter.getRight();
+        // Create callback with counter.
+        Triple<TxnLogBufferedWriter.AddDataCallback, AtomicInteger, 
AtomicInteger> callbackWithCounter =
+                createCallBackWithCounter();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = 
callbackWithCounter.getLeft();
+        AtomicInteger addDataCallbackFinishCount = 
callbackWithCounter.getMiddle();
+        AtomicInteger addDataCallbackFailureCount = 
callbackWithCounter.getRight();
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                new SumStrDataSerializer(), 2, 1024,
+                1000 * 3600, true, metricsStats);
+        // Add some data.
+        int writeCount = 100;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + 
addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        /** Assert metrics stats correct. **/
+        verifyTheCounterMetrics(writeCount / 2,0,0,0);
+        verifyTheHistogramMetrics(batchFlushCounter.get(), writeCount, 
writeCount * 4);

Review Comment:
   > Do this call for each you verify
   
   Correct.  I've used a better way: calculate `expectedBacthFlushCount` and 
verify it equals to `batchFlushCounter.get()`, then replace these expressions 
with meaningful variable names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to