asafm commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r945512673


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,6 +153,10 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
                                 DataSerializer<T> dataSerializer,
                                 int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
                                 boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){
+        if (batchedWriteMaxRecords <= 1 && batchEnabled){
+            log.warn("The current maximum number of records per batch is set to 1. Disabling Batch will improve"

Review Comment:
   Perhaps "Transaction Log Buffered Writer has batching enabled yet the 
maximum batch size was configured to less than or equal to 1 record, hence due 
to performance reasons batching is disabled"
   
   Could it be they wouldn't know per that log line which configuration to 
change since this can be used in Pending Ack Store and TxLog?
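   For illustration only, a hedged sketch of a warning that also names which store created the writer; the `component` value is an assumption on my side, nothing in the PR provides it:
   ```java
   // Hedged sketch: `component` would identify the caller (e.g. the Pending Ack
   // Store or the Transaction Log); it is a hypothetical parameter, not PR code.
   if (batchedWriteMaxRecords <= 1 && batchEnabled) {
       log.warn("[{}] Transaction Log Buffered Writer has batching enabled, yet batchedWriteMaxRecords"
               + " is {} (<= 1), so batching is disabled for performance reasons.",
               component, batchedWriteMaxRecords);
   }
   ```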
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                 // Avoid missing callback, do failed callback when error occur before add data to the array.
                 int recordsCountAfter = dataArray.size();
                 if (recordsCountAfter == recordsCountBeforeAdd){
-                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+                    callback.addFailed(new ManagedLedgerInterceptException(e), ctx);

Review Comment:
   Your condition above, which only calls the callback if the number of records in the data array hasn't changed, has a bug.
   Look at internalAsyncAddData:
   ```java
           if (dataLength >= batchedWriteMaxSize){
               trigFlushByLargeSingleData();
               ByteBuf byteBuf = dataSerializer.serialize(data);
               managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
                       AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
               return;
           }
   ```
   What happens if asyncAddEntry fails for some reason? You lose the record `data`, and that is what gets reported to the callback.
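   For illustration, a hedged sketch of one way to make this large-record path report its own failure instead of relying on the array-size check; this is not the PR's fix, and the exception type used is only a placeholder:
   ```java
   // Hedged sketch: guard the direct write so a synchronous failure from
   // serialization or asyncAddEntry is still delivered to the caller's callback
   // and the buffer is released; the exception type here is a placeholder only.
   if (dataLength >= batchedWriteMaxSize) {
       trigFlushByLargeSingleData();
       ByteBuf byteBuf = null;
       try {
           byteBuf = dataSerializer.serialize(data);
           managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
                   AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
       } catch (Exception e) {
           if (byteBuf != null) {
               byteBuf.release();
           }
           callback.addFailed(new ManagedLedgerInterceptException(e), ctx);
       }
       return;
   }
   ```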



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -69,6 +70,10 @@
                     new Exception("Transaction log buffered write has closed")
             );
 
+    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, TxnLogBufferedWriter.State> STATE_UPDATER =

Review Comment:
   Something doesn't add up here. STATE_UPDATER was created in order to prevent race conditions on updates to `state`. Why, then, are some updates done without it?
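   As a hedged illustration only (the state names `OPEN` and `CLOSING` are my assumptions about the enum, not taken from the PR), routing every transition through the updater would look roughly like this:
   ```java
   // Hedged sketch: every write to `state` goes through the field updater, so
   // concurrent transitions are decided by a single compare-and-set; a plain
   // `this.state = ...` assignment elsewhere bypasses this and reopens the race.
   private boolean tryChangeStateToClosing() {
       return STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING);
   }
   ```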



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,6 +153,10 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
                                 DataSerializer<T> dataSerializer,
                                 int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
                                 boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){
+        if (batchedWriteMaxRecords <= 1 && batchEnabled){
+            log.warn("The current maximum number of records per batch is set to 1. Disabling Batch will improve"

Review Comment:
   Thinking out loud - @codelipenghui - do you think it would be better to do this type of validation and adjustment when the configuration is processed?
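   As a hedged illustration of that alternative (the configuration field names below are assumptions, not necessarily the actual broker settings), the values could be normalized once where the configuration is read, before the writer is constructed:
   ```java
   // Hedged sketch, not PR code: normalize the settings while processing the
   // configuration so TxnLogBufferedWriter can trust its inputs. The getter and
   // setter names here are hypothetical.
   if (config.isTransactionLogBatchedWriteEnabled()
           && config.getTransactionLogBatchedWriteMaxRecords() <= 1) {
       log.warn("transactionLogBatchedWriteMaxRecords <= 1; disabling transaction log batched writes");
       config.setTransactionLogBatchedWriteEnabled(false);
   }
   ```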
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                 // Avoid missing callback, do failed callback when error occur before add data to the array.
                 int recordsCountAfter = dataArray.size();
                 if (recordsCountAfter == recordsCountBeforeAdd){
-                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+                    callback.addFailed(new ManagedLedgerInterceptException(e), ctx);

Review Comment:
   I don't understand why you chose the Intercept exception - it is not the error you're actually witnessing. You tried to asynchronously add data to the buffer and failed. If you can't find an existing exception that matches, why not create a new one?
   @codelipenghui WDYT?
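   A hedged sketch of what such a dedicated exception could look like; the class name is purely hypothetical and not an existing type in the managed-ledger API:
   ```java
   // Hedged sketch: a purpose-built exception for "failed to buffer data for a
   // batched write"; the class name is hypothetical, not an existing Pulsar type.
   import org.apache.bookkeeper.mledger.ManagedLedgerException;

   public class BufferedWriterAddDataException extends ManagedLedgerException {
       public BufferedWriterAddDataException(Throwable cause) {
           super(cause);
       }
   }
   ```
   The failure path would then read something like `callback.addFailed(new BufferedWriterAddDataException(e), ctx)`, which states what actually went wrong.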



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                 // Avoid missing callback, do failed callback when error occur before add data to the array.
                 int recordsCountAfter = dataArray.size();
                 if (recordsCountAfter == recordsCountBeforeAdd){
-                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+                    callback.addFailed(new ManagedLedgerInterceptException(e), ctx);

Review Comment:
   If you're notifying the failure through the callback, I'm not sure we actually need a log line in this case. I'd expect the callback implementation to do the logging, no?
   


