poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r948119106


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,138 +353,65 @@ private void internalAsyncAddData(T data, 
AddDataCallback callback, Object ctx){
 
     }
 
-    /**
-     * Trigger write to bookie once, If the conditions are not met, nothing 
will be done.
-     */
-    public void trigFlush(final boolean force, boolean byScheduleThreads){
-        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, 
byScheduleThreads));
-    }
-
-    private void doTrigFlush(boolean force, boolean byScheduleThreads){
-        try {
-            if (flushContext.asyncAddArgsList.isEmpty()) {
-                return;
-            }
-            if (force) {
-                doFlush();
-                return;
-            }
-            if (byScheduleThreads) {
-                doFlush();
-                return;
-            }
-            AsyncAddArgs firstAsyncAddArgs = 
flushContext.asyncAddArgsList.get(0);
-            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= 
batchedWriteMaxDelayInMillis) {
-                doFlush();
-                return;
-            }
-            if (this.flushContext.asyncAddArgsList.size() >= 
batchedWriteMaxRecords) {
-                doFlush();
-                return;
-            }
-            if (this.bytesSize >= batchedWriteMaxSize) {
-                doFlush();
-            }
-        } finally {
-            if (byScheduleThreads) {
-                nextTimingTrigger();
-            }
-        }
-    }
-
     private void doFlush(){
-        // Combine data.
-        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
-        ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
-        ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, 
actualContent);
-        // We need to release this pairByteBuf after Managed ledger async add 
callback. Just holds by FlushContext.
-        this.flushContext.byteBuf = pairByteBuf;
-        // Flush.
+        // Combine data cached by flushContext, and write to BK.
+        ByteBuf prefixByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+        ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+        ByteBuf wholeByteBuf = 
Unpooled.wrappedUnmodifiableBuffer(prefixByteBuf, contentByteBuf);
+        flushContext.byteBuf = wholeByteBuf;
         if (State.CLOSING == state || State.CLOSED == state){
             failureCallbackByContextAndRecycle(flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
         } else {
-            managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
-        }
-        // Clear buffers.ok
-        this.dataArray.clear();
-        this.flushContext = FlushContext.newInstance();
-        this.bytesSize = 0;
-    }
-
-    /**
-     * see {@link AsyncCallbacks.AddEntryCallback#addComplete(Position, 
ByteBuf, Object)}.
-     */
-    @Override
-    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        try {
-            final int batchSize = flushContext.asyncAddArgsList.size();
-            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
-                final AsyncAddArgs asyncAddArgs = 
flushContext.asyncAddArgsList.get(batchIndex);
-                final TxnBatchedPositionImpl txnBatchedPosition = new 
TxnBatchedPositionImpl(position, batchSize,
-                        batchIndex);
-                // Because this task already running at ordered task, so just 
"run".
-                try {
-                    asyncAddArgs.callback.addComplete(txnBatchedPosition, 
asyncAddArgs.ctx);
-                } catch (Exception e){
-                    log.error("After writing to the transaction batched log 
complete, the callback failed."
-                            + " managedLedger: " + managedLedger.getName(), e);
-                }
-            }
-        } finally {
-            flushContext.recycle();
+            managedLedger.asyncAddEntry(wholeByteBuf, 
bookKeeperBatchedWriteCallback, flushContext);
         }
+        dataArray.clear();
+        flushContext = FlushContext.newInstance();
+        bytesSize = 0;
     }
 
     /**
-     * see {@link 
AsyncCallbacks.AddEntryCallback#addFailed(ManagedLedgerException, Object)}.
-     */
-    @Override
-    public void addFailed(ManagedLedgerException exception, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        failureCallbackByContextAndRecycle(flushContext, exception);
-    }
-
-    /**
-     * Cancel pending tasks and release resources.
+     * Release resources and cancel pending tasks.
      */
     @Override
     public void close() {
-        // If disabled batch feature, there is no closing state.
+        // If batch feature is disabled, there is nothing to close, so set the 
stat only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
             return;
         }
-        // Prevent the reentrant.
+        // If other thread already called "close()", so do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            // Other thread also calling "close()".
             return;
         }
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
-            if (state == State.CLOSED){
-                return;
-            }
-            // Failure callback to pending request.
-            // If some request has been flushed, Bookie triggers the callback.
-            failureCallbackByContextAndRecycle(this.flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
-            // Cancel task that schedule at fixed rate trig flush.
-            if (timeout == null){
-                log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"
-                        + " is null. managedLedger: " + 
managedLedger.getName());
-            } else if (timeout.isCancelled()){
-                // TODO How decisions the timer-task has been finished ?
-                this.state = State.CLOSED;
-            } else {
-                if (this.timeout.cancel()) {
-                    this.state = State.CLOSED;
+            try {
+                if (state == State.CLOSED) {
+                    return;
+                }
+                // If some requests are flushed, BK will trigger these 
callbacks, and the remaining requests in should
+                // fail.
+                failureCallbackByContextAndRecycle(flushContext, 
BUFFERED_WRITER_CLOSED_EXCEPTION);
+                // Cancel the timing task.
+                if (timeout == null) {
+                    log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"
+                            + " is null. managedLedger: " + 
managedLedger.getName());
+                } else if (timeout.isCancelled()) {
+                    // TODO How decisions the timer-task has been finished ?
+                    STATE_UPDATER.set(this, State.CLOSED);
                 } else {
-                    // Cancel task failure, The state will stay at CLOSING.
-                    log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The state will"
-                            + " stay at CLOSING. managedLedger: " + 
managedLedger.getName());
+                    if (this.timeout.cancel()) {
+                        STATE_UPDATER.set(this, State.CLOSED);
+                    } else {
+                        // Cancel task failure, The state will stay at CLOSING.
+                        log.error("Cancel timeout-task that schedule at fixed 
rate trig flush failure. The state will"

Review Comment:
   I have already rewrite the method `close`:
   
   - Just call `timeout.cancel` and set the state to `closed`, don't care if 
it's actually executed.
   - In method `nextTimingTrigger`, Prevents timing trigger.
   
   These two measures ensure that the Timing trigger will not be executed after 
`close`.
   
   ----
   
   And there are two behaviors that are not as expected( So the above plan is 
optimal ):
   
   - if the stat is EXPIRED, not means already executed, this just means 
`expired`(already executed or not). 
   - when we call `timeout.cancel`, will close the task asynchrony and return 
nothing.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to