poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r936949315
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +620,322 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
JsonDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * 1. Verify Transaction buffered writer stats correct when enabled batch
feature. Exclusive "triggerByForceFlush",
+ * this property verified by {@link #testMetricsStatsWhenForceFlush()}.
+ * 2. Verify metrics will be release after {@link
TxnLogBufferedWriterMetricsStats#clone()}.
+ */
+ @Test
+ public void testMetricsStatsWhenEnabled() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ // Mock managed ledger to get the count of refresh event.
+ AtomicInteger refreshCount = new AtomicInteger();
+ String managedLedgerName = "-";
+ ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+ Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ refreshCount.incrementAndGet();
+ AsyncCallbacks.AddEntryCallback callback =
+ (AsyncCallbacks.AddEntryCallback)
invocation.getArguments()[1];
+ callback.addComplete(PositionImpl.get(1,1),
(ByteBuf)invocation.getArguments()[0],
+ invocation.getArguments()[2]);
+ return null;
+ }
+ }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class),
Mockito.any(), Mockito.any());
+ // Mock addDataCallbackCount to get the count of add data finish count;
+ AtomicInteger addDataCallbackFinishCount = new AtomicInteger();
+ AtomicInteger addDataCallbackFailureCount = new AtomicInteger();
+ TxnLogBufferedWriter.AddDataCallback addDataCallback = new
TxnLogBufferedWriter.AddDataCallback(){
+ @Override
+ public void addComplete(Position position, Object context) {
+ addDataCallbackFinishCount.incrementAndGet();
+ }
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ addDataCallbackFailureCount.incrementAndGet();
+ }
+ };
+ // Create TxnLogBufferedWriter.
+ OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(5).name("tx-threads").build();
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ RandomLenSumStrDataSerializer dataSerializer = new
RandomLenSumStrDataSerializer();
+ TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, 256, 1024,
+ 1, true, metricsStats);
+ // Add some data.
+ int writeCount = 3000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> addDataCallbackFinishCount.get() +
addDataCallbackFailureCount.get() == writeCount
+ );
+ Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+ /** Assert metrics stats correct. **/
+
Assert.assertEquals(getCounterValue(String.format("%s_batched_log_triggering_count_by_force",
metricsPrefix)), 0D);
+ Assert.assertEquals(
+
getCounterValue(String.format("%s_batched_log_triggering_count_by_records",
metricsPrefix))
+ +
getCounterValue(String.format("%s_batched_log_triggering_count_by_size",
metricsPrefix))
+ +
getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time",
metricsPrefix)),
+ (double)refreshCount.get());
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_records_count_per_entry",
metricsPrefix)),
+ refreshCount.get());
+ Assert.assertEquals(
+
getHistogramSum(String.format("%s_batched_log_records_count_per_entry",
metricsPrefix)),
+ writeCount);
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_entry_size_bytes",
metricsPrefix)),
+ refreshCount.get());
+ Assert.assertEquals(
+
getHistogramSum(String.format("%s_batched_log_entry_size_bytes",
metricsPrefix)),
+ dataSerializer.getTotalSize());
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_oldest_record_delay_time_seconds",
metricsPrefix)),
+ refreshCount.get());
+ /**
+ * Assert all metrics will be released after {@link
TxnLogBufferedWriter#close()}
+ * 1. Register another {@link TxnLogBufferedWriter}
+ * 2. Close first {@link TxnLogBufferedWriter}, verify the labels of
metrics will be released after
+ * {@link TxnLogBufferedWriter#close()}
+ * 3. Close second {@link TxnLogBufferedWriter},verify all metrics
will be released after all
+ * {@link TxnLogBufferedWriter#close()}
+ */
+ TxnLogBufferedWriterMetricsStats anotherMetricsStat = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, new String[]{"2"},
CollectorRegistry.defaultRegistry);
+ TxnLogBufferedWriter<Integer> anotherTxnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, 256, 1024,
+ 1, true, anotherMetricsStat);
+ anotherTxnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> addDataCallbackFinishCount.get() +
addDataCallbackFailureCount.get() == writeCount + 1
+ );
+ // Close first-writer, verify the labels will be released after
writer-close.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> getHistogramCount(
+
String.format("%s_batched_log_oldest_record_delay_time_seconds",
metricsPrefix)) == 0
+ );
+ Assert.assertEquals(
+
getCounterValue(String.format("%s_batched_log_triggering_count_by_records",
metricsPrefix)),
+ 0D
+ );
+ Assert.assertEquals(
+
getCounterValue(String.format("%s_batched_log_triggering_count_by_size",
metricsPrefix)),
+ 0D
+ );
+ Assert.assertEquals(
+
getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time",
metricsPrefix)),
+ 0D
+ );
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_records_count_per_entry",
metricsPrefix)),
+ 0D
+ );
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_entry_size_bytes",
metricsPrefix)),
+ 0D
+ );
+ // Close second-writer, verify all metrics will be released after all
writer-close.
+ anotherTxnLogBufferedWriter.close();
+ anotherMetricsStat.close();
+ // cleanup.
+ transactionTimer.stop();
+ managedLedger.close();
+ orderedExecutor.shutdown();
+ }
+
+ private double getCounterValue(String name) {
+ Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+ name + "_total",
+ metricsLabelNames,
+ metricsLabelValues);
+ return d == null ? 0: d.doubleValue();
+ }
+
+ private double getHistogramCount(String name) {
+ Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+ name + "_count",
+ metricsLabelNames,
+ metricsLabelValues);
+ return d == null ? 0: d.doubleValue();
+ }
+
+ private double getHistogramSum(String name) {
+ Double d = CollectorRegistry.defaultRegistry.getSampleValue(
+ name + "_sum",
+ metricsLabelNames,
+ metricsLabelValues);
+ return d == null ? 0: d.doubleValue();
+ }
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabled() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Mock addDataCallbackCount to get the count of add data finish count;
+ AtomicInteger addDataCallbackFinishCount = new AtomicInteger();
+ AtomicInteger addDataCallbackFailureCount = new AtomicInteger();
+ TxnLogBufferedWriter.AddDataCallback addDataCallback = new
TxnLogBufferedWriter.AddDataCallback(){
+ @Override
+ public void addComplete(Position position, Object context) {
+ addDataCallbackFinishCount.incrementAndGet();
+ }
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ addDataCallbackFailureCount.incrementAndGet();
+ }
+ };
+ // Create TxnLogBufferedWriter.
+ OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(5).name("txn-threads").build();
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ RandomLenSumStrDataSerializer dataSerializer = new
RandomLenSumStrDataSerializer();
+ TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, 256, 1024,
+ 1, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, addDataCallback, "");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> addDataCallbackFinishCount.get() +
addDataCallbackFailureCount.get() == writeCount
+ );
+ Assert.assertEquals(addDataCallbackFailureCount.get(), 0);
+ // Assert metrics stat.
+
Assert.assertEquals(getCounterValue(String.format("%s_batched_log_triggering_count_by_force",
metricsPrefix)), 0D);
+ Assert.assertEquals(
+
getCounterValue(String.format("%s_batched_log_triggering_count_by_records",
metricsPrefix))
+ +
getCounterValue(String.format("%s_batched_log_triggering_count_by_size",
metricsPrefix))
+ +
getCounterValue(String.format("%s_batched_log_triggering_count_by_delay_time",
metricsPrefix)),
+ 0D);
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_records_count_per_entry",
metricsPrefix)),
+ 0D);
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_entry_size_bytes",
metricsPrefix)),
+ 0D);
+ Assert.assertEquals(
+
getHistogramCount(String.format("%s_batched_log_oldest_record_delay_time_seconds",
metricsPrefix)),
+ 0D);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ /**
+ * Test {@link TxnLogBufferedWriterMetricsStats#triggerFlushByForce(int,
long, long)}.
+ */
+ @Test
+ public void testMetricsStatsWhenForceFlush() throws Exception {
Review Comment:
> If you remove public force flush then this test can be removed?
No, I will rename this method to
`testMetricsStatsWhenTriggeredLargeSingleData`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]