poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r939680362
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +610,371 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
SumStrDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
+ private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+ private final int len1;
+
+ private final int len2;
+
+ private AtomicBoolean useLen2 = new AtomicBoolean();
+
+ public TwoLenSumDataSerializer(int len1, int len2){
+ this.len1 = len1;
+ this.len2 = len2;
+ }
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ boolean b = useLen2.get();
+ useLen2.set(!b);
+ return b ? len2 : len1;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
+ // Create TxnLogBufferedWriter.
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ var dataSerializer = new RandomLenSumStrDataSerializer();
+ var txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ // Assert metrics stat.
+ verifyTheHistogramMetrics(0, 0, 0);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxRecordCount() throws
Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxRecords = 2;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / batchedWriteMaxRecords;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, batchedWriteMaxRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(expectedBatchFlushCount,0,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxSize() throws Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxSize = 16;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / (batchedWriteMaxSize /
dataSerializer.getSizePerData());
+ int expectedTotalBytesSize = expectedBatchFlushCount *
batchedWriteMaxSize;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(0, expectedBatchFlushCount,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxDelayTime() throws Exception
{
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int writeCount = 100;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
+ createTxnBufferedWriterContextWithMetrics(dataSerializer,
Integer.MAX_VALUE,
+ Integer.MAX_VALUE, 1);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ Thread.sleep(1);
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ verifyTheCounterMetrics(0,0, actualBatchFlushCount,0);
+ verifyTheHistogramMetrics(actualBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByLargeSingleData() throws
Exception {
+ // Use TwoLenSumDataSerializer for: write a little data once, then
write a large data once.
+ int bytesSizePerRecordWhichInBatch = 4;
+ int batchedWriteMaxSize = 1024;
+ TwoLenSumDataSerializer dataSerializer =
+ new TwoLenSumDataSerializer(bytesSizePerRecordWhichInBatch,
batchedWriteMaxSize);
+ int writeCount = 100;
+ int singleLargeDataRequestCount = writeCount / 2;
+ int expectedBatchFlushTriggeredByLargeData =
singleLargeDataRequestCount;
+ int expectedTotalBytesSize = expectedBatchFlushTriggeredByLargeData *
bytesSizePerRecordWhichInBatch;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
i);
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
 Review Comment:
It seems to make no difference. In this unit test, we use
`TwoLenSumDataSerializer` to control the size of each write: [4,
batchedWriteMaxSize, batchedWriteMaxSize, 4, batchedWriteMaxSize, ...].
And the code you wrote is easier to read.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]