This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af932ed3e9f [improve] [txn] PIP-160: Txn buffered writer for transaction log batch (#16428)
af932ed3e9f is described below

commit af932ed3e9f325cd0d36f57ffd1f4fd4d9f446ad
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 19 09:53:56 2022 +0800

    [improve] [txn] PIP-160: Txn buffered writer for transaction log batch (#16428)
---
 .../coordinator/impl/TxnBatchedPositionImpl.java   |  59 +++
 .../coordinator/impl/TxnLogBufferedWriter.java     | 550 +++++++++++++++++++++
 .../coordinator/impl/TxnLogBufferedWriterTest.java | 502 +++++++++++++++++++
 3 files changed, 1111 insertions(+)

diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
new file mode 100644
index 00000000000..e5e9b60cfe8
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+/***
+ * The difference from {@link PositionImpl} is that there are two more parameters:
+ * {@link #batchSize}, {@link #batchIndex}.
+ */
+public class TxnBatchedPositionImpl extends PositionImpl {
+
+    /** The number of records in the current batch. **/
+    @Getter
+    private final int batchSize;
+
+    /** The index of this record within the current batch. **/
+    @Getter
+    private final int batchIndex;
+
+    public TxnBatchedPositionImpl(Position position, int batchSize, int batchIndex, long[] ackSet){
+        super(position.getLedgerId(), position.getEntryId(), ackSet);
+        this.batchIndex = batchIndex;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TxnBatchedPositionImpl other) {
+            return super.equals(o) && batchSize == other.batchSize && batchIndex == other.batchIndex;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), batchSize, batchIndex);
+    }
+}
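
To make the two extra attributes concrete, here is a minimal sketch of how a batched position is built for one record of a batched entry. This is an illustrative example only, not part of the commit; the ledger and entry ids are hypothetical.

    import org.apache.bookkeeper.mledger.impl.PositionImpl;
    import org.apache.pulsar.common.util.collections.BitSetRecyclable;

    public class TxnBatchedPositionExample {
        public static void main(String[] args) {
            // One ledger entry at (ledgerId=7, entryId=42) carries a batch of 5 records.
            PositionImpl entryPosition = PositionImpl.get(7L, 42L);
            // Build the ackSet marking the 3rd record (batchIndex 2), the same way
            // TxnLogBufferedWriter#addComplete does below.
            BitSetRecyclable bitSet = BitSetRecyclable.create();
            bitSet.set(2);
            long[] ackSet = bitSet.toLongArray();
            bitSet.recycle();
            TxnBatchedPositionImpl position = new TxnBatchedPositionImpl(entryPosition, 5, 2, ackSet);
            // batchSize and batchIndex distinguish records that share one entry position.
            System.out.println(position.getBatchSize() + " / " + position.getBatchIndex()); // 5 / 2
        }
    }
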
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
new file mode 100644
index 00000000000..685fbba9b2f
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+
+/***
+ * See PIP-160: https://github.com/apache/pulsar/issues/15516.
+ * Buffers requests and flushes them to the Managed Ledger. The Transaction Log Store and Pending Ack Store no longer
+ * write to the Managed Ledger directly; they use this class to write ledger data instead.
+ * It caches "write requests" until a certain number or a certain total size of request data is reached, then writes
+ * them to the Managed Ledger in one go. After the Managed Ledger write completes, it responds to each request-caller.
+ * In this process, the Managed Ledger doesn't care how many records are in the Entry (or what is written); it just
+ * treats them as a single block of data.
+ * The first write-request by transaction components may take a long time to receive a response, because we must wait
+ * for subsequent requests to accumulate enough data before actually writing to the Managed Ledger. To bound the
+ * maximum latency, we record the time of the first request in each batch, and an additional timer triggers the
+ * writes.
+ * The batch feature can be enabled or disabled; when disabled, the writer uses the Managed Ledger directly, without
+ * batching.
+ */
+@Slf4j
+public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback, Closeable {
+
+    public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 0x0e01;
+
+    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
+
+    private static final ManagedLedgerException BUFFERED_WRITER_CLOSED_EXCEPTION =
+            new ManagedLedgerException.ManagedLedgerFencedException(
+                    new Exception("Transaction log buffered writer has been closed")
+            );
+
+    /**
+     * Whether the batch feature is enabled. When disabled, the writer uses the Managed Ledger directly, without
+     * batching.
+     */
+    private final boolean batchEnabled;
+
+    private final ManagedLedger managedLedger;
+
+    /** All write operations will be executed on a single thread. **/
+    private final ExecutorService singleThreadExecutorForWrite;
+
+    /** The serializer for the objects passed to {@link #asyncAddData}. **/
+    private final DataSerializer<T> dataSerializer;
+
+    private ScheduledFuture<?> scheduledFuture;
+
+    /**
+     * Maximum number of cached "write requests"; when this threshold is reached, a Bookie write is triggered.
+     */
+    private final int batchedWriteMaxRecords;
+
+    /**
+     * Maximum total size of cached "write request" data; when this threshold is reached, a Bookie write is triggered.
+     */
+    private final int batchedWriteMaxSize;
+
+    /** Maximum delay before the earliest request in the batch is written to the Bookie. **/
+    private final int batchedWriteMaxDelayInMillis;
+
+    /** Data cached in the current batch. Cleared after each batched write. **/
+    private final ArrayList<T> dataArray;
+
+    /**
+     * Parameters of {@link #asyncAddData} cached in the current batch. A new one is created after each batched write.
+     */
+    private FlushContext flushContext;
+
+    /** Byte size of the data in the current batch. Reset to 0 after each batched write. **/
+    private long bytesSize;
+
+    /** The main purpose of maintaining state is to prevent writes after close. **/
+    private volatile State state;
+    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, TxnLogBufferedWriter.State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater
+                    .newUpdater(TxnLogBufferedWriter.class, TxnLogBufferedWriter.State.class, "state");
+
+
+    /**
+     * Constructor.
+     * @param dataSerializer The serializer for the objects passed to {@link #asyncAddData}.
+     * @param batchedWriteMaxRecords Maximum number of cached "write requests"; when this threshold is reached, a
+     *                               Bookie write is triggered.
+     * @param batchedWriteMaxSize Maximum total size of cached "write request" data; when this threshold is reached,
+     *                           a Bookie write is triggered.
+     * @param batchedWriteMaxDelayInMillis Maximum delay before the earliest request in the batch is written to the
+     *                                     Bookie.
+     * @param batchEnabled Whether the batch feature is enabled. When disabled, the writer uses the Managed Ledger
+     *                    directly, without batching.
+     */
+    public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor,
+                                ScheduledExecutorService scheduledExecutorService, DataSerializer<T> dataSerializer,
+                                int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
+                                boolean batchEnabled){
+        this.batchEnabled = batchEnabled;
+        this.managedLedger = managedLedger;
+        this.singleThreadExecutorForWrite = orderedExecutor.chooseThread(
+                managedLedger.getName() == null ? UUID.randomUUID().toString() : managedLedger.getName());
+        this.dataSerializer = dataSerializer;
+        this.batchedWriteMaxRecords = batchedWriteMaxRecords;
+        this.batchedWriteMaxSize = batchedWriteMaxSize;
+        this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
+        this.flushContext = FlushContext.newInstance();
+        this.dataArray = new ArrayList<>();
+        // Schedule the timed flush task. Note: the delay is in milliseconds, matching the parameter name.
+        if (batchEnabled) {
+            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
+                    batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
+        }
+        this.state = State.OPEN;
+    }
+
+    /**
+     * Append a new entry to the end of a managed ledger. All writes are performed on the same thread. Callbacks are
+     * executed in strict write order, but after {@link #close()}, callbacks that fail the state check will execute
+     * earlier; successful callbacks are not affected.
+     * @param data data entry to be persisted.
+     * @param callback Will call {@link AddDataCallback#addComplete(Position, Object)} when
+     *                 the add completes.
+     *                 Will call {@link AddDataCallback#addFailed(ManagedLedgerException, Object)} when
+     *                 the add fails.
+     */
+    public void asyncAddData(T data, AddDataCallback callback, Object ctx){
+        if (!batchEnabled){
+            if (state == State.CLOSING || state == State.CLOSED){
+                callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+                return;
+            }
+            ByteBuf byteBuf = dataSerializer.serialize(data);
+            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
+                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            return;
+        }
+        singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data, callback, ctx));
+    }
+
+    private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx){
+        if (state == State.CLOSING || state == State.CLOSED){
+            callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+            return;
+        }
+        int len = dataSerializer.getSerializedSize(data);
+        if (len >= batchedWriteMaxSize){
+            if (!flushContext.asyncAddArgsList.isEmpty()) {
+                doTrigFlush(true);
+            }
+            ByteBuf byteBuf = dataSerializer.serialize(data);
+            managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
+                    AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
+            return;
+        }
+        // Add data.
+        this.dataArray.add(data);
+        // Add callback info.
+        AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
+        this.flushContext.asyncAddArgsList.add(asyncAddArgs);
+        // Calculate bytes-size.
+        this.bytesSize += len;
+        // Trigger flush.
+        doTrigFlush(false);
+    }
+
+    /***
+     * The serializer for the objects passed to {@link #asyncAddData}.
+     */
+    public interface DataSerializer<T>{
+
+        /**
+         * Calculate the number of bytes {@code data} takes after serialization.
+         * @param data The object passed to {@link #asyncAddData}.
+         * @return The number of bytes taken after serialization.
+         */
+        int getSerializedSize(T data);
+
+        /**
+         * Serialize {@code data} to a {@link ByteBuf}. The returned ByteBuf will be released once the write to
+         * Bookie completes; if you still need to use the ByteBuf, call {@link ByteBuf#retain()} in the
+         * {@link #serialize(Object)} implementation.
+         * @param data The object passed to {@link #asyncAddData}.
+         * @return byte buf.
+         */
+        ByteBuf serialize(T data);
+
+        /**
+         * Serialize {@code dataArray} to a {@link ByteBuf}. The returned ByteBuf will be released once the write to
+         * Bookie completes; if you still need to use the ByteBuf, call {@link ByteBuf#retain()} in the
+         * {@link #serialize(ArrayList)} implementation.
+         * @param dataArray The objects passed to {@link #asyncAddData}.
+         * @return byte buf.
+         */
+        ByteBuf serialize(ArrayList<T> dataArray);
+
+    }
+
+    /**
+     * Trigger one write to the Bookie. If the conditions are not met, nothing is done.
+     */
+    public void trigFlush(final boolean force){
+        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force));
+    }
+
+    private void doTrigFlush(boolean force){
+        if (flushContext.asyncAddArgsList.isEmpty()) {
+            return;
+        }
+        if (force){
+            doFlush();
+            return;
+        }
+        AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
+        if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime > batchedWriteMaxDelayInMillis){
+            doFlush();
+            return;
+        }
+        if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords){
+            doFlush();
+            return;
+        }
+        if (this.bytesSize >= batchedWriteMaxSize){
+            doFlush();
+        }
+    }
+
+    private void doFlush(){
+        // Combine data.
+        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
+        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+        prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+        ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
+        ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix, actualContent);
+        // We need to release this pairByteBuf after the Managed Ledger async-add callback; it is held by FlushContext.
+        this.flushContext.byteBuf = pairByteBuf;
+        // Flush.
+        if (State.CLOSING == state || State.CLOSED == state){
+            failureCallbackByContextAndRecycle(flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
+        } else {
+            managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
+        }
+        // Clear buffers.
+        this.dataArray.clear();
+        this.flushContext = FlushContext.newInstance();
+        this.bytesSize = 0;
+    }
+
+    /**
+     * see {@link AsyncCallbacks.AddEntryCallback#addComplete(Position, ByteBuf, Object)}.
+     */
+    @Override
+    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+        final FlushContext flushContext = (FlushContext) ctx;
+        try {
+            final int batchSize = flushContext.asyncAddArgsList.size();
+            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
+                final AsyncAddArgs asyncAddArgs = flushContext.asyncAddArgsList.get(batchIndex);
+                BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+                bitSetRecyclable.set(batchIndex);
+                long[] ackSet = bitSetRecyclable.toLongArray();
+                bitSetRecyclable.recycle();
+                final TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize,
+                        batchIndex, ackSet);
+                // This task is already running on the ordered executor, so just invoke the callback directly.
+                try {
+                    asyncAddArgs.callback.addComplete(txnBatchedPosition, asyncAddArgs.ctx);
+                } catch (Exception e){
+                    log.error("After writing to the transaction batched log completed, the callback failed."
+                            + " managedLedger: " + managedLedger.getName(), e);
+                }
+            }
+        } finally {
+            flushContext.recycle();
+        }
+    }
+
+    /**
+     * see {@link AsyncCallbacks.AddEntryCallback#addFailed(ManagedLedgerException, Object)}.
+     */
+    @Override
+    public void addFailed(ManagedLedgerException exception, Object ctx) {
+        final FlushContext flushContext = (FlushContext) ctx;
+        failureCallbackByContextAndRecycle(flushContext, exception);
+    }
+
+    /**
+     * Cancel pending tasks and release resources.
+     */
+    @Override
+    public void close() {
+        // If the batch feature is disabled, there is no closing state.
+        if (!batchEnabled) {
+            STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
+            return;
+        }
+        // Prevent reentrancy.
+        if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
+            // Another thread is also calling "close()".
+            return;
+        }
+        // Cancel pending tasks and release resources.
+        singleThreadExecutorForWrite.execute(() -> {
+            if (state == State.CLOSED){
+                return;
+            }
+            // Execute the failure callback for pending requests.
+            // If a request has already been flushed, Bookie triggers its callback.
+            failureCallbackByContextAndRecycle(this.flushContext, BUFFERED_WRITER_CLOSED_EXCEPTION);
+            // Cancel the task that triggers flushes at a fixed rate.
+            if (scheduledFuture != null && !scheduledFuture.isCancelled() && !scheduledFuture.isDone()) {
+                if (this.scheduledFuture.cancel(false)){
+                    this.state = State.CLOSED;
+                } else {
+                    // Cancelling the task failed; the state will stay at CLOSING.
+                    log.error("Failed to cancel the fixed-rate flush task. The state will stay at"
+                            + " CLOSING. managedLedger: " + managedLedger.getName());
+                }
+            }
+        });
+    }
+
+    private void failureCallbackByContextAndRecycle(FlushContext flushContext, ManagedLedgerException ex){
+        if (flushContext == null || CollectionUtils.isEmpty(flushContext.asyncAddArgsList)){
+            return;
+        }
+        try {
+            for (AsyncAddArgs asyncAddArgs : flushContext.asyncAddArgsList) {
+                failureCallbackByArgs(asyncAddArgs, ex, false);
+            }
+        } finally {
+            flushContext.recycle();
+        }
+    }
+
+    private void failureCallbackByArgs(AsyncAddArgs asyncAddArgs, ManagedLedgerException ex, final boolean recycle){
+        if (asyncAddArgs == null) {
+            return;
+        }
+        try {
+            asyncAddArgs.callback.addFailed(ex, asyncAddArgs.ctx);
+        } catch (Exception e){
+            log.error("After a failed write to the transaction batched log, the callback also"
+                    + " failed. managedLedger: " + managedLedger.getName(), e);
+        } finally {
+            if (recycle) {
+                asyncAddArgs.recycle();
+            }
+        }
+    }
+
+    /***
+     * Holds the parameters of {@link #asyncAddData} for each callback after a batched write finishes. This object is
+     * designed to be reusable, so the recycle mechanism is used.
+     */
+    @ToString
+    private static class AsyncAddArgs {
+
+        private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER = new Recycler<>() {
+            @Override
+            protected AsyncAddArgs newObject(Handle<AsyncAddArgs> handle) {
+                return new AsyncAddArgs(handle);
+            }
+        };
+
+        /**
+         * This factory method is used only when the batch feature is enabled.
+         */
+        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime){
+            AsyncAddArgs asyncAddArgs = ASYNC_ADD_ARGS_RECYCLER.get();
+            asyncAddArgs.callback = callback;
+            asyncAddArgs.ctx = ctx;
+            asyncAddArgs.addedTime = addedTime;
+            return asyncAddArgs;
+        }
+
+        /**
+         * This factory method is used only when the batch feature is disabled; it takes one more argument,
+         * {@code byteBuf}, than {@link AsyncAddArgs#newInstance(AddDataCallback, Object, long)}. The {@code byteBuf}
+         * will be released during the callback; see {@link AsyncAddArgs#recycle()}.
+         * @param byteBuf produced by {@link DataSerializer#serialize(Object)}
+         */
+        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime, ByteBuf byteBuf){
+            AsyncAddArgs asyncAddArgs = newInstance(callback, ctx, addedTime);
+            asyncAddArgs.byteBuf = byteBuf;
+            return asyncAddArgs;
+        }
+
+        private AsyncAddArgs(Recycler.Handle<AsyncAddArgs> handle){
+            this.handle = handle;
+        }
+
+        private final Recycler.Handle<AsyncAddArgs> handle;
+
+        /** Argument for {@link #asyncAddData(Object, AddDataCallback, Object)}. **/
+        @Getter
+        private AddDataCallback callback;
+
+        /** Argument for {@link #asyncAddData(Object, AddDataCallback, Object)}. **/
+        @Getter
+        private Object ctx;
+
+        /** Time when {@link #asyncAddData(Object, AddDataCallback, Object)} was executed. **/
+        @Getter
+        private long addedTime;
+
+        /**
+         * When the batch feature is turned off, we need to release the byteBuf produced by
+         * {@link DataSerializer#serialize(Object)}. This field only carries the ByteBuf; it has no other use.
+         */
+        private ByteBuf byteBuf;
+
+        public void recycle(){
+            this.callback = null;
+            this.ctx = null;
+            this.addedTime = 0;
+            if (this.byteBuf != null){
+                this.byteBuf.release();
+            }
+            this.handle.recycle(this);
+        }
+    }
+
+    /***
+     * The context for {@link ManagedLedger#asyncAddEntry(byte[], AsyncCallbacks.AddEntryCallback, Object)}. Holds the
+     * data array written in batches and the callback parameters that need to be executed after the batched write
+     * completes.
+     */
+    private static class FlushContext{
+
+        private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER = new Recycler<FlushContext>() {
+            @Override
+            protected FlushContext newObject(Handle<FlushContext> handle) {
+                return new FlushContext(handle);
+            }
+        };
+
+        private final Recycler.Handle<FlushContext> handle;
+
+        /** Callback parameters for current batch. **/
+        private final ArrayList<AsyncAddArgs> asyncAddArgsList;
+
+        /**
+         * When the batch feature is turned on, we need to release the byteBuf produced by
+         * {@link DataSerializer#serialize(ArrayList)} in the Managed Ledger async-add callback.
+         * This field only carries the ByteBuf; it has no other use.
+         */
+        private ByteBuf byteBuf;
+
+        private FlushContext(Recycler.Handle<FlushContext> handle){
+            this.handle = handle;
+            this.asyncAddArgsList = new ArrayList<>(8);
+        }
+
+        private static FlushContext newInstance(){
+            return FLUSH_CONTEXT_RECYCLER.get();
+        }
+
+        public void recycle(){
+            for (AsyncAddArgs asyncAddArgs : this.asyncAddArgsList){
+                asyncAddArgs.recycle();
+            }
+            if (byteBuf != null){
+                byteBuf.release();
+                byteBuf = null;
+            }
+            this.asyncAddArgsList.clear();
+            this.handle.recycle(this);
+        }
+    }
+
+    interface AddDataCallback {
+
+        void addComplete(Position position, Object context);
+
+        void addFailed(ManagedLedgerException exception, Object ctx);
+    }
+
+    private enum State{
+        OPEN,
+        CLOSING,
+        CLOSED;
+    }
+
+    /***
+     * Replaces the original param-callback of {@link #asyncAddData(Object, AddDataCallback, Object)}
+     * when {@link #batchEnabled} == false. Used to release the ByteBuf generated by {@link DataSerializer}.
+     */
+    private static class DisabledBatchCallback implements AsyncCallbacks.AddEntryCallback {
+
+        private static final DisabledBatchCallback INSTANCE = new DisabledBatchCallback();
+
+        private DisabledBatchCallback(){
+
+        }
+
+        @Override
+        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
+            try {
+                asyncAddArgs.callback.addComplete(position, asyncAddArgs.ctx);
+            } finally {
+                asyncAddArgs.recycle();
+            }
+        }
+
+        @Override
+        public void addFailed(ManagedLedgerException exception, Object ctx) {
+            AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
+            try {
+                asyncAddArgs.callback.addFailed(exception, asyncAddArgs.ctx);
+            } finally {
+                asyncAddArgs.recycle();
+            }
+        }
+    }
+}
\ No newline at end of file
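
To make the DataSerializer contract and the flush thresholds above concrete, here is a minimal usage sketch. It is illustrative only, not part of the commit: the serializer encodes each record as one 4-byte int, the ManagedLedger, executors, and callback are assumed to be created elsewhere, and the threshold values are arbitrary.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.util.ArrayList;
    import java.util.concurrent.ScheduledExecutorService;
    import org.apache.bookkeeper.common.util.OrderedExecutor;
    import org.apache.bookkeeper.mledger.ManagedLedger;

    class IntDataSerializer implements TxnLogBufferedWriter.DataSerializer<Integer> {
        @Override
        public int getSerializedSize(Integer data) {
            return 4; // fixed 4 bytes per record
        }
        @Override
        public ByteBuf serialize(Integer data) {
            // Released by the writer once the Bookie write completes.
            return Unpooled.buffer(4).writeInt(data);
        }
        @Override
        public ByteBuf serialize(ArrayList<Integer> dataArray) {
            // A batch is simply the records concatenated into one payload.
            ByteBuf buf = Unpooled.buffer(4 * dataArray.size());
            dataArray.forEach(buf::writeInt);
            return buf;
        }
    }

    class BufferedWriterUsage {
        static void write(ManagedLedger ledger, OrderedExecutor orderedExecutor,
                          ScheduledExecutorService scheduler,
                          TxnLogBufferedWriter.AddDataCallback callback) {
            // Flush when 512 records, 4 KB, or 1 ms of delay is reached, whichever comes first.
            TxnLogBufferedWriter<Integer> writer = new TxnLogBufferedWriter<>(ledger, orderedExecutor,
                    scheduler, new IntDataSerializer(), 512, 4 * 1024, 1, true);
            writer.asyncAddData(1, callback, 1 /* ctx */);
        }
    }
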
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
new file mode 100644
index 00000000000..78bd4e8a57b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
+
+    /**
+     * Covered cases:
+     *   1. Enabled the batch feature.
+     *     1-1. Normal.
+     *     1-2. The number of data writes is small (small data blocks).
+     *     1-3. The number of data writes is large (small data blocks).
+     *     1-4. Large data writes.
+     *     1-5. A batch has only one datum.
+     *     1-6. A batch has only two data.
+     *   2. Disabled the batch feature.
+     *   3. Bookie always errors.
+     *      3-1. Enabled the batch feature.
+     *      3-2. Disabled the batch feature.
+     *   4. Bookie sometimes errors.
+     *      4-1. Enabled the batch feature.
+     *      4-2. Disabled the batch feature.
+     *   5. {@link TxnLogBufferedWriter} sometimes closes.
+     *      5-1. Enabled the batch feature.
+     *      5-2. Disabled the batch feature.
+     */
+    @DataProvider(name = "mainProcessCasesProvider")
+    public Object[][] mainProcessCasesProvider(){
+        Object[][] provider = new Object[13][];
+        // Normal.
+        provider[0] = new Object[]{512, 1024 * 1024, 1, true, 2000, 2, 4, BookieErrorType.NO_ERROR, false};
+        // The number of data writes is small.
+        provider[1] = new Object[]{512, 1024 * 1024, 1, true, 100, 2, 4, BookieErrorType.NO_ERROR, false};
+        // The number of data writes is large.
+        provider[2] = new Object[]{512, 1024 * 1024, 100, true, 20000, 5, 4, BookieErrorType.NO_ERROR, false};
+        // Large data writes.
+        provider[3] = new Object[]{512, 1024, 100, true, 3000, 4, 1024, BookieErrorType.NO_ERROR, false};
+        // A batch has only one datum.
+        provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 4, 4, BookieErrorType.NO_ERROR, false};
+        // A batch has only two data.
+        provider[5] = new Object[]{2, 1024 * 1024, 100, true, 1999, 4, 4, BookieErrorType.NO_ERROR, false};
+        // Disabled the batch feature.
+        provider[6] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4, BookieErrorType.NO_ERROR, false};
+        // Bookie always errors.
+        provider[7] = new Object[]{512, 1024 * 1024, 1, true, 2000, 2, 4, BookieErrorType.ALWAYS_ERROR, false};
+        provider[8] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4, BookieErrorType.ALWAYS_ERROR, false};
+        // Bookie sometimes errors.
+        provider[9] = new Object[]{512, 1024 * 1024, 1, true, 2000, 4, 4, BookieErrorType.SOMETIMES_ERROR, false};
+        provider[10] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4, BookieErrorType.SOMETIMES_ERROR, false};
+        // TxnLogBufferedWriter sometimes closes.
+        provider[11] = new Object[]{512, 1024 * 1024, 1, true, 2000, 4, 4, BookieErrorType.NO_ERROR, true};
+        provider[12] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4, BookieErrorType.NO_ERROR, true};
+        return provider;
+    }
+
+    /**
+     * Tests all operations from write to callback, including these steps:
+     *   1. Write many data.
+     *   2. Verify the callbacks are correct.
+     *   3. Read from the bookie and verify all data are correct.
+     *   4. Verify the byte buffers generated by DataSerializer have been released after the process finishes.
+     *   5. Cleanup.
+     * Covered cases: see {@link #mainProcessCasesProvider}.
+     *
+     * TODO: Additional validation is required:
+     *   1. Recycling is handled correctly.
+     *   2. The ByteBuf generated by the data merge before the Bookie write is released correctly, including the
+     *      prefix-ByteBuf and composite-ByteBuf.
+     *   3. Even if "bkc.failAfter" is executed, we also need to verify the data that has already been written to
+     *      the bookie.
+     */
+    @Test(dataProvider = "mainProcessCasesProvider")
+    public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
+                                boolean batchEnabled, final int writeCmdExecuteCount, int maxWaitSeconds,
+                                int eachDataBytesLen, BookieErrorType bookieErrorType,
+                                boolean closeBufferedWriter) throws Exception {
+        // Assert args.
+        if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType || closeBufferedWriter){
+            if (writeCmdExecuteCount < batchedWriteMaxRecords * 2){
+                throw new IllegalArgumentException("if bookieErrorType is BookieErrorType.SOMETIMES_ERROR or"
+                        + " closeBufferedWriter is true, param writeCmdExecuteCount must be at least"
+                        + " param batchedWriteMaxRecords * 2");
+            }
+        }
+        // Calculate whether batching will actually take effect, for the asserts below.
+        boolean exactlyBatched = batchEnabled;
+        if (batchedWriteMaxSize <= eachDataBytesLen){
+            exactlyBatched = false;
+        }
+        /**
+         * Create components for tests.
+         *  - managedLedger
+         *  - managedCursor
+         *  - orderedExecutor
+         *  - scheduledExecutorService
+         *  - dataSerializer
+         */
+        ManagedLedger managedLedger = factory.open("tx_test_ledger");
+        ManagedCursor managedCursor = managedLedger.openCursor("tx_test_cursor");
+        if (BookieErrorType.ALWAYS_ERROR == bookieErrorType){
+            managedLedger = Mockito.spy(managedLedger);
+            managedCursor = Mockito.spy(managedCursor);
+            failureManagedLedger(managedLedger);
+        } else if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType){
+            bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
+            metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));
+        }
+        OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+                .numThreads(5).name("tx-threads").build();
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen);
+        /**
+         * Execute the test task.
+         *   1. Write many times.
+         *   2. Store the param-context and param-position of the callback function for verification.
+         */
+        // Create TxnLogBufferedWriter.
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<>(
+                        managedLedger, orderedExecutor, scheduledExecutorService,
+                        dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
+                        batchedWriteMaxDelayInMillis, batchEnabled);
+        // Store the param-context, param-position, param-exception of the callback function and the complete-count
+        // for verification.
+        ArrayList<Integer> contextArrayOfCallback = new ArrayList<>();
+        ArrayList<ManagedLedgerException> exceptionArrayOfCallback = new ArrayList<>();
+        LinkedHashMap<PositionImpl, ArrayList<Position>> positionsOfCallback = new LinkedHashMap<>();
+        AtomicBoolean anyFlushCompleted = new AtomicBoolean();
+        TxnLogBufferedWriter.AddDataCallback callback = new TxnLogBufferedWriter.AddDataCallback(){
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                anyFlushCompleted.set(true);
+                if (contextArrayOfCallback.contains(Integer.valueOf(String.valueOf(ctx)))){
+                    return;
+                }
+                contextArrayOfCallback.add((int) ctx);
+                PositionImpl lightPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId());
+                positionsOfCallback.computeIfAbsent(lightPosition, p -> new ArrayList<>());
+                positionsOfCallback.get(lightPosition).add(position);
+            }
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                if (contextArrayOfCallback.contains(Integer.valueOf(String.valueOf(ctx)))){
+                    return;
+                }
+                contextArrayOfCallback.add((int) ctx);
+                exceptionArrayOfCallback.add(exception);
+            }
+        };
+        // Write many times.
+        int bufferedWriteCloseAtIndex = writeCmdExecuteCount / 2
+                + new Random().nextInt(writeCmdExecuteCount / 4 + 1) - 1;
+        for (int i = 0; i < writeCmdExecuteCount; i++){
+            txnLogBufferedWriter.asyncAddData(i, callback, i);
+            // Ensure at least one flush before closing the buffered writer.
+            if (closeBufferedWriter && i == 0){
+                txnLogBufferedWriter.trigFlush(true);
+            }
+            if (closeBufferedWriter && bufferedWriteCloseAtIndex == i){
+                // Wait for any complete callback, to avoid flakiness.
+                Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> anyFlushCompleted.get());
+                txnLogBufferedWriter.close();
+            }
+        }
+        /**
+         * Assert the callbacks are correct.
+         *   1. Callback count.
+         *   2. Callback param-context.
+         *   3. If {@param bookieErrorType} indicates errors, verify the exception count.
+         *   4. If {@param bookieErrorType} indicates no errors, verify the param-position count.
+         *   5. If the batch feature is enabled, verify the attributes (batchSize, batchIndex) of the callback
+         *      param-position.
+         * Note: {@link TxnLogBufferedWriter#close()} makes callbacks that fail the state check execute earlier, so if
+         *   {@param closeBufferedWriter} is true, we should sort the callback-context-array. Other callback params
+         *   should not be sorted.
+         */
+        // Assert callback count.
+        Awaitility.await().atMost(maxWaitSeconds, TimeUnit.SECONDS)
+                .until(() -> contextArrayOfCallback.size() == writeCmdExecuteCount);
+        // Assert callback param-context; verify that all callbacks are executed in strict order.
+        if (closeBufferedWriter){
+            Collections.sort(contextArrayOfCallback);
+        }
+        Assert.assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
+        for (int ctxIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
+            Assert.assertEquals(contextArrayOfCallback.get(ctxIndex).intValue(), ctxIndex);
+        }
+        // If {@param bookieErrorType} indicates errors, verify the exception count.
+        // If {@param bookieErrorType} indicates no errors, verify the param-position count.
+        int exceptionCallbackCount = exceptionArrayOfCallback.size();
+        int positionCallbackCount = (int) positionsOfCallback.values().stream().flatMap(l -> l.stream()).count();
+        if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType || closeBufferedWriter){
+            Assert.assertTrue(exceptionCallbackCount > 0);
+            Assert.assertTrue(positionCallbackCount > 0);
+            Assert.assertEquals(exceptionCallbackCount + positionCallbackCount, writeCmdExecuteCount);
+        } else if (BookieErrorType.NO_ERROR == bookieErrorType){
+            Assert.assertEquals(positionCallbackCount, writeCmdExecuteCount);
+        } else {
+            Assert.assertEquals(exceptionCallbackCount, writeCmdExecuteCount);
+        }
+        // If the batch feature is enabled, verify the attributes (batchSize, batchIndex) of the callback
+        // param-position.
+        if (exactlyBatched && BookieErrorType.ALWAYS_ERROR != bookieErrorType){
+            Iterator<ArrayList<Position>> callbackPositionIterator = positionsOfCallback.values().iterator();
+            List<String> exactlyFlushedDataArray = dataSerializer.getGeneratedJsonArray();
+            for (int batchedEntryIndex = 0; batchedEntryIndex < exactlyFlushedDataArray.size() - exceptionCallbackCount;
+                 batchedEntryIndex++) {
+                String json = exactlyFlushedDataArray.get(batchedEntryIndex);
+                List<Integer> batchedData = JsonDataSerializer.deserializeMergedData(json);
+                ArrayList<Position> innerPositions = callbackPositionIterator.next();
+                for (int i = 0; i < batchedData.size(); i++) {
+                    TxnBatchedPositionImpl innerPosition =
+                            (TxnBatchedPositionImpl) innerPositions.get(i);
+                    Assert.assertEquals(innerPosition.getBatchSize(), batchedData.size());
+                    Assert.assertEquals(innerPosition.getBatchIndex(), i);
+                }
+            }
+        }
+        /**
+         * Read entries from the Bookie and assert all data and positions are correct.
+         *   1. Assert the data read matches the data written.
+         *   2. Assert the position read matches the position of the callback.
+         *   3. Assert the callback count equals the entry count.
+         * Note: after calling {@link PulsarMockBookKeeper#failAfter}, the managed ledger cannot execute read entries
+         *       anymore, so when {@param bookieErrorType} equals {@link BookieErrorType#SOMETIMES_ERROR}, skip the
+         *       verification.
+         * Note2: Verify that all entries were written in strict order.
+         */
+        if (BookieErrorType.NO_ERROR == bookieErrorType) {
+            Iterator<PositionImpl> callbackPositionIterator = positionsOfCallback.keySet().iterator();
+            List<String> dataArrayWrite = dataSerializer.getGeneratedJsonArray();
+            int entryCounter = 0;
+            while (managedCursor.hasMoreEntries()) {
+                List<Entry> entries = managedCursor.readEntries(1);
+                if (entries == null || entries.isEmpty()) {
+                    continue;
+                }
+                for (int m = 0; m < entries.size(); m++) {
+                    // Get the data read.
+                    Entry entry = entries.get(m);
+                    // Assert the position read matches the position of the callback.
+                    PositionImpl callbackPosition = callbackPositionIterator.next();
+                    Assert.assertEquals(entry.getLedgerId(), callbackPosition.getLedgerId());
+                    Assert.assertEquals(entry.getEntryId(), callbackPosition.getEntryId());
+                    if (exactlyBatched) {
+                        // Get the expected entry data from the cache of the DataSerializer.
+                        String expectEntryData = dataArrayWrite.get(entryCounter);
+                        ByteBuf entryByteBuf = entry.getDataBuffer();
+                        entryByteBuf.skipBytes(4);
+                        byte[] entryContentBytes = new byte[entryByteBuf.readableBytes()];
+                        entryByteBuf.readBytes(entryContentBytes);
+                        String actEntryData = new String(entryContentBytes, Charset.defaultCharset());
+                        // Assert the data read matches the data written.
+                        Assert.assertEquals(actEntryData, expectEntryData);
+                    } else {
+                        int entryValue = entry.getDataBuffer().readInt();
+                        // Assert the data read matches the data written.
+                        Assert.assertEquals(entryValue, entryCounter);
+                    }
+                    entry.release();
+                    entryCounter++;
+                }
+            }
+            // Assert the callback count equals the entry count.
+            Assert.assertEquals(entryCounter, positionsOfCallback.size());
+        }
+        /** Cleanup. **/
+        txnLogBufferedWriter.close();
+        // If we have already called {@link PulsarMockBookKeeper#failAfter}, the managed ledger cannot close anymore.
+        if (BookieErrorType.SOMETIMES_ERROR != bookieErrorType){
+            managedLedger.close();
+        }
+        scheduledExecutorService.shutdown();
+        orderedExecutor.shutdown();
+        /**
+         * Assert all ByteBufs generated by the DataSerializer have been released.
+         *   1. Because the ManagedLedger holds a write cache, some data is not actually released until the
+         *      ManagedLedger is closed.
+         *   2. If we have already called {@link PulsarMockBookKeeper#failAfter(int, int)}, the managed ledger cannot
+         *      close anymore, so when {@param bookieErrorType} equals {@link BookieErrorType#SOMETIMES_ERROR}, skip
+         *      the verification.
+         */
+        if (BookieErrorType.SOMETIMES_ERROR != bookieErrorType) {
+            dataSerializer.assertAllByteBufHasBeenReleased();
+        }
+        dataSerializer.cleanup();
+    }
+
+    private void failureManagedLedger(ManagedLedger managedLedger){
+        Mockito.doAnswer(invocation -> {
+            AsyncCallbacks.AddEntryCallback callback =
+                    (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+            ManagedLedgerException managedLedgerException =
+                    new ManagedLedgerException(new Exception("Fail by mock."));
+            callback.addFailed(managedLedgerException, invocation.getArguments()[2]);
+            return null;
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+    }
+
+    /**
+     * Adjustable thresholds: a BookKeeper write is triggered when any one of the following conditions is reached:
+     *     Max size (bytes)
+     *     Max record count
+     *     Max delay time
+     * Tests these three thresholds.
+     */
+    @Test
+    public void testFlushThresholds() throws Exception{
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(5).name("tx-topic-threads").build();
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Cache the data flushed to the Bookie for asserts.
+        List<Integer> dataArrayFlushedToBookie = new ArrayList<>();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ByteBuf byteBuf = (ByteBuf) invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                dataArrayFlushedToBookie.add(byteBuf.readInt());
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1, 1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
+        TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
+        // Test threshold: batchedWriteMaxDelayInMillis.
+        txnLogBufferedWriter.asyncAddData(100, callback, 100);
+        Thread.sleep(101);
+        // Test threshold: batchedWriteMaxRecords.
+        for (int i = 0; i < 32; i++){
+            txnLogBufferedWriter.asyncAddData(1, callback, 1);
+        }
+        // Test threshold: batchedWriteMaxSize.
+        TxnLogBufferedWriter<Integer> txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 1024, 64 * 4, 100, true);
+        for (int i = 0; i < 64; i++){
+            txnLogBufferedWriter2.asyncAddData(1, callback, 1);
+        }
+        // Assert 3 flushes, one per threshold.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 3);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(2).intValue(), 64);
+        // Assert all resources released.
+        dataSerializer.assertAllByteBufHasBeenReleased();
+        // Clean up.
+        dataSerializer.cleanup();
+        scheduledExecutorService.shutdown();
+        orderedExecutor.shutdown();
+    }
+
+    private static class JsonDataSerializer implements TxnLogBufferedWriter.DataSerializer<Integer>{
+
+        private static ObjectMapper objectMapper = new ObjectMapper();
+
+        private ArrayList<ByteBuf> generatedByteBufArray = new ArrayList<>();
+
+        @Getter
+        private ArrayList<String> generatedJsonArray = new ArrayList<>();
+
+        private int eachDataBytesLen = 4;
+
+        public JsonDataSerializer(){
+
+        }
+
+        public JsonDataSerializer(int eachDataBytesLen){
+            this.eachDataBytesLen = eachDataBytesLen;
+        }
+
+        @Override
+        public int getSerializedSize(Integer data) {
+            return eachDataBytesLen;
+        }
+        @Override
+        public ByteBuf serialize(Integer data) {
+            ByteBuf byteBuf = Unpooled.buffer(4);
+            byteBuf.writeInt(data == null ? 0 : data.intValue());
+            holdsByteBuf(byteBuf);
+            return byteBuf;
+        }
+        @Override
+        public ByteBuf serialize(ArrayList<Integer> dataArray) {
+            try {
+                String str = objectMapper.writeValueAsString(dataArray);
+                generatedJsonArray.add(str);
+                ByteBuf byteBuf = Unpooled.copiedBuffer(str.getBytes(Charset.defaultCharset()));
+                holdsByteBuf(byteBuf);
+                return byteBuf;
+            } catch (Exception e){
+                throw new RuntimeException(e);
+            }
+        }
+
+        public static List<Integer> deserializeMergedData(ByteBuf byteBuf){
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            return deserializeMergedData(new String(bytes, Charset.defaultCharset()));
+        }
+
+        public static List<Integer> deserializeMergedData(String json){
+            try {
+                return objectMapper.readValue(json, ArrayList.class);
+            } catch (Exception e){
+                throw new RuntimeException(e);
+            }
+        }
+
+        protected void holdsByteBuf(ByteBuf byteBuf){
+            generatedByteBufArray.add(byteBuf);
+        }
+
+        protected void cleanup(){
+            // Just for GC.
+            generatedByteBufArray = new ArrayList<>();
+            generatedJsonArray = new ArrayList<>();
+        }
+
+        protected void assertAllByteBufHasBeenReleased(){
+            for (ByteBuf byteBuf : generatedByteBufArray){
+                Assert.assertEquals(byteBuf.refCnt(), 0);
+            }
+        }
+    }
+
+    private static class SumStrDataSerializer extends JsonDataSerializer {
+
+        @Override
+        public ByteBuf serialize(ArrayList<Integer> dataArray) {
+            int sum = CollectionUtils.isEmpty(dataArray) ? 0 : dataArray.stream().reduce((a, b) -> a + b).get();
+            ByteBuf byteBuf = Unpooled.buffer(4);
+            byteBuf.writeInt(sum);
+            holdsByteBuf(byteBuf);
+            return byteBuf;
+        }
+    }
+
+    public enum BookieErrorType{
+        NO_ERROR,
+        ALWAYS_ERROR,
+        SOMETIMES_ERROR;
+    }
+}
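
The tests above skip 4 bytes before comparing a batched entry's payload (entryByteBuf.skipBytes(4)): every batched entry starts with the 2-byte magic number and 2-byte version written by doFlush. A reader-side sketch of that check, illustrative only and not part of the commit:

    import io.netty.buffer.ByteBuf;

    final class BatchedEntryReader {
        // Strips and validates the 4-byte prefix written by TxnLogBufferedWriter#doFlush.
        static ByteBuf extractBatchPayload(ByteBuf entryData) {
            short magic = entryData.readShort();
            short version = entryData.readShort();
            if (magic != TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER
                    || version != TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_VERSION) {
                throw new IllegalStateException("Not a batched transaction log entry");
            }
            // The remaining bytes are exactly the output of DataSerializer#serialize(ArrayList).
            return entryData;
        }
    }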
