poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r936949315


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +620,322 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
         }
     }
 
+    private static class RandomLenSumStrDataSerializer extends JsonDataSerializer {
+
+        @Getter
+        private int totalSize;
+
+        /**
+         * Testing showed that with {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256,
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} = 1024 and
+         *   {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, a random-size baseline
+         *   of 9 gives the highest probability that all three thresholds are hit.
+         */
+        @Override
+        public int getSerializedSize(Integer data) {
+            int size = new Random().nextInt(9);
+            totalSize += size;
+            return size;
+        }
+    }
+
     public enum BookieErrorType{
         NO_ERROR,
         ALWAYS_ERROR,
         SOMETIMES_ERROR;
     }
+
+    /**
+     * 1. Verify the transaction buffered writer stats are correct when the batch feature is enabled, excluding
+     *    "triggerByForceFlush", which is verified by {@link #testMetricsStatsWhenForceFlush()}.
+     * 2. Verify metrics are released after {@link TxnLogBufferedWriterMetricsStats#close()}.
+     */
+    @Test
+    public void testMetricsStatsWhenEnabled() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        // Mock managed ledger to get the count of refresh event.
+        AtomicInteger refreshCount = new AtomicInteger();
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                refreshCount.incrementAndGet();
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), (ByteBuf)invocation.getArguments()[0],
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Create an addDataCallback that counts finished and failed add-data requests.
+        AtomicInteger addDataCallbackFinishCount = new AtomicInteger();
+        AtomicInteger addDataCallbackFailureCount = new AtomicInteger();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = new TxnLogBufferedWriter.AddDataCallback(){
+            @Override
+            public void addComplete(Position position, Object context) {
+                addDataCallbackFinishCount.incrementAndGet();
+            }
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                addDataCallbackFailureCount.incrementAndGet();
+            }
+        };
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, true, metricsStats);
+        // Add some data.
+        int writeCount = 3000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert the metrics stats are correct.
+        Assert.assertEquals(getCounterValue(String.format("%s_batched_log_triggering_count_by_force", metricsPrefix)), 0D);
+        Assert.assertEquals(
+                getCounterValue(String.format("%s_batched_log_triggering_count_by_records", metricsPrefix))
+                + getCounterValue(String.format("%s_batched_log_triggering_count_by_size", metricsPrefix))
+                + getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time", metricsPrefix)),
+                (double)refreshCount.get());
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_records_count_per_entry", metricsPrefix)),
+                refreshCount.get());
+        Assert.assertEquals(
+                getHistogramSum(String.format("%s_batched_log_records_count_per_entry", metricsPrefix)),
+                writeCount);
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_entry_size_bytes", metricsPrefix)),
+                refreshCount.get());
+        Assert.assertEquals(
+                getHistogramSum(String.format("%s_batched_log_entry_size_bytes", metricsPrefix)),
+                dataSerializer.getTotalSize());
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_oldest_record_delay_time_seconds", metricsPrefix)),
+                refreshCount.get());
+        /**
+         * Assert all metrics are released after {@link TxnLogBufferedWriter#close()}:
+         *   1. Register another {@link TxnLogBufferedWriter}.
+         *   2. Close the first {@link TxnLogBufferedWriter} and verify its metric labels are released after
+         *      {@link TxnLogBufferedWriter#close()}.
+         *   3. Close the second {@link TxnLogBufferedWriter} and verify all metrics are released after all
+         *      {@link TxnLogBufferedWriter#close()} calls.
+         */
+        TxnLogBufferedWriterMetricsStats anotherMetricsStat = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, new String[]{"2"}, CollectorRegistry.defaultRegistry);
+        TxnLogBufferedWriter<Integer> anotherTxnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, true, anotherMetricsStat);
+        anotherTxnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount + 1
+        );
+        // Close the first writer, then verify its metric labels are released after it is closed.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> getHistogramCount(
+                        String.format("%s_batched_log_oldest_record_delay_time_seconds", metricsPrefix)) == 0
+        );
+        Assert.assertEquals(
+                getCounterValue(String.format("%s_batched_log_triggering_count_by_records", metricsPrefix)),
+                0D
+        );
+        Assert.assertEquals(
+                getCounterValue(String.format("%s_batched_log_triggering_count_by_size", metricsPrefix)),
+                0D
+        );
+        Assert.assertEquals(
+                getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time", metricsPrefix)),
+                0D
+        );
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_records_count_per_entry", metricsPrefix)),
+                0D
+        );
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_entry_size_bytes", metricsPrefix)),
+                0D
+        );
+        // Close the second writer, then verify all metrics are released once both writers are closed.
+        anotherTxnLogBufferedWriter.close();
+        anotherMetricsStat.close();
+        // cleanup.
+        transactionTimer.stop();
+        managedLedger.close();
+        orderedExecutor.shutdown();
+    }
+
+    private double getCounterValue(String name) {
+        Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+                name + "_total",
+                metricsLabelNames,
+                metricsLabelValues);
+        return d == null ? 0: d.doubleValue();
+    }
+
+    private double getHistogramCount(String name) {
+        Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+                name + "_count",
+                metricsLabelNames,
+                metricsLabelValues);
+        return d == null ? 0: d.doubleValue();
+    }
+
+    private double getHistogramSum(String name) {
+        Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+                name + "_sum",
+                metricsLabelNames,
+                metricsLabelValues);
+        return d == null ? 0: d.doubleValue();
+    }
+
+    /**
+     * Test Transaction buffered writer stats when disabled batch feature.
+     */
+    @Test
+    public void testMetricsStatsWhenDisabled() throws Exception {
+        TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
+                metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry
+        );
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        // Create an addDataCallback that counts finished and failed add-data requests.
+        AtomicInteger addDataCallbackFinishCount = new AtomicInteger();
+        AtomicInteger addDataCallbackFailureCount = new AtomicInteger();
+        TxnLogBufferedWriter.AddDataCallback addDataCallback = new TxnLogBufferedWriter.AddDataCallback(){
+            @Override
+            public void addComplete(Position position, Object context) {
+                addDataCallbackFinishCount.incrementAndGet();
+            }
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                addDataCallbackFailureCount.incrementAndGet();
+            }
+        };
+        // Create TxnLogBufferedWriter.
+        OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
+                .numThreads(5).name("txn-threads").build();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        RandomLenSumStrDataSerializer dataSerializer = new RandomLenSumStrDataSerializer();
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+                managedLedger, orderedExecutor, transactionTimer,
+                dataSerializer, 256, 1024,
+                1, false, metricsStats);
+        // Add some data.
+        int writeCount = 1000;
+        for (int i = 0; i < writeCount; i++){
+            txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+        }
+        // Wait for all data write finish.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> addDataCallbackFinishCount.get() + addDataCallbackFailureCount.get() == writeCount
+        );
+        Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+        // Assert metrics stat.
+        Assert.assertEquals(getCounterValue(String.format("%s_batched_log_triggering_count_by_force", metricsPrefix)), 0D);
+        Assert.assertEquals(
+                getCounterValue(String.format("%s_batched_log_triggering_count_by_records", metricsPrefix))
+                    + getCounterValue(String.format("%s_batched_log_triggering_count_by_size", metricsPrefix))
+                    + getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time", metricsPrefix)),
+                0D);
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_records_count_per_entry", metricsPrefix)),
+                0D);
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_entry_size_bytes", metricsPrefix)),
+                0D);
+        Assert.assertEquals(
+                getHistogramCount(String.format("%s_batched_log_oldest_record_delay_time_seconds", metricsPrefix)),
+                0D);
+        // cleanup.
+        txnLogBufferedWriter.close();
+        metricsStats.close();
+        transactionTimer.stop();
+        orderedExecutor.shutdown();
+    }
+
+    /**
+     * Test {@link TxnLogBufferedWriterMetricsStats#triggerFlushByForce(int, long, long)}.
+     */
+    @Test
+    public void testMetricsStatsWhenForceFlush() throws Exception {

Review Comment:
   > If you remove public force flush then this test can be removed?
   
   No, I will rename this method to `testMetricsStatsWhenTriggeredLargeSingleData`

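   For context, a rough sketch of what the renamed test could look like, reusing only the helpers and constructor
   arguments already visible in this diff (`getCounterValue`, `metricsPrefix`, `factory`, the `TxnLogBufferedWriter`
   constructor). The `LargeSingleDataSerializer` class and the assertion on the `_triggering_count_by_force`
   counter are illustrative assumptions, not part of this PR:

   ```java
   // Assumed scenario: a single record larger than batchedWriteMaxSize (1024) triggers a flush on its own.
   private static class LargeSingleDataSerializer extends JsonDataSerializer {
       @Override
       public int getSerializedSize(Integer data) {
           return 2048; // larger than batchedWriteMaxSize
       }
   }

   @Test
   public void testMetricsStatsWhenTriggeredLargeSingleData() throws Exception {
       TxnLogBufferedWriterMetricsStats metricsStats = new TxnLogBufferedWriterMetricsStats(
               metricsPrefix, metricsLabelNames, metricsLabelValues, CollectorRegistry.defaultRegistry);
       ManagedLedger managedLedger = factory.open("tx_test_ledger");
       OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
       HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
               1, TimeUnit.MILLISECONDS);
       TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
               managedLedger, orderedExecutor, transactionTimer,
               new LargeSingleDataSerializer(), 256, 1024, 1, true, metricsStats);
       TxnLogBufferedWriter.AddDataCallback addDataCallback = new TxnLogBufferedWriter.AddDataCallback() {
           @Override
           public void addComplete(Position position, Object context) { }
           @Override
           public void addFailed(ManagedLedgerException exception, Object ctx) { }
       };
       // Write a single record whose serialized size exceeds batchedWriteMaxSize.
       txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
       // Assumed expectation: the force-triggered counter grows for the oversized record.
       Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
               () -> getCounterValue(String.format("%s_batched_log_triggering_count_by_force", metricsPrefix)) > 0);
       // cleanup.
       txnLogBufferedWriter.close();
       metricsStats.close();
       transactionTimer.stop();
       orderedExecutor.shutdown();
   }
   ```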


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
