poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r943513656


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -210,9 +210,15 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
             return;
         }
         singleThreadExecutorForWrite.execute(() -> {
+            int recordsCountBeforeAdd = dataArray.size();
             try {
                 internalAsyncAddData(data, callback, ctx);
             } catch (Exception e){
+                // Avoid missing callback, do failed callback when error occur before add data to the array.
+                int recordsCountAfter = dataArray.size();
+                if (recordsCountAfter == recordsCountBeforeAdd){
+                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);

Review Comment:
   Hi @asafm 
   
   > Can you please explain why did you choose the Fenced exception for this specific error which you don't know its nature?
   
   I'm sorry, I misunderstood. You are correct; we should not use the Fenced exception here.
   
   > I'm not referring to static variable, but to add an import static statement
   
   Already fixed, thanks.
   
   Hi @codelipenghui 
   
   > we should not throw ManagedLedgerException out of ManagedLedger. IMO, we should use PersistenceException instead.
   
   Could I use `ManagedLedgerException.ManagedLedgerInterceptException` here? Because:
   - `MlTransactionLogImpl` and `MlPendingAckStore` already use `ManagedLedgerException`, since they originally call the `void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx)` method
   - `TxnBufferedWriter` cannot access `PersistenceException`, because the module `pulsar-transaction` cannot import the `broker` dependency
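   
   For illustration, a minimal sketch of what this wrapping looks like (the helper class and method here are hypothetical; only `ManagedLedgerInterceptException` itself comes from this thread):
   
   ```java
   import org.apache.bookkeeper.mledger.ManagedLedgerException;
   import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
   
   class ExceptionWrappingSketch {
       // Wrap the unknown cause in a ManagedLedgerException subtype that the
       // pulsar-transaction module can reference without a broker dependency;
       // callers can still inspect the original cause via getCause().
       static ManagedLedgerException wrap(Exception cause) {
           return new ManagedLedgerInterceptException(cause);
       }
   }
   ```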
   
   



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);

Review Comment:
   Already changed it to `Failed to add data asynchronously`.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                 // Avoid missing callback, do failed callback when error occur before add data to the array.
                 int recordsCountAfter = dataArray.size();
                 if (recordsCountAfter == recordsCountBeforeAdd){
-                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+                    callback.addFailed(new ManagedLedgerInterceptException(e), ctx);

Review Comment:
   Good catch. I have used a variable `shouldCompensateCallBackWhenWriteFail` to mark whether a callback needs to be compensated.
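   
   For illustration, a minimal sketch of that flag-based compensation (the flag name comes from this thread; the callback interface, fields, and helper method are simplified stand-ins for the real class):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.bookkeeper.mledger.ManagedLedgerException;
   
   class CompensationSketch<T> {
       interface AddDataCallback {
           void addFailed(ManagedLedgerException e, Object ctx);
       }
   
       private final List<T> dataArray = new ArrayList<>();
       private final List<Object> asyncAddArgsList = new ArrayList<>();
   
       void internalAsyncAddData(T data, AddDataCallback callback, Object ctx) {
           // True until the callback is safely queued, so the catch block knows
           // whether a failed callback still has to be compensated here.
           boolean shouldCompensateCallBackWhenWriteFail = true;
           try {
               dataArray.add(data);
               asyncAddArgsList.add(callback);
               // From here on, the flush path owns the callback.
               shouldCompensateCallBackWhenWriteFail = false;
               trigFlushIfReachMaxRecordsOrMaxSize();
           } catch (Exception e) {
               if (shouldCompensateCallBackWhenWriteFail) {
                   callback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(e), ctx);
               }
           }
       }
   
       void trigFlushIfReachMaxRecordsOrMaxSize() { /* flush when thresholds are reached, as in the diff */ }
   }
   ```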



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);
+            }
+        });
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK,  this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
-        int len = dataSerializer.getSerializedSize(data);
-        if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
+        int dataLength = dataSerializer.getSerializedSize(data);
+        if (dataLength >= batchedWriteMaxSize){
+            trigFlushByLargeSingleData();
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
+                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        // Add data.
-        this.dataArray.add(data);
-        // Add callback info.
+        // Append data to the data-array.
+        dataArray.add(data);
+        // Append callback to the flushContext.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
-        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
-        // Calculate bytes-size.
-        this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes size.
+        bytesSize += dataLength;
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                if (flushContext.asyncAddArgsList.isEmpty()) {
+                    return;
+                }
+                if (metrics != null) {
+                    metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
+                            System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+                }
+                doFlush();
+            } catch (Exception e){
+                log.error("Trig flush by timing task fail.", e);
+            } finally {
+                // Start the next timing task.
+                nextTimingTrigger();

Review Comment:
   I mistook you said `java.util.concurrent.ScheduledExecutorService`, I have 
to read more of `org.apache.bookkeeper.common.util.OrderedScheduler`.
   
   But now using the `timer` can save one thread.
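   
   For illustration, a minimal sketch of such a timing trigger, assuming Netty's `HashedWheelTimer` as the `timer` (an assumption; Pulsar already ships Netty), with illustrative field names and delay:
   
   ```java
   import io.netty.util.HashedWheelTimer;
   import io.netty.util.Timer;
   import java.util.concurrent.TimeUnit;
   
   class TimingTriggerSketch {
       // One shared wheel-timer thread can serve every buffered writer, whereas a
       // dedicated ScheduledExecutorService would pin one thread per writer.
       private static final Timer TIMER = new HashedWheelTimer();
   
       private final long batchedWriteMaxDelayInMillis = 1;
   
       void nextTimingTrigger() {
           // Re-arm a one-shot timeout; the task body should only hop to the single
           // write thread, so the timer thread is never blocked by a flush.
           TIMER.newTimeout(timeout -> trigFlushByTimingTask(),
                   batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
       }
   
       void trigFlushByTimingTask() { /* switch to the write thread and flush, as in the diff */ }
   }
   ```
   
   A wheel timer trades scheduling precision for cheap, coarse-grained timeouts, which fits a flush-delay cap well.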



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -198,33 +209,99 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                     AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                internalAsyncAddData(data, callback, ctx);
+            } catch (Exception e){
+                log.error("Internal async add data fail", e);
+            }
+        });
     }
 
+    /**
+     * Append data to queue, if reach {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush. And if
+     * accept a request that {@param data} is too large (larger than {@link #batchedWriteMaxSize}), then two flushes
+     * are executed:
+     *    1. Write the data cached in the queue to BK.
+     *    2. Direct write the large data to BK,  this flush event will not record to Metrics.
+     * This ensures the sequential nature of multiple writes to BK.
+     */
     private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
         if (state == State.CLOSING || state == State.CLOSED){
             callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
             return;
         }
-        int len = dataSerializer.getSerializedSize(data);
-        if (len >= batchedWriteMaxSize){
-            if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true, false);
-            }
+        int dataLength = dataSerializer.getSerializedSize(data);
+        if (dataLength >= batchedWriteMaxSize){
+            trigFlushByLargeSingleData();
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
+                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
             return;
         }
-        // Add data.
-        this.dataArray.add(data);
-        // Add callback info.
+        // Append data to the data-array.
+        dataArray.add(data);
+        // Append callback to the flushContext.
         AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
-        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
-        // Calculate bytes-size.
-        this.bytesSize += len;
-        // trig flush.
-        doTrigFlush(false, false);
+        flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes size.
+        bytesSize += dataLength;
+        trigFlushIfReachMaxRecordsOrMaxSize();
+    }
+
+    /**
+     * Change to IO thread and do flush, only called by {@link #timingFlushTask}.
+     */
+    private void trigFlushByTimingTask(){
+        singleThreadExecutorForWrite.execute(() -> {
+            try {
+                if (flushContext.asyncAddArgsList.isEmpty()) {
+                    return;
+                }
+                if (metrics != null) {
+                    metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize,
+                            System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime);
+                }
+                doFlush();
+            } catch (Exception e){
+                log.error("Trig flush by timing task fail.", e);
+            } finally {
+                // Start the next timing task.
+                nextTimingTrigger();
+            }
+        });
+    }
+
+    /**
+     * If reach the thresholds {@link #batchedWriteMaxRecords} or {@link #batchedWriteMaxSize}, do flush.
+     */
+    private void trigFlushIfReachMaxRecordsOrMaxSize(){
+        if (flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {

Review Comment:
   When "addComplete and addFailed"  can access "flushContext.asyncAddArgList", 
`singleThreadExecutorForWrite` is no longer able to access the object 
`flushContext`. In method `doFlush`, `singleThreadExecutorForWrite` has already 
started holding another object `flushContext`: 
   
   ```java
   private void doFlush(){
         ......
         flushContext = FlushContext.newInstance();
         bytesSize = 0;
     }
   ```
   
   After `doFlush`, the thread `singleThreadExecutorForWrite` will not modify 
`flushContext` any more
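   
   A minimal sketch of that handoff (only `doFlush`, `flushContext`, and `FlushContext.newInstance` appear in the diff; the rest is simplified for illustration):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class HandoffSketch {
       static class FlushContext {
           final List<Object> asyncAddArgsList = new ArrayList<>();
           static FlushContext newInstance() { return new FlushContext(); }
       }
   
       private FlushContext flushContext = FlushContext.newInstance();
   
       // Runs only on singleThreadExecutorForWrite.
       void doFlush() {
           FlushContext flushing = flushContext;
           // Swap in a fresh context: the write thread only touches the new object
           // from here on, so the BK callback owns `flushing` exclusively.
           flushContext = FlushContext.newInstance();
           asyncWriteToBookKeeper(flushing);
       }
   
       void asyncWriteToBookKeeper(FlushContext ctx) {
           // Hand off to the BK client; its addComplete/addFailed callback can read
           // ctx.asyncAddArgsList without racing against the write thread.
       }
   }
   ```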



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -143,6 +153,10 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
                                 DataSerializer<T> dataSerializer,
                                 int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
                                 boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){
+        if (batchedWriteMaxRecords <= 1 && batchEnabled){
+            log.warn("The current maximum number of records per batch is set to 1. Disabling Batch will improve"

Review Comment:
   Already changed it to this:
   
   ```
   if (metrics != null){
       log.warn("Transaction Log Buffered Writer with the metrics name beginning with {} has batching enabled"
               + " yet the maximum batch size was configured to less than or equal to 1 record, hence due to"
               + " performance reasons batching is disabled", metrics.getMetricsPrefix());
   } else {
       log.warn("Transaction Log Buffered Writer has batching enabled"
               + " yet the maximum batch size was configured to less than or equal to 1 record, hence due to"
               + " performance reasons batching is disabled");
   }
   ```
   
   The `else` branch will be removed in the next PR.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -69,6 +70,10 @@
                     new Exception("Transaction log buffered write has closed")
             );
 
+    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, TxnLogBufferedWriter.State> STATE_UPDATER =

Review Comment:
   It has been changed: all state changes now go through `STATE_UPDATER`.
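   
   For reference, a minimal sketch of the `AtomicReferenceFieldUpdater` pattern (the enum values and the transition method are illustrative, not the real class):
   
   ```java
   import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
   
   class StateSketch {
       enum State { OPEN, CLOSING, CLOSED }
   
       // CAS on the volatile field without allocating an AtomicReference per instance.
       private static final AtomicReferenceFieldUpdater<StateSketch, State> STATE_UPDATER =
               AtomicReferenceFieldUpdater.newUpdater(StateSketch.class, State.class, "state");
   
       private volatile State state = State.OPEN;
   
       boolean startClosing() {
           // Only one caller can win the OPEN -> CLOSING transition, which is what
           // prevents writes after close.
           return STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING);
       }
   }
   ```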



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -112,10 +117,31 @@
 
     /** The main purpose of state maintenance is to prevent written after close. **/
     private volatile State state;
-    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, TxnLogBufferedWriter.State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater
-                    .newUpdater(TxnLogBufferedWriter.class, TxnLogBufferedWriter.State.class, "state");
 
+    private final BookKeeperBatchedWriteCallback bookKeeperBatchedWriteCallback = new BookKeeperBatchedWriteCallback();
+
+    private final TxnLogBufferedWriterMetricsStats metrics;
+
+    /**
+     * In the {@link #asyncAddData}, exceptions may occur. To avoid losing the callback, use a variable to mark whether
+     * a callback needs to be compensated.
+     */
+    private boolean shouldCallBackWhenWriteFail;

Review Comment:
   Good suggestion. Already replaced it with a method-local variable.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java:
##########
@@ -118,84 +107,71 @@ public TxnLogBufferedWriterMetricsStats(String metricsPrefix, String[] labelName
 
         String recordsPerBatchMetricName =
                 String.format("%s_bufferedwriter_batch_record_count", metricsPrefix);
-        recordsPerBatchMetric = (Histogram) COLLECTOR_CACHE.computeIfAbsent(
-                recordsPerBatchMetricName,
-                k -> new Histogram.Builder()
+        this.recordsPerBatchMetric = new Histogram.Builder()

Review Comment:
   OK, already removed `this.`.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -217,9 +224,9 @@ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
                 // Avoid missing callback, do failed callback when error occur before add data to the array.
                 int recordsCountAfter = dataArray.size();
                 if (recordsCountAfter == recordsCountBeforeAdd){
-                    callback.addFailed(new ManagedLedgerException.ManagedLedgerFencedException(e), ctx);
+                    callback.addFailed(new ManagedLedgerInterceptException(e), ctx);

Review Comment:
   Logging has been added.


