poorbarcode commented on code in PR #16758:
URL: https://github.com/apache/pulsar/pull/16758#discussion_r948045323


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -259,138 +353,65 @@ private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
 
     }
 
-    /**
-     * Trigger write to bookie once, If the conditions are not met, nothing will be done.
-     */
-    public void trigFlush(final boolean force, boolean byScheduleThreads){
-        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, byScheduleThreads));
-    }
-
-    private void doTrigFlush(boolean force, boolean byScheduleThreads){
-        try {
-            if (flushContext.asyncAddArgsList.isEmpty()) {
-                return;
-            }
-            if (force) {
-                doFlush();
-                return;
-            }
-            if (byScheduleThreads) {
-                doFlush();
-                return;
-            }
-            AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
-            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis) {
-                doFlush();
-                return;
-            }
-            if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {
-                doFlush();
-                return;
-            }
-            if (this.bytesSize >= batchedWriteMaxSize) {
-                doFlush();
-            }
-        } finally {
-            if (byScheduleThreads) {
-                nextTimingTrigger();
-            }
-        }
-    }
-
     private void doFlush(){
-        // Combine data.
-        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
-        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
-        ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
-        ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, actualContent);
-        // We need to release this pairByteBuf after Managed ledger async add callback. Just holds by FlushContext.
-        this.flushContext.byteBuf = pairByteBuf;
-        // Flush.
+        // Combine data cached by flushContext, and write to BK.
+        ByteBuf prefixByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(4);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+        ByteBuf contentByteBuf = dataSerializer.serialize(dataArray);
+        ByteBuf wholeByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefixByteBuf, contentByteBuf);
+        flushContext.byteBuf = wholeByteBuf;
         if (State.CLOSING == state || State.CLOSED == state){
             failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
         } else {
-            managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
-        }
-        // Clear buffers.ok
-        this.dataArray.clear();
-        this.flushContext = FlushContext.newInstance();
-        this.bytesSize = 0;
-    }
-
-    /**
-     * see {@link AsyncCallbacks.AddEntryCallback#addComplete(Position, ByteBuf, Object)}.
-     */
-    @Override
-    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        try {
-            final int batchSize = flushContext.asyncAddArgsList.size();
-            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
-                final AsyncAddArgs asyncAddArgs = flushContext.asyncAddArgsList.get(batchIndex);
-                final TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize,
-                        batchIndex);
-                // Because this task already running at ordered task, so just "run".
-                try {
-                    asyncAddArgs.callback.addComplete(txnBatchedPosition, asyncAddArgs.ctx);
-                } catch (Exception e){
-                    log.error("After writing to the transaction batched log complete, the callback failed."
-                            + " managedLedger: " + managedLedger.getName(), e);
-                }
-            }
-        } finally {
-            flushContext.recycle();
+            managedLedger.asyncAddEntry(wholeByteBuf, bookKeeperBatchedWriteCallback, flushContext);
         }
+        dataArray.clear();
+        flushContext = FlushContext.newInstance();
+        bytesSize = 0;
     }
 
     /**
-     * see {@link AsyncCallbacks.AddEntryCallback#addFailed(ManagedLedgerException, Object)}.
-     */
-    @Override
-    public void addFailed(ManagedLedgerException exception, Object ctx) {
-        final FlushContext flushContext = (FlushContext) ctx;
-        failureCallbackByContextAndRecycle(flushContext, exception);
-    }
-
-    /**
-     * Cancel pending tasks and release resources.
+     * Release resources and cancel pending tasks.
      */
     @Override
     public void close() {
-        // If disabled batch feature, there is no closing state.
+        // If batch feature is disabled, there is nothing to close, so set the state only.
         if (!batchEnabled) {
             STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
             return;
         }
-        // Prevent the reentrant.
+        // If another thread has already called "close()", do nothing.
         if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
-            // Other thread also calling "close()".
             return;
         }
         // Cancel pending tasks and release resources.
         singleThreadExecutorForWrite.execute(() -> {
-            if (state == State.CLOSED){
-                return;
-            }
-            // Failure callback to pending request.
-            // If some request has been flushed, Bookie triggers the callback.
-            failureCallbackByContextAndRecycle(this.flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
-            // Cancel task that schedule at fixed rate trig flush.
-            if (timeout == null){
-                log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"
-                        + " is null. managedLedger: " + 
managedLedger.getName());
-            } else if (timeout.isCancelled()){
-                // TODO How decisions the timer-task has been finished ?
-                this.state = State.CLOSED;
-            } else {
-                if (this.timeout.cancel()) {
-                    this.state = State.CLOSED;
+            try {
+                if (state == State.CLOSED) {
+                    return;
+                }
+                // If some requests are flushed, BK will trigger these callbacks, and the remaining requests should
+                // fail.
+                failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
+                // Cancel the timing task.
+                if (timeout == null) {
+                    log.error("Cancel timeout-task that schedule at fixed rate 
trig flush failure. The field-timeout"

Review Comment:
   Already deleted this check
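   
   For context, a rough sketch (not the PR's final code) of how the cleanup task in `close()` could read once the `timeout == null` branch is gone, assuming the timing task is always scheduled before `close()` can run; field and method names follow the snippet quoted above:
   
   ```java
   // Illustrative sketch only: close() cleanup without the null check on `timeout`.
   singleThreadExecutorForWrite.execute(() -> {
       try {
           if (state == State.CLOSED) {
               return;
           }
           // Requests already handed to BK are completed by the BookKeeper callback;
           // fail only the requests that are still buffered.
           failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
           // Cancel the scheduled flush task; cancelling a task that already ran is a no-op.
           timeout.cancel();
       } finally {
           state = State.CLOSED;
       }
   });
   ```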



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
