asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r939651956
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
Review Comment:
Does it make sense to log the error instead of calling `callback.addFailed`?
@codelipenghui WDYT?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
- this.flushContext.asyncAddArgsList.add(asyncAddArgs);
- // Calculate bytes-size.
- this.bytesSize += len;
- // trig flush.
- doTrigFlush(false, false);
+ flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes size.
Review Comment:
This comment is completely redundant.
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +610,371 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
SumStrDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
+ private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+ private final int len1;
+
+ private final int len2;
+
+ private AtomicBoolean useLen2 = new AtomicBoolean();
+
+ public TwoLenSumDataSerializer(int len1, int len2){
+ this.len1 = len1;
+ this.len2 = len2;
+ }
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ boolean b = useLen2.get();
+ useLen2.set(!b);
+ return b ? len2 : len1;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
+ // Create TxnLogBufferedWriter.
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ var dataSerializer = new RandomLenSumStrDataSerializer();
+ var txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ // Assert metrics stat.
+ verifyTheHistogramMetrics(0, 0, 0);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxRecordCount() throws
Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxRecords = 2;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / batchedWriteMaxRecords;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, batchedWriteMaxRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(expectedBatchFlushCount,0,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxSize() throws Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxSize = 16;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / (batchedWriteMaxSize /
dataSerializer.getSizePerData());
+ int expectedTotalBytesSize = expectedBatchFlushCount *
batchedWriteMaxSize;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(0, expectedBatchFlushCount,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxDelayTime() throws Exception
{
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int writeCount = 100;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
+ createTxnBufferedWriterContextWithMetrics(dataSerializer,
Integer.MAX_VALUE,
+ Integer.MAX_VALUE, 1);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ Thread.sleep(1);
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
Review Comment:
Where are you checking `actualBatchFlushCount == 1` ?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
Review Comment:
I don't understand: The comment says, "append data to the data array"
and then the code says `dataArray.add(data)`, so the comment says the same
thing as the code --> comment can be deleted
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
Review Comment:
maybe `prefixByteBuf`?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
- this.flushContext.asyncAddArgsList.add(asyncAddArgs);
- // Calculate bytes-size.
- this.bytesSize += len;
- // trig flush.
- doTrigFlush(false, false);
+ flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes size.
+ bytesSize += dataLength;
+ trigFlushIfReachMaxRecordsOrMaxSize();
+ }
+
+ /**
+ * Change to IO thread and do flush, only called by {@link
#timingFlushTask}.
Review Comment:
Ask yourself this please: Can I read the code and understand it without this
comment?
If yes, delete the comment.
If no:
Can I improve the code so it can be understood without the comment?
Yes?
Improve code
No?
Write comment
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
- this.flushContext.asyncAddArgsList.add(asyncAddArgs);
- // Calculate bytes-size.
- this.bytesSize += len;
- // trig flush.
- doTrigFlush(false, false);
+ flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes size.
+ bytesSize += dataLength;
+ trigFlushIfReachMaxRecordsOrMaxSize();
+ }
+
+ /**
+ * Change to IO thread and do flush, only called by {@link
#timingFlushTask}.
+ */
+ private void trigFlushByTimingTask(){
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+ if (metrics != null) {
+
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(),
bytesSize,
+ System.currentTimeMillis() -
flushContext.asyncAddArgsList.get(0).addedTime);
+ }
+ doFlush();
+ } catch (Exception e){
+ log.error("Trig flush by timing task fail.", e);
+ } finally {
+ // Start the next timing task.
+ nextTimingTrigger();
+ }
+ });
+ }
+
+ /**
+ * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link
#batchedWriteMaxSize}, do flush.
+ */
+ private void trigFlushIfReachMaxRecordsOrMaxSize(){
+ if (flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {
Review Comment:
Why `flushContext.asyncAddArgsList.size()` and not `dataArray.size()`? I
mean, it's odd to check the number of callbacks instead of checking plainly the
size of the data queue. If agree, I would replace below as well
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+ ByteBuf wholeByteBuf =
Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);
Review Comment:
Why `Unpooled` and not `PulsarByteBufAllocator`?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
- this.flushContext.asyncAddArgsList.add(asyncAddArgs);
- // Calculate bytes-size.
- this.bytesSize += len;
- // trig flush.
- doTrigFlush(false, false);
+ flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes size.
+ bytesSize += dataLength;
+ trigFlushIfReachMaxRecordsOrMaxSize();
+ }
+
+ /**
+ * Change to IO thread and do flush, only called by {@link
#timingFlushTask}.
+ */
+ private void trigFlushByTimingTask(){
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+ if (metrics != null) {
+
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(),
bytesSize,
+ System.currentTimeMillis() -
flushContext.asyncAddArgsList.get(0).addedTime);
+ }
+ doFlush();
+ } catch (Exception e){
+ log.error("Trig flush by timing task fail.", e);
+ } finally {
+ // Start the next timing task.
+ nextTimingTrigger();
Review Comment:
As I mentioned in the previous PR, it seems that you just implemented
`ScheduledExecutorService.scheduleWithFixedDelay` by yourself using Timer.
Any reason not to replace it with scheduleWithFixedDelay?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
Review Comment:
Maybe
```java
flushContext.addCallback(callback, ctx)
```
instead of
```java
// Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
flushContext.asyncAddArgsList.add(asyncAddArgs);
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback
callback, Object ctx){
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ internalAsyncAddData(data, callback, ctx);
+ } catch (Exception e){
+ log.error("Internal async add data fail", e);
+ }
+ });
}
+ /**
+ * Append data to queue, if reach {@link #batchedWriteMaxRecords} or
{@link #batchedWriteMaxSize}, do flush. And if
+ * accept a request that {@param data} is too large (larger than {@link
#batchedWriteMaxSize}), then two flushes
+ * are executed:
+ * 1. Write the data cached in the queue to BK.
+ * 2. Direct write the large data to BK, this flush event will not
record to Metrics.
+ * This ensures the sequential nature of multiple writes to BK.
+ */
private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
if (state == State.CLOSING || state == State.CLOSED){
callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
return;
}
- int len = dataSerializer.getSerializedSize(data);
- if (len >= batchedWriteMaxSize){
- if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true, false);
- }
+ int dataLength = dataSerializer.getSerializedSize(data);
+ if (dataLength >= batchedWriteMaxSize){
+ trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- // Add data.
- this.dataArray.add(data);
- // Add callback info.
+ // Append data to the data-array.
+ dataArray.add(data);
+ // Append callback to the flushContext.
AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
- this.flushContext.asyncAddArgsList.add(asyncAddArgs);
- // Calculate bytes-size.
- this.bytesSize += len;
- // trig flush.
- doTrigFlush(false, false);
+ flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes size.
+ bytesSize += dataLength;
+ trigFlushIfReachMaxRecordsOrMaxSize();
+ }
+
+ /**
+ * Change to IO thread and do flush, only called by {@link
#timingFlushTask}.
+ */
+ private void trigFlushByTimingTask(){
+ singleThreadExecutorForWrite.execute(() -> {
+ try {
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+ if (metrics != null) {
+
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(),
bytesSize,
+ System.currentTimeMillis() -
flushContext.asyncAddArgsList.get(0).addedTime);
+ }
+ doFlush();
+ } catch (Exception e){
+ log.error("Trig flush by timing task fail.", e);
Review Comment:
Here you chose not to notify the callbacks because you assume the queue is
still with elements, next timer task will flush them? @codelipenghui WDYT?
I'm comparing with this
```java
if (State.CLOSING == state || State.CLOSED == state){
failureCallbackByContextAndRecycle(flushContext,
BUFFERED_WRITER_CLOSED_EXCEPTION);
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+ ByteBuf wholeByteBuf =
Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);
// We need to release this pairByteBuf after Managed ledger async add
callback. Just holds by FlushContext.
Review Comment:
You know what's the biggest problem with lengthy comments? When you
refactor, you usually don't change the comments, so they stay out of sync with
the code.
Look here, you called pairByteBuf, but not it's `wholeByteBuf` so the reader
wouldn't really understand this comment.
You're hiding the release of the buffer, created right here in this method,
inside FlushContext.recycle() called from addComplete.
I suggest you create new callback which calls the original callback but also
releases the `wholeByteBuf` so I can see the code that releases this object
right next it's creation.
and the delete this comment, which is also *duplicated* in
```java
/**
* If turning on the Batch feature, we need to release the byteBuf
produced by
* {@link DataSerializer#serialize(ArrayList)} when Managed ledger
async add callback.
* Only carry the ByteBuf objects, no other use.
*/
```
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +610,371 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
SumStrDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
+ private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+ private final int len1;
+
+ private final int len2;
+
+ private AtomicBoolean useLen2 = new AtomicBoolean();
+
+ public TwoLenSumDataSerializer(int len1, int len2){
+ this.len1 = len1;
+ this.len2 = len2;
+ }
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ boolean b = useLen2.get();
+ useLen2.set(!b);
+ return b ? len2 : len1;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
+ // Create TxnLogBufferedWriter.
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ var dataSerializer = new RandomLenSumStrDataSerializer();
+ var txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ // Assert metrics stat.
+ verifyTheHistogramMetrics(0, 0, 0);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxRecordCount() throws
Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxRecords = 2;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / batchedWriteMaxRecords;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, batchedWriteMaxRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(expectedBatchFlushCount,0,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxSize() throws Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxSize = 16;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / (batchedWriteMaxSize /
dataSerializer.getSizePerData());
+ int expectedTotalBytesSize = expectedBatchFlushCount *
batchedWriteMaxSize;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(0, expectedBatchFlushCount,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxDelayTime() throws Exception
{
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int writeCount = 100;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
+ createTxnBufferedWriterContextWithMetrics(dataSerializer,
Integer.MAX_VALUE,
Review Comment:
I can't guess what is 1, which is the most important number here. Let's name
it: maxBatchDelaySeconds
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+ ByteBuf wholeByteBuf =
Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);
// We need to release this pairByteBuf after Managed ledger async add
callback. Just holds by FlushContext.
- this.flushContext.byteBuf = pairByteBuf;
- // Flush.
+ flushContext.byteBuf = wholeByteBuf;
if (State.CLOSING == state || State.CLOSED == state){
failureCallbackByContextAndRecycle(flushContext,
BUFFERED_WRITER_CLOSED_EXCEPTION);
} else {
- managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
+ managedLedger.asyncAddEntry(wholeByteBuf, this, flushContext);
}
- // Clear buffers.ok
- this.dataArray.clear();
- this.flushContext = FlushContext.newInstance();
- this.bytesSize = 0;
+ // Reset the cache.
Review Comment:
Redundant comment
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter}, helps
users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When batch feature is turned off, no data is logged at this. In
this scenario,users can see the
+ * {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even if enable batch feature, if a single record is too big, it
still directly write to Bookie without batch,
+ * property {@link #pulsarBatchedLogTriggeringCountByForce} can indicate
this case. But this case will not affect
+ * other metrics, because it would obscure the real situation. E.g. there
has two record:
Review Comment:
IMO this explanation is confusing and doesn't serve what you truly want to
convey.
I would add the following explanation:
Note-2: A batch has numerous triggers. The metrics in this class count each
type of trigger to allow you to diagnose what mostly causing a batch flush. The
metric also includes a histogram for delay of batch since 1st record entered,
the size of batch in bytes number of records in batch. This will help you to
tune the parameters that control some of the batch flush triggers: maxDelay,
maxRecords, maxSize.
Note that the 4th trigger - a single record larger than batch size -
triggers a flush of the current batch, but the big record itself is not written
in batch hence is not included in the batch metrics written above (batch size,
batch delay, etc). The trigger is of course counter as other trigger types.
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,64 +336,24 @@ private void internalAsyncAddData(T data, AddDataCallback
callback, Object ctx){
}
- /**
- * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
- */
- public void trigFlush(final boolean force, boolean byScheduleThreads){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force,
byScheduleThreads));
- }
-
- private void doTrigFlush(boolean force, boolean byScheduleThreads){
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force) {
- doFlush();
- return;
- }
- if (byScheduleThreads) {
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs =
flushContext.asyncAddArgsList.get(0);
- if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >=
batchedWriteMaxDelayInMillis) {
- doFlush();
- return;
- }
- if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords) {
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize) {
- doFlush();
- }
- } finally {
- if (byScheduleThreads) {
- nextTimingTrigger();
- }
- }
- }
-
private void doFlush(){
- // Combine data.
- ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
- prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
- ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
- ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // Combine data cached by flushContext, and write to BK.
+ ByteBuf prefixByteFuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefixByteFuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+ ByteBuf wholeByteBuf =
Unpooled.wrappedUnmodifiableBuffer(prefixByteFuf, contentByteBuf);
// We need to release this pairByteBuf after Managed ledger async add
callback. Just holds by FlushContext.
- this.flushContext.byteBuf = pairByteBuf;
- // Flush.
+ flushContext.byteBuf = wholeByteBuf;
if (State.CLOSING == state || State.CLOSED == state){
failureCallbackByContextAndRecycle(flushContext,
BUFFERED_WRITER_CLOSED_EXCEPTION);
} else {
- managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
+ managedLedger.asyncAddEntry(wholeByteBuf, this, flushContext);
}
- // Clear buffers.ok
- this.dataArray.clear();
- this.flushContext = FlushContext.newInstance();
- this.bytesSize = 0;
+ // Reset the cache.
+ dataArray.clear();
+ flushContext = FlushContext.newInstance();
Review Comment:
I have to say something.
At first, when I saw this line, I was sure there was a bug. How can it be
that you are creating a new flush context without recycling the existing flush
context - you are overwriting the reference - you're going to lose the
reference, it must be a bug!
So I started reading the code. It took me quite some time to figure out
`this` in `managedLedger.asyncAddEntry()` a couple of lines above it, is
actually a reference to a callback. Why the buffered writer also acts as a
callback? Why not create a callback class and reference it here, so it will be
obvious? where to search.
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +610,371 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
SumStrDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
+ private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+ private final int len1;
+
+ private final int len2;
+
+ private AtomicBoolean useLen2 = new AtomicBoolean();
+
+ public TwoLenSumDataSerializer(int len1, int len2){
+ this.len1 = len1;
+ this.len2 = len2;
+ }
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ boolean b = useLen2.get();
+ useLen2.set(!b);
+ return b ? len2 : len1;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
+ // Create TxnLogBufferedWriter.
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ var dataSerializer = new RandomLenSumStrDataSerializer();
+ var txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ // Assert metrics stat.
+ verifyTheHistogramMetrics(0, 0, 0);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxRecordCount() throws
Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxRecords = 2;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / batchedWriteMaxRecords;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, batchedWriteMaxRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(expectedBatchFlushCount,0,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxSize() throws Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxSize = 16;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / (batchedWriteMaxSize /
dataSerializer.getSizePerData());
+ int expectedTotalBytesSize = expectedBatchFlushCount *
batchedWriteMaxSize;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(0, expectedBatchFlushCount,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxDelayTime() throws Exception
{
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int writeCount = 100;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
+ createTxnBufferedWriterContextWithMetrics(dataSerializer,
Integer.MAX_VALUE,
+ Integer.MAX_VALUE, 1);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ Thread.sleep(1);
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ verifyTheCounterMetrics(0,0, actualBatchFlushCount,0);
+ verifyTheHistogramMetrics(actualBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByLargeSingleData() throws
Exception {
+ // Use TwoLenSumDataSerializer for: write a little data once, then
write a large data once.
+ int bytesSizePerRecordWhichInBatch = 4;
+ int batchedWriteMaxSize = 1024;
+ TwoLenSumDataSerializer dataSerializer =
+ new TwoLenSumDataSerializer(bytesSizePerRecordWhichInBatch,
batchedWriteMaxSize);
+ int writeCount = 100;
+ int singleLargeDataRequestCount = writeCount / 2;
+ int expectedBatchFlushTriggeredByLargeData =
singleLargeDataRequestCount;
+ int expectedTotalBytesSize = expectedBatchFlushTriggeredByLargeData *
bytesSizePerRecordWhichInBatch;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
i);
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
Review Comment:
Where are you checking that you wrote to managed ledger writeCount times?
I think this test is hard to understand.
I would do:
```
createBufferedWriter(maxBatchSize = batchedWriteMaxSize, maxRecords =
MAX_INT, maxDelay = MAX_INT)
writeToBuffer(randomRecord(size = batchedWriteMaxSize/10)
writeToBuffer(randomRecord(size = batchedWriteMaxSize/10)
writeToBuffer(randomRecord(size = maxBatchSize)
waitForAllWriteToCompleteSuccessfully()
assertThat(actualNumOfFlushes, 1)
assertThat(actualWritesToManagedLedger,2)
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -116,6 +116,16 @@
AtomicReferenceFieldUpdater
.newUpdater(TxnLogBufferedWriter.class,
TxnLogBufferedWriter.State.class, "state");
+ /** Metrics. **/
+ private final TxnLogBufferedWriterMetricsStats metricsStats;
+
+ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor
orderedExecutor, Timer timer,
Review Comment:
Your comment
>Yes, everything works, except metrics
I asked if you will remove the `if metrics != null` statements in the next
PRs.
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -613,9 +610,371 @@ public ByteBuf serialize(ArrayList<Integer> dataArray) {
}
}
+ private static class RandomLenSumStrDataSerializer extends
SumStrDataSerializer {
+
+ @Getter
+ private int totalSize;
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ int size = new Random().nextInt(9);
+ totalSize += size;
+ return size;
+ }
+ }
+
+ private static class TwoLenSumDataSerializer extends JsonDataSerializer {
+
+ private final int len1;
+
+ private final int len2;
+
+ private AtomicBoolean useLen2 = new AtomicBoolean();
+
+ public TwoLenSumDataSerializer(int len1, int len2){
+ this.len1 = len1;
+ this.len2 = len2;
+ }
+
+ /**
+ * After the test, when {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxRecords()} = 256
+ * and {@link TxnLogBufferedWriterConfig#getBatchedWriteMaxSize()} =
1024,
+ * and {@link
TxnLogBufferedWriterConfig#getBatchedWriteMaxDelayInMillis()} = 1, the
random-size baseline
+ * was set as 9, and there was maximum probability that all three
thresholds could be hit.
+ */
+ @Override
+ public int getSerializedSize(Integer data) {
+ boolean b = useLen2.get();
+ useLen2.set(!b);
+ return b ? len2 : len1;
+ }
+ }
+
public enum BookieErrorType{
NO_ERROR,
ALWAYS_ERROR,
SOMETIMES_ERROR;
}
+
+ /**
+ * Test Transaction buffered writer stats when disabled batch feature.
+ */
+ @Test
+ public void testMetricsStatsWhenDisabledBatchFeature() throws Exception {
+ TxnLogBufferedWriterMetricsStats metricsStats = new
TxnLogBufferedWriterMetricsStats(
+ metricsPrefix, metricsLabelNames, metricsLabelValues,
CollectorRegistry.defaultRegistry
+ );
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("txn-threads").build();
+ // Create TxnLogBufferedWriter.
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ var dataSerializer = new RandomLenSumStrDataSerializer();
+ var txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor, transactionTimer,
+ dataSerializer, Integer.MAX_VALUE, Integer.MAX_VALUE,
+ Integer.MAX_VALUE, false, metricsStats);
+ // Add some data.
+ int writeCount = 1000;
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all data write finish.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ // Assert metrics stat.
+ verifyTheHistogramMetrics(0, 0, 0);
+ // cleanup.
+ txnLogBufferedWriter.close();
+ metricsStats.close();
+ transactionTimer.stop();
+ orderedExecutor.shutdown();
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxRecordCount() throws
Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxRecords = 2;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / batchedWriteMaxRecords;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ // Create callback with counter.
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, batchedWriteMaxRecords, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(expectedBatchFlushCount,0,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxSize() throws Exception {
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int batchedWriteMaxSize = 16;
+ int writeCount = 100;
+ int expectedBatchFlushCount = writeCount / (batchedWriteMaxSize /
dataSerializer.getSizePerData());
+ int expectedTotalBytesSize = expectedBatchFlushCount *
batchedWriteMaxSize;
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
createTxnBufferedWriterContextWithMetrics(
+ dataSerializer, Integer.MAX_VALUE, batchedWriteMaxSize,
Integer.MAX_VALUE);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ }
+ // Wait for all write finish.
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(
+ () -> callbackWithCounter.finishCounter.get() +
callbackWithCounter.failureCounter.get() == writeCount
+ );
+ int actualBatchFlushCount =
txnLogBufferedWriterContext.mockedManagedLedger.writeCounter.get();
+ assertEquals(callbackWithCounter.failureCounter.get(), 0);
+ assertEquals(expectedBatchFlushCount, actualBatchFlushCount);
+ verifyTheCounterMetrics(0, expectedBatchFlushCount,0,0);
+ verifyTheHistogramMetrics(expectedBatchFlushCount, writeCount,
expectedTotalBytesSize);
+ // cleanup.
+ releaseTxnLogBufferedWriterContext(txnLogBufferedWriterContext);
+ // after close, verify the metrics change to 0.
+ verifyTheCounterMetrics(0,0,0,0);
+ verifyTheHistogramMetrics(0,0,0);
+ }
+
+ @Test
+ public void testMetricsStatsThatTriggeredByMaxDelayTime() throws Exception
{
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ int writeCount = 100;
+ int expectedTotalBytesSize = writeCount *
dataSerializer.getSizePerData();
+ var callbackWithCounter = createCallBackWithCounter();
+ // Create TxnLogBufferedWriter.
+ var txnLogBufferedWriterContext =
+ createTxnBufferedWriterContextWithMetrics(dataSerializer,
Integer.MAX_VALUE,
+ Integer.MAX_VALUE, 1);
+ var txnLogBufferedWriter =
txnLogBufferedWriterContext.txnLogBufferedWriter;
+ // Add some data.
+ for (int i = 0; i < writeCount; i++){
+ txnLogBufferedWriter.asyncAddData(1, callbackWithCounter.callback,
"");
+ Thread.sleep(1);
Review Comment:
Why do you need this sleep at all?
Just write 100 records, and you wait for the 1 second timeout to pass
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.io.Closeable;
+import java.util.HashMap;
+
+/***
+ * Describes the working status of the {@link TxnLogBufferedWriter}, helps
users tune the thresholds of
+ * {@link TxnLogBufferedWriter} for best performance.
+ * Note-1: When batch feature is turned off, no data is logged at this. In
this scenario,users can see the
+ * {@link org.apache.bookkeeper.mledger.ManagedLedgerMXBean}.
+ * Note-2: Even if enable batch feature, if a single record is too big, it
still directly write to Bookie without batch,
+ * property {@link #pulsarBatchedLogTriggeringCountByForce} can indicate
this case. But this case will not affect
+ * other metrics, because it would obscure the real situation. E.g. there
has two record:
+ * [{recordsCount=512, triggerByMaxRecordCount}, {recordCount=1,
triggerByTooLarge}], we should not tell the users
+ * that the average batch records is 256, if the users knows that there are
only 256 records per batch, then users
+ * will try to increase {@link
TxnLogBufferedWriter#batchedWriteMaxDelayInMillis} so that there is more data
per
+ * batch to improve throughput, but that does not work.
+ */
+public class TxnLogBufferedWriterMetricsStats implements Closeable {
+
+ /**
+ * Key is the name we used to create {@link
TxnLogBufferedWriterMetricsStats}, and now there are two kinds:
+ * ["pulsar_txn_tc_log", "pulsar_txn_tc_batched_log"]. There can be
multiple labels in each
+ * {@link TxnLogBufferedWriterMetricsStats}, such as The Transaction
Coordinator using coordinatorId as label and
+ * The Transaction Pending Ack Store using subscriptionName as label.
+ */
+ private static final HashMap<String, Collector> COLLECTOR_CACHE = new
HashMap<>();
Review Comment:
Either what you wrote is completely the wrong direction or I completely
misunderstood you :)
I thought that in MLTransactionLogImp, when open a ledger you also create a
buffered writer, thus in here you will also pass an instance of
`TxnLogBufferedWriterMetricsStats`.
```java
public void openLedgerComplete(ManagedLedger ledger,
Object ctx) {
MLTransactionLogImpl.this.managedLedger = ledger;
MLTransactionLogImpl.this.bufferedWriter = new
TxnLogBufferedWriter<>(
managedLedger, ((ManagedLedgerImpl)
managedLedger).getExecutor(),
timer, TransactionLogDataSerializer.INSTANCE,
txnLogBufferedWriterConfig.getBatchedWriteMaxRecords(),
txnLogBufferedWriterConfig.getBatchedWriteMaxSize(),
txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis(),
txnLogBufferedWriterConfig.isBatchEnabled());
```
As I understand, this only happens once per instance of
`MLTransactionLogImpl`. So I don't understand why you want to pass metric by
metric. Something doesn't add up here.
I guess my main question is: how many `MLTransactionLogImpl` are there in a
single broker process? More than 1?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]