asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r945512673
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,6 +153,10 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger,
OrderedExecutor ordered
DataSerializer<T> dataSerializer,
int batchedWriteMaxRecords, int
batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
boolean batchEnabled,
TxnLogBufferedWriterMetricsStats metrics){
+ if (batchedWriteMaxRecords <= 1 && batchEnabled){
+ log.warn("The current maximum number of records per batch is set
to 1. Disabling Batch will improve"
Review Comment:
Perhaps "Transaction Log Buffered Writer has batching enabled yet the
maximum batch size was configured to less than or equal to 1 record, hence due
to performance reasons batching is disabled"
Could it be they wouldn't know per that log line which configuration to
change since this can be used in Pending Ack Store and TxLog?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback,
Object ctx){
// Avoid missing callback, do failed callback when error occur
before add data to the array.
int recordsCountAfter = dataArray.size();
if (recordsCountAfter == recordsCountBeforeAdd){
- callback.addFailed(new
ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+ callback.addFailed(new ManagedLedgerInterceptException(e),
ctx);
Review Comment:
Your condition above to only call the callback if number of records in data
array haven't changed has a bug.
Look in internalAsyncAddData:
```java
if (dataLength >= batchedWriteMaxSize){
trigFlushByLargeSingleData();
ByteBuf byteBuf = dataSerializer.serialize(data);
managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
```
What happened if asyncAddEntry fails from some reason? You lose the records
`data` and you're notifying this to the callback
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -69,6 +70,10 @@
new Exception("Transaction log buffered write has closed")
);
+ private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter,
TxnLogBufferedWriter.State> STATE_UPDATER =
Review Comment:
Something doesn't add up here. In order to prevent race conditions upon
updates to `state` STATE_UPDATER was created. Why then some updates are done
without it?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,6 +153,10 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger,
OrderedExecutor ordered
DataSerializer<T> dataSerializer,
int batchedWriteMaxRecords, int
batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
boolean batchEnabled,
TxnLogBufferedWriterMetricsStats metrics){
+ if (batchedWriteMaxRecords <= 1 && batchEnabled){
+ log.warn("The current maximum number of records per batch is set
to 1. Disabling Batch will improve"
Review Comment:
Thinking out loud - @codelipenghui - do you think it's better to do this
type of validation and change when processing configuration?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback,
Object ctx){
// Avoid missing callback, do failed callback when error occur
before add data to the array.
int recordsCountAfter = dataArray.size();
if (recordsCountAfter == recordsCountBeforeAdd){
- callback.addFailed(new
ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+ callback.addFailed(new ManagedLedgerInterceptException(e),
ctx);
Review Comment:
I don't understand why you chose the Intercept exception - this is not the
error you're witnessing. You tried to asynchronously add data to the buffer and
failed. Why not create a new exception if you can't find a similar exception?
@codelipenghui WDYT?
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback,
Object ctx){
// Avoid missing callback, do failed callback when error occur
before add data to the array.
int recordsCountAfter = dataArray.size();
if (recordsCountAfter == recordsCountBeforeAdd){
- callback.addFailed(new
ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+ callback.addFailed(new ManagedLedgerInterceptException(e),
ctx);
Review Comment:
If you're notifying through the callback I'm not sure we actually need a log
line in this case. I expect the callback to notify the log, no?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]