This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af932ed3e9f [improve] [txn] PIP-160: Txn buffered writer for
transaction log batch (#16428)
af932ed3e9f is described below
commit af932ed3e9f325cd0d36f57ffd1f4fd4d9f446ad
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 19 09:53:56 2022 +0800
[improve] [txn] PIP-160: Txn buffered writer for transaction log batch
(#16428)
---
.../coordinator/impl/TxnBatchedPositionImpl.java | 59 +++
.../coordinator/impl/TxnLogBufferedWriter.java | 550 +++++++++++++++++++++
.../coordinator/impl/TxnLogBufferedWriterTest.java | 502 +++++++++++++++++++
3 files changed, 1111 insertions(+)
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
new file mode 100644
index 00000000000..e5e9b60cfe8
--- /dev/null
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Objects;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+/***
+ * The difference with {@link PositionImpl} is that there are two more
parameters:
+ * {@link #batchSize}, {@link #batchIndex}.
+ */
+public class TxnBatchedPositionImpl extends PositionImpl {
+
+ /** The data length of current batch. **/
+ @Getter
+ private final int batchSize;
+
+ /** The position of current batch. **/
+ @Getter
+ private final int batchIndex;
+
+ public TxnBatchedPositionImpl(Position position, int batchSize, int
batchIndex, long[] ackSet){
+ super(position.getLedgerId(), position.getEntryId(), ackSet);
+ this.batchIndex = batchIndex;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TxnBatchedPositionImpl other) {
+ return super.equals(o) && batchSize == other.batchSize &&
batchIndex == other.batchIndex;
+ }
+ return false;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), batchSize, batchIndex);
+ }
+}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
new file mode 100644
index 00000000000..685fbba9b2f
--- /dev/null
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+
+/***
+ * See PIP-160: https://github.com/apache/pulsar/issues/15516.
+ * Buffer requests and flush to Managed Ledger. Transaction Log Store And
Pending Ack Store will no longer write to
+ * Managed Ledger directly, Change to using this class to write Ledger data.
+ * Caches “write requests” for a certain number or a certain size of request
data and then writes them to the
+ * Managed Ledger in one go. After Managed Ledger has written complete,
responds to each request-caller. In this
+ * process, Managed Ledger doesn't care how many records(or what to be
written) in the Entry, it just treats them as
+ * a single block of data.
+ * The first write-request by transaction components will take a long time to
receive a response, because we must wait
+ * for subsequent requests to accumulate enough data to actually start writing
to the Managed Ledger. To control the
+ * maximum latency, we will mark the first request time for each batch, and
additional timing triggers writes.
+ * You can enable or disabled the batch feature, will use Managed Ledger
directly and without batching when disabled.
+ */
+@Slf4j
+public class TxnLogBufferedWriter<T> implements
AsyncCallbacks.AddEntryCallback, Closeable {
+
+ public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 0x0e01;
+
+ public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
+
+ private static final ManagedLedgerException
BUFFERED_WRITER_CLOSED_EXCEPTION =
+ new ManagedLedgerException.ManagedLedgerFencedException(
+ new Exception("Transaction log buffered write has closed")
+ );
+
+ /**
+ * Enable or disabled the batch feature, will use Managed Ledger directly
and without batching when disabled.
+ */
+ private final boolean batchEnabled;
+
+ private final ManagedLedger managedLedger;
+
+ /** All write operation will be executed on single thread. **/
+ private final ExecutorService singleThreadExecutorForWrite;
+
+ /** The serializer for the object which called by {@link #asyncAddData}.
**/
+ private final DataSerializer<T> dataSerializer;
+
+ private ScheduledFuture<?> scheduledFuture;
+
+ /**
+ * Caches “write requests” for a certain for a certain number, if reach
this threshold, will trig Bookie writes.
+ */
+ private final int batchedWriteMaxRecords;
+
+ /**
+ * Caches “write requests” for a certain size of request data, if reach
this threshold, will trig Bookie writes.
+ */
+ private final int batchedWriteMaxSize;
+
+ /** Maximum delay for writing to bookie for the earliest request in the
batch. **/
+ private final int batchedWriteMaxDelayInMillis;
+
+ /** Data cached in the current batch. Will reset to null after each
batched writes. **/
+ private final ArrayList<T> dataArray;
+
+ /**
+ * Parameters of {@link #asyncAddData} cached in the current batch. Will
create a new one after each batched writes.
+ */
+ private FlushContext flushContext;
+
+ /** Bytes size of data in current batch. Will reset to 0 after each
batched writes. **/
+ private long bytesSize;
+
+ /** The main purpose of state maintenance is to prevent written after
close. **/
+ private volatile State state;
+ private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter,
TxnLogBufferedWriter.State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater
+ .newUpdater(TxnLogBufferedWriter.class,
TxnLogBufferedWriter.State.class, "state");
+
+
+ /**
+ * Constructor.
+ * @param dataSerializer The serializer for the object which called by
{@link #asyncAddData}.
+ * @param batchedWriteMaxRecords Caches “write requests” for a certain
number, if reach this threshold, will trig
+ * Bookie writes.
+ * @param batchedWriteMaxSize Caches “write requests” for a certain size
of request data, if reach this threshold,
+ * will trig Bookie writes.
+ * @param batchedWriteMaxDelayInMillis Maximum delay for writing to bookie
for the earliest request in the batch.
+ * @param batchEnabled Enable or disabled the batch feature, will use
Managed Ledger directly and without batching
+ * when disabled.
+ */
+ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor
orderedExecutor,
+ ScheduledExecutorService
scheduledExecutorService, DataSerializer<T> dataSerializer,
+ int batchedWriteMaxRecords, int
batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
+ boolean batchEnabled){
+ this.batchEnabled = batchEnabled;
+ this.managedLedger = managedLedger;
+ this.singleThreadExecutorForWrite = orderedExecutor.chooseThread(
+ managedLedger.getName() == null ? UUID.randomUUID().toString()
: managedLedger.getName());
+ this.dataSerializer = dataSerializer;
+ this.batchedWriteMaxRecords = batchedWriteMaxRecords;
+ this.batchedWriteMaxSize = batchedWriteMaxSize;
+ this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
+ this.flushContext = FlushContext.newInstance();
+ this.dataArray = new ArrayList<>();
+ // scheduler task.
+ if (batchEnabled) {
+ this.scheduledFuture =
scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
+ batchedWriteMaxDelayInMillis,
batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+ }
+ this.state = State.OPEN;
+ }
+
+ /**
+ * Append a new entry to the end of a managed ledger. All writes will be
performed in the same thread. Callbacks are
+ * executed in strict write order,but after {@link #close()}, callbacks
that fail by state check will execute
+ * earlier, and successful callbacks will not be affected.
+ * @param data data entry to be persisted.
+ * @param callback Will call {@link AddDataCallback#addComplete(Position,
Object)} when
+ * add complete.
+ * Will call {@link
AddDataCallback#addFailed(ManagedLedgerException, Object)} when
+ * add failure.
+ * @throws ManagedLedgerException
+ */
+ public void asyncAddData(T data, AddDataCallback callback, Object ctx){
+ if (!batchEnabled){
+ if (state == State.CLOSING || state == State.CLOSED){
+ callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+ return;
+ }
+ ByteBuf byteBuf = dataSerializer.serialize(data);
+ managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
+ AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
+ return;
+ }
+ singleThreadExecutorForWrite.execute(() -> internalAsyncAddData(data,
callback, ctx));
+ }
+
+ private void internalAsyncAddData(T data, AddDataCallback callback, Object
ctx){
+ if (state == State.CLOSING || state == State.CLOSED){
+ callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
+ return;
+ }
+ int len = dataSerializer.getSerializedSize(data);
+ if (len >= batchedWriteMaxSize){
+ if (!flushContext.asyncAddArgsList.isEmpty()) {
+ doTrigFlush(true);
+ }
+ ByteBuf byteBuf = dataSerializer.serialize(data);
+ managedLedger.asyncAddEntry(byteBuf,
DisabledBatchCallback.INSTANCE,
+ AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
+ return;
+ }
+ // Add data.
+ this.dataArray.add(data);
+ // Add callback info.
+ AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis());
+ this.flushContext.asyncAddArgsList.add(asyncAddArgs);
+ // Calculate bytes-size.
+ this.bytesSize += len;
+ // trig flush.
+ doTrigFlush(false);
+ }
+
+ /***
+ * The serializer for the object which called by {@link #asyncAddData}.
+ */
+ public interface DataSerializer<T>{
+
+ /**
+ * Calculate the number of bytes taken by {@param data} after
serialization.
+ * @param data The object which called by {@link #asyncAddData}.
+ * @return The number of bytes taken after serialization.
+ */
+ int getSerializedSize(T data);
+
+ /**
+ * Serialize {@param data} to {@link ByteBuf}. The returned ByteBuf
will be release once after writing to
+ * Bookie complete, and if you still need to use the ByteBuf, should
call {@link ByteBuf#retain()} in
+ * {@link #serialize(Object)} implementation.
+ * @param data The object which called by {@link #asyncAddData}.
+ * @return byte buf.
+ */
+ ByteBuf serialize(T data);
+
+ /**
+ * Serialize {@param dataArray} to {@link ByteBuf}.. The returned
ByteBuf will be release once after writing to
+ * Bookie complete, and if you still need to use the ByteBuf, should
call {@link ByteBuf#retain()} in
+ * {@link #serialize(Object)} implementation.
+ * @param dataArray The objects which called by {@link #asyncAddData}.
+ * @return byte buf.
+ */
+ ByteBuf serialize(ArrayList<T> dataArray);
+
+ }
+
+ /**
+ * Trigger write to bookie once, If the conditions are not met, nothing
will be done.
+ */
+ public void trigFlush(final boolean force){
+ singleThreadExecutorForWrite.execute(() -> doTrigFlush(force));
+ }
+
+ private void doTrigFlush(boolean force){
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+ if (force){
+ doFlush();
+ return;
+ }
+ AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
+ if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >
batchedWriteMaxDelayInMillis){
+ doFlush();
+ return;
+ }
+ if (this.flushContext.asyncAddArgsList.size() >=
batchedWriteMaxRecords){
+ doFlush();
+ return;
+ }
+ if (this.bytesSize >= batchedWriteMaxSize){
+ doFlush();
+ }
+ }
+
+ private void doFlush(){
+ // Combine data.
+ ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(4);
+ prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
+ prefix.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
+ ByteBuf actualContent = this.dataSerializer.serialize(this.dataArray);
+ ByteBuf pairByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefix,
actualContent);
+ // We need to release this pairByteBuf after Managed ledger async add
callback. Just holds by FlushContext.
+ this.flushContext.byteBuf = pairByteBuf;
+ // Flush.
+ if (State.CLOSING == state || State.CLOSED == state){
+ failureCallbackByContextAndRecycle(flushContext,
BUFFERED_WRITER_CLOSED_EXCEPTION);
+ } else {
+ managedLedger.asyncAddEntry(pairByteBuf, this, this.flushContext);
+ }
+ // Clear buffers.ok
+ this.dataArray.clear();
+ this.flushContext = FlushContext.newInstance();
+ this.bytesSize = 0;
+ }
+
+ /**
+ * see {@link AsyncCallbacks.AddEntryCallback#addComplete(Position,
ByteBuf, Object)}.
+ */
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ final FlushContext flushContext = (FlushContext) ctx;
+ try {
+ final int batchSize = flushContext.asyncAddArgsList.size();
+ for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
+ final AsyncAddArgs asyncAddArgs =
flushContext.asyncAddArgsList.get(batchIndex);
+ BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+ bitSetRecyclable.set(batchIndex);
+ long[] ackSet = bitSetRecyclable.toLongArray();
+ bitSetRecyclable.recycle();
+ final TxnBatchedPositionImpl txnBatchedPosition = new
TxnBatchedPositionImpl(position, batchSize,
+ batchIndex, ackSet);
+ // Because this task already running at ordered task, so just
"run".
+ try {
+ asyncAddArgs.callback.addComplete(txnBatchedPosition,
asyncAddArgs.ctx);
+ } catch (Exception e){
+ log.error("After writing to the transaction batched log
complete, the callback failed."
+ + " managedLedger: " + managedLedger.getName(), e);
+ }
+ }
+ } finally {
+ flushContext.recycle();
+ }
+ }
+
+ /**
+ * see {@link
AsyncCallbacks.AddEntryCallback#addFailed(ManagedLedgerException, Object)}.
+ */
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ final FlushContext flushContext = (FlushContext) ctx;
+ failureCallbackByContextAndRecycle(flushContext, exception);
+ }
+
+ /**
+ * Cancel pending tasks and release resources.
+ */
+ @Override
+ public void close() {
+ // If disabled batch feature, there is no closing state.
+ if (!batchEnabled) {
+ STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
+ return;
+ }
+ // Prevent the reentrant.
+ if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)){
+ // Other thread also calling "close()".
+ return;
+ }
+ // Cancel pending tasks and release resources.
+ singleThreadExecutorForWrite.execute(() -> {
+ if (state == State.CLOSED){
+ return;
+ }
+ // Failure callback to pending request.
+ // If some request has been flushed, Bookie triggers the callback.
+ failureCallbackByContextAndRecycle(this.flushContext,
BUFFERED_WRITER_CLOSED_EXCEPTION);
+ // Cancel task that schedule at fixed rate trig flush.
+ if (scheduledFuture != null && !scheduledFuture.isCancelled() &&
!scheduledFuture.isDone()) {
+ if (this.scheduledFuture.cancel(false)){
+ this.state = State.CLOSED;
+ } else {
+ // Cancel task failure, The state will stay at CLOSING.
+ log.error("Cancel task that schedule at fixed rate trig
flush failure. The state will stay at"
+ + " CLOSING. managedLedger: " +
managedLedger.getName());
+ }
+ }
+ });
+ }
+
+ private void failureCallbackByContextAndRecycle(FlushContext flushContext,
ManagedLedgerException ex){
+ if (flushContext == null ||
CollectionUtils.isEmpty(flushContext.asyncAddArgsList)){
+ return;
+ }
+ try {
+ for (AsyncAddArgs asyncAddArgs : flushContext.asyncAddArgsList) {
+ failureCallbackByArgs(asyncAddArgs, ex, false);
+ }
+ } finally {
+ flushContext.recycle();
+ }
+ }
+
+ private void failureCallbackByArgs(AsyncAddArgs asyncAddArgs,
ManagedLedgerException ex, final boolean recycle){
+ if (asyncAddArgs == null) {
+ return;
+ }
+ try {
+ asyncAddArgs.callback.addFailed(ex, asyncAddArgs.ctx);
+ } catch (Exception e){
+ log.error("After writing to the transaction batched log failure,
the callback executed also"
+ + " failed. managedLedger: " + managedLedger.getName(), e);
+ } finally {
+ if (recycle) {
+ asyncAddArgs.recycle();
+ }
+ }
+ }
+
+ /***
+ * Holds the parameters of {@link #asyncAddData} for each callback after
batch write finished. This object is
+ * designed to be reusable, so the recycle mechanism is used。
+ */
+ @ToString
+ private static class AsyncAddArgs {
+
+ private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER =
new Recycler<>() {
+ @Override
+ protected AsyncAddArgs newObject(Handle<AsyncAddArgs> handle) {
+ return new AsyncAddArgs(handle);
+ }
+ };
+
+ /**
+ * This constructor is used only when batch is enabled.
+ */
+ private static AsyncAddArgs newInstance(AddDataCallback callback,
Object ctx, long addedTime){
+ AsyncAddArgs asyncAddArgs = ASYNC_ADD_ARGS_RECYCLER.get();
+ asyncAddArgs.callback = callback;
+ asyncAddArgs.ctx = ctx;
+ asyncAddArgs.addedTime = addedTime;
+ return asyncAddArgs;
+ }
+
+ /**
+ * This constructor is used only when batch is disabled, and has
{@param byteBuf} more than the
+ * {@link AsyncAddArgs#newInstance(AddDataCallback, Object, long)}
constructor. The {@param byteBuf} will be
+ * released during callback. see {@link AsyncAddArgs#recycle()}.
+ * @param byteBuf produced by {@link DataSerializer#serialize(Object)}
+ */
+ private static AsyncAddArgs newInstance(AddDataCallback callback,
Object ctx, long addedTime, ByteBuf byteBuf){
+ AsyncAddArgs asyncAddArgs = newInstance(callback, ctx, addedTime);
+ asyncAddArgs.byteBuf = byteBuf;
+ return asyncAddArgs;
+ }
+
+ private AsyncAddArgs(Recycler.Handle<AsyncAddArgs> handle){
+ this.handle = handle;
+ }
+
+ private final Recycler.Handle<AsyncAddArgs> handle;
+
+ /** Argument for {@link #asyncAddData(Object, AddDataCallback,
Object)}. **/
+ @Getter
+ private AddDataCallback callback;
+
+ /** Argument for {@link #asyncAddData(Object, AddDataCallback,
Object)}. **/
+ @Getter
+ private Object ctx;
+
+ /** Time of executed {@link #asyncAddData(Object, AddDataCallback,
Object)}. **/
+ @Getter
+ private long addedTime;
+
+ /**
+ * When turning off the Batch feature, we need to release the byteBuf
produced by
+ * {@link DataSerializer#serialize(Object)}. Only carry the ByteBuf
objects, no other use.
+ */
+ private ByteBuf byteBuf;
+
+ public void recycle(){
+ this.callback = null;
+ this.ctx = null;
+ this.addedTime = 0;
+ if (this.byteBuf != null){
+ this.byteBuf.release();
+ }
+ this.handle.recycle(this);
+ }
+ }
+
+ /***
+ * The context for {@link ManagedLedger#asyncAddEntry(byte[],
AsyncCallbacks.AddEntryCallback, Object)}, Holds the
+ * data array written in batches and callback parameters that need to be
executed after batched write is complete.
+ */
+ private static class FlushContext{
+
+ private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER =
new Recycler<FlushContext>() {
+ @Override
+ protected FlushContext newObject(Handle<FlushContext> handle) {
+ return new FlushContext(handle);
+ }
+ };
+
+ private final Recycler.Handle<FlushContext> handle;
+
+ /** Callback parameters for current batch. **/
+ private final ArrayList<AsyncAddArgs> asyncAddArgsList;
+
+ /**
+ * If turning on the Batch feature, we need to release the byteBuf
produced by
+ * {@link DataSerializer#serialize(ArrayList)} when Managed ledger
async add callback.
+ * Only carry the ByteBuf objects, no other use.
+ */
+ private ByteBuf byteBuf;
+
+ private FlushContext(Recycler.Handle<FlushContext> handle){
+ this.handle = handle;
+ this.asyncAddArgsList = new ArrayList<>(8);
+ }
+
+ private static FlushContext newInstance(){
+ return FLUSH_CONTEXT_RECYCLER.get();
+ }
+
+ public void recycle(){
+ for (AsyncAddArgs asyncAddArgs : this.asyncAddArgsList){
+ asyncAddArgs.recycle();
+ }
+ if (byteBuf != null){
+ byteBuf.release();
+ byteBuf = null;
+ }
+ this.asyncAddArgsList.clear();
+ this.handle.recycle(this);
+ }
+ }
+
+
+
+ interface AddDataCallback {
+
+ void addComplete(Position position, Object context);
+
+ void addFailed(ManagedLedgerException exception, Object ctx);
+ }
+
+ private enum State{
+ OPEN,
+ CLOSING,
+ CLOSED;
+ }
+
+ /***
+ * Instead origin param-callback for {@link #asyncAddData(Object,
AddDataCallback, Object)}
+ * when {@link #batchEnabled} == false, Used for ByteBuf release which
generated by {@link DataSerializer}.
+ */
+ private static class DisabledBatchCallback implements
AsyncCallbacks.AddEntryCallback {
+
+ private static final DisabledBatchCallback INSTANCE = new
DisabledBatchCallback();
+
+ private DisabledBatchCallback(){
+
+ }
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object
ctx) {
+ AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
+ try {
+ asyncAddArgs.callback.addComplete(position, asyncAddArgs.ctx);
+ } finally {
+ asyncAddArgs.recycle();
+ }
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ AsyncAddArgs asyncAddArgs = (AsyncAddArgs) ctx;
+ try {
+ asyncAddArgs.callback.addFailed(exception, asyncAddArgs.ctx);
+ } finally {
+ asyncAddArgs.recycle();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
new file mode 100644
index 00000000000..78bd4e8a57b
--- /dev/null
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
+
+ /**
+ * Overridden cases:
+ * 1. Enabled the batch feature.
+ * 1-1. Normal.
+ * 1-2. The number of data writes is few (Small data block).
+ * 1-3. The number of data writes is large (Small data block).
+ * 1-4. Large data writes.
+ * 1-5. A batch has only one data
+ * 1-6. A batch has only two data
+ * 2. Disabled the batch feature.
+ * 3. Bookie always error.
+ * 3-1. Enabled the batch feature.
+ * 3-1. Disabled the batch feature.
+ * 4. Bookie sometimes error.
+ * 4-1. Enabled the batch feature.
+ * 4-1. Disabled the batch feature.
+ * 5. {@link TxnLogBufferedWriter} sometimes close.
+ * 5-1. Enabled the batch feature.
+ * 5-1. Disabled the batch feature.
+ */
+ @DataProvider(name= "mainProcessCasesProvider")
+ public Object[][] mainProcessCasesProvider(){
+ Object [][] provider = new Object [13][];
+ // Normal.
+ provider[0] = new Object[]{512, 1024 * 1024, 1, true, 2000, 2, 4,
BookieErrorType.NO_ERROR, false};
+ // The number of data writes is few.
+ provider[1] = new Object[]{512, 1024 * 1024, 1, true, 100, 2, 4,
BookieErrorType.NO_ERROR, false};
+ // The number of data writes is large.
+ provider[2] = new Object[]{512, 1024 * 1024, 100, true, 20000, 5, 4,
BookieErrorType.NO_ERROR, false};
+ // Big data writes.
+ provider[3] = new Object[]{512, 1024, 100, true, 3000, 4, 1024,
BookieErrorType.NO_ERROR, false};
+ // A batch has only one data
+ provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 4, 4,
BookieErrorType.NO_ERROR, false};
+ // A batch has only two data
+ provider[5] = new Object[]{2, 1024 * 1024, 100, true, 1999, 4, 4,
BookieErrorType.NO_ERROR, false};
+ // Disabled the batch feature
+ provider[6] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4,
BookieErrorType.NO_ERROR, false};
+ // Bookie always error.
+ provider[7] = new Object[]{512, 1024 * 1024, 1, true, 2000, 2, 4,
BookieErrorType.ALWAYS_ERROR, false};
+ provider[8] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4,
BookieErrorType.ALWAYS_ERROR, false};
+ // Bookie sometimes error.
+ provider[9] = new Object[]{512, 1024 * 1024, 1, true, 2000, 4, 4,
BookieErrorType.SOMETIMES_ERROR, false};
+ provider[10] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4,
BookieErrorType.SOMETIMES_ERROR, false};
+ // TxnLogBufferedWriter sometimes close.
+ provider[11] = new Object[]{512, 1024 * 1024, 1, true, 2000, 4, 4,
BookieErrorType.NO_ERROR, true};
+ provider[12] = new Object[]{512, 1024 * 1024, 1, false, 2000, 4, 4,
BookieErrorType.NO_ERROR, true};
+ return provider;
+ }
+
+ /**
+ * Tests all operations from write to callback, including these step:
+ * 1. Write many data.
+ * 2. Verify callback correct.
+ * 3. Read from bookie, verify all data correct.
+ * 4. Verify byte buffer that generated by DataSerializer has been
released after process finish.
+ * 5. Cleanup.
+ * Overridden cases: see {@link #mainProcessCasesProvider}.
+ *
+ * TODO: Additional validation is required:
+ * 1. Recycle is handled correctly.
+ * 2. ByteBuf generated by data merge before Bookie writes is released
correctly, including prefix-ByteBuf and
+ * composite-ByteBuf.
+ * 3. Even if executed "bkc.failAfter", also need to verify the data
which has already written to bookie.
+ */
+ @Test(dataProvider = "mainProcessCasesProvider")
+ public void testMainProcess(int batchedWriteMaxRecords, int
batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
+ boolean batchEnabled, final int
writeCmdExecuteCount, int maxWaitSeconds,
+ int eachDataBytesLen, BookieErrorType
bookieErrorType,
+ boolean closeBufferedWriter) throws Exception {
+ // Assert args.
+ if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType ||
closeBufferedWriter){
+ if (writeCmdExecuteCount < batchedWriteMaxRecords * 2){
+ throw new IllegalArgumentException("if bookieErrorType is
BookieErrorType.SOMETIMES_ERROR or"
+ + " closeBufferedWriter is ture,
param-writeCmdExecuteCount max large than"
+ + " param-batchedWriteMaxRecords * 2");
+ }
+ }
+ // Calculate the exactly batch-enabled for assert.
+ boolean exactlyBatched = batchEnabled;
+ if (batchedWriteMaxSize <= eachDataBytesLen){
+ exactlyBatched = false;
+ }
+ /**
+ * Create components for tests.
+ * - managedLedger
+ * - managedCursor
+ * - orderedExecutor
+ * - orderedExecutor
+ * - scheduledExecutorService
+ * - dataSerializer
+ */
+ ManagedLedger managedLedger = factory.open("tx_test_ledger");
+ ManagedCursor managedCursor =
managedLedger.openCursor("tx_test_cursor");
+ if (BookieErrorType.ALWAYS_ERROR == bookieErrorType){
+ managedLedger = Mockito.spy(managedLedger);
+ managedCursor = Mockito.spy(managedCursor);
+ failureManagedLedger(managedLedger);
+ } else if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType){
+ bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
+ metadataStore.setAlwaysFail(new
MetadataStoreException.BadVersionException(""));
+ }
+ OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(5).name("tx-threads").build();
+ ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
+ JsonDataSerializer dataSerializer = new
JsonDataSerializer(eachDataBytesLen);
+ /**
+ * Execute test task.
+ * 1. Write many times.
+ * 2. Store the param-context and param-position of callback
function for verify.
+ */
+ // Create TxLogBufferedWriter.
+ TxnLogBufferedWriter txnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
+ managedLedger, orderedExecutor,
scheduledExecutorService,
+ dataSerializer, batchedWriteMaxRecords,
batchedWriteMaxSize,
+ batchedWriteMaxDelayInMillis, batchEnabled);
+ // Store the param-context, param-position, param-exception of
callback function and complete-count for verify.
+ ArrayList<Integer> contextArrayOfCallback = new ArrayList<>();
+ ArrayList<ManagedLedgerException> exceptionArrayOfCallback = new
ArrayList<>();
+ LinkedHashMap<PositionImpl, ArrayList<Position>> positionsOfCallback =
new LinkedHashMap<>();
+ AtomicBoolean anyFlushCompleted = new AtomicBoolean();
+ TxnLogBufferedWriter.AddDataCallback callback = new
TxnLogBufferedWriter.AddDataCallback(){
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ anyFlushCompleted.set(true);
+ if
(contextArrayOfCallback.contains(Integer.valueOf(String.valueOf(ctx)))){
+ return;
+ }
+ contextArrayOfCallback.add((int)ctx);
+ PositionImpl lightPosition =
PositionImpl.get(position.getLedgerId(), position.getEntryId());
+ positionsOfCallback.computeIfAbsent(lightPosition, p -> new
ArrayList<>());
+ positionsOfCallback.get(lightPosition).add(position);
+ }
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ if
(contextArrayOfCallback.contains(Integer.valueOf(String.valueOf(ctx)))){
+ return;
+ }
+ contextArrayOfCallback.add((int)ctx);
+ exceptionArrayOfCallback.add(exception);
+ }
+ };
+ // Write many times.
+ int bufferedWriteCloseAtIndex = writeCmdExecuteCount/2
+ + new Random().nextInt(writeCmdExecuteCount / 4 + 1) - 1;
+ for (int i = 0; i < writeCmdExecuteCount; i++){
+ txnLogBufferedWriter.asyncAddData(i, callback, i);
+ // Ensure flush at least once before close buffered writer.
+ if (closeBufferedWriter && i == 0){
+ txnLogBufferedWriter.trigFlush(true);
+ }
+ if (closeBufferedWriter && bufferedWriteCloseAtIndex == i){
+ // Wait for any complete callback, avoid unstable.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
anyFlushCompleted.get());
+ txnLogBufferedWriter.close();
+ }
+ }
+ /**
+ * Assert callback correct.
+ * 1. callback count.
+ * 2. callback param-context.
+ * 2. if {@param bookieError} is true. verify the ex count.
+ * 3. if {@param bookieError} is false. verify the param-position
count.
+ * 4. if enabled batch-feature, will verify the attributes
(batchSize, batchIndex) of callback param-position.
+ * Note: {@link TxnLogBufferedWriter#close()} will make callback that
fail by state check execute earlier, so if
+ * {@param closeBufferedWriter} is true, we should sort
callback-context-array. Other callback-param should
+ * not sort.
+ */
+ // Assert callback count.
+ Awaitility.await().atMost(maxWaitSeconds, TimeUnit.SECONDS)
+ .until(() -> contextArrayOfCallback.size() ==
writeCmdExecuteCount);
+ // Assert callback param-context, verify that all callbacks are
executed in strict order.
+ if (closeBufferedWriter){
+ Collections.sort(contextArrayOfCallback);
+ }
+ Assert.assertEquals(contextArrayOfCallback.size(),
writeCmdExecuteCount);
+ for (int ctxIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
+
Assert.assertEquals(contextArrayOfCallback.get(ctxIndex).intValue(), ctxIndex);
+ }
+ // if {@param bookieError} is true. verify the ex count.
+ // if {@param bookieError} is false. verify the param-position count.
+ int exceptionCallbackCount = exceptionArrayOfCallback.size();
+ int positionCallbackCount = (int)
positionsOfCallback.values().stream().flatMap(l -> l.stream()).count();
+ if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType ||
closeBufferedWriter){
+ Assert.assertTrue(exceptionCallbackCount > 0);
+ Assert.assertTrue(positionCallbackCount > 0);
+ Assert.assertEquals(exceptionCallbackCount +
positionCallbackCount, writeCmdExecuteCount);
+ } else if (BookieErrorType.NO_ERROR == bookieErrorType){
+ Assert.assertEquals(positionCallbackCount, writeCmdExecuteCount);
+ } else {
+ Assert.assertEquals(exceptionCallbackCount, writeCmdExecuteCount);
+ }
+ // if enabled batch-feature, will verify the attributes (batchSize,
batchIndex) of callback param-position.
+ if (exactlyBatched && BookieErrorType.ALWAYS_ERROR != bookieErrorType){
+ Iterator<ArrayList<Position>> callbackPositionIterator =
positionsOfCallback.values().iterator();
+ List<String> exactlyFlushedDataArray =
dataSerializer.getGeneratedJsonArray();
+ for (int batchedEntryIndex = 0; batchedEntryIndex <
exactlyFlushedDataArray.size() - exceptionCallbackCount;
+ batchedEntryIndex++) {
+ String json = exactlyFlushedDataArray.get(batchedEntryIndex);
+ List<Integer> batchedData =
JsonDataSerializer.deserializeMergedData(json);
+ ArrayList<Position> innerPositions =
callbackPositionIterator.next();
+ for (int i = 0; i < batchedData.size(); i++) {
+ TxnBatchedPositionImpl innerPosition =
+ (TxnBatchedPositionImpl) innerPositions.get(i);
+ Assert.assertEquals(innerPosition.getBatchSize(),
batchedData.size());
+ Assert.assertEquals(innerPosition.getBatchIndex(), i);
+ }
+ }
+ }
+ /**
+ * Read entries from Bookie, Assert all data and position correct.
+ * 1. Assert the data of the read matches the data write.
+ * 2. Assert the position of the read matches the position of the
callback.
+ * 3. Assert callback count equals entry count.
+ * Note: after call {@link PulsarMockBookKeeper#failAfter}, the
managed ledger could not execute read entries
+ * anymore, so when {@param bookieErrorType} equals {@link
BookieErrorType.SOMETIMES_ERROR}, ski verify.
+ * Note2: Verify that all entry was written in strict order.
+ */
+ if (BookieErrorType.NO_ERROR == bookieErrorType) {
+ Iterator<PositionImpl> callbackPositionIterator =
positionsOfCallback.keySet().iterator();
+ List<String> dataArrayWrite =
dataSerializer.getGeneratedJsonArray();
+ int entryCounter = 0;
+ while (managedCursor.hasMoreEntries()) {
+ List<Entry> entries = managedCursor.readEntries(1);
+ if (entries == null || entries.isEmpty()) {
+ continue;
+ }
+ for (int m = 0; m < entries.size(); m++) {
+ // Get data read.
+ Entry entry = entries.get(m);
+ // Assert the position of the read matches the position of
the callback.
+ PositionImpl callbackPosition =
callbackPositionIterator.next();
+ Assert.assertEquals(entry.getLedgerId(),
callbackPosition.getLedgerId());
+ Assert.assertEquals(entry.getEntryId(),
callbackPosition.getEntryId());
+ if (exactlyBatched) {
+ // Get expected entry data from cache of
DataSerializer.
+ String expectEntryData =
dataArrayWrite.get(entryCounter);
+ ByteBuf entryByteBuf = entry.getDataBuffer();
+ entryByteBuf.skipBytes(4);
+ byte[] entryContentBytes = new
byte[entryByteBuf.readableBytes()];
+ entryByteBuf.readBytes(entryContentBytes);
+ String actEntryData = new String(entryContentBytes,
Charset.defaultCharset());
+ // Assert the data of the read matches the data write.
+ Assert.assertEquals(actEntryData, expectEntryData);
+ } else {
+ int entryValue = entry.getDataBuffer().readInt();
+ // Assert the data of the read matches the data write.
+ Assert.assertEquals(entryValue, entryCounter);
+ }
+ entry.release();
+ entryCounter++;
+ }
+ }
+ // Assert callback count equals entry count.
+ Assert.assertEquals(entryCounter, positionsOfCallback.size());
+ }
+ /** cleanup. **/
+ txnLogBufferedWriter.close();
+ // If we already call {@link PulsarMockBookKeeper#failAfter}, the
managed ledger could not close anymore.
+ if (BookieErrorType.SOMETIMES_ERROR != bookieErrorType){
+ managedLedger.close();
+ }
+ scheduledExecutorService.shutdown();
+ orderedExecutor.shutdown();
+ /**
+ * Assert all Byte Buf generated by DataSerializer has been released.
+ * 1. Because ManagedLedger holds write cache, some data is not
actually released until ManagedLedger is
+ * closed.
+ * 2. If we already call {@link PulsarMockBookKeeper#failAfter(int,
int)}, the managed ledger could not close
+ * anymore, so when {@param bookieErrorType} equals {@link
BookieErrorType.SOMETIMES_ERROR}, skip verify.
+ */
+ if (BookieErrorType.SOMETIMES_ERROR != bookieErrorType) {
+ dataSerializer.assertAllByteBufHasBeenReleased();
+ }
+ dataSerializer.cleanup();
+ }
+
+ private void failureManagedLedger(ManagedLedger managedLedger){
+ Mockito.doAnswer(invocation -> {
+ AsyncCallbacks.AddEntryCallback callback =
+ (AsyncCallbacks.AddEntryCallback)
invocation.getArguments()[1];
+ ManagedLedgerException managedLedgerException =
+ new ManagedLedgerException(new Exception("Fail by mock."));
+ callback.addFailed(managedLedgerException,
invocation.getArguments()[2]);
+ return null;
+ }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class),
Mockito.any(), Mockito.any());
+ }
+
+ /**
+ * Adjustable thresholds: trigger BookKeeper-write when reaching any one
of the following conditions
+ * Max size (bytes)
+ * Max records count
+ * Max delay time
+ * Tests these three thresholds.
+ */
+ @Test
+ public void testFlushThresholds() throws Exception{
+ // Create components.
+ String managedLedgerName = "-";
+ ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+ Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+ OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(5).name("tx-topic-threads").build();
+ ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ // Cache the data flush to Bookie for Asserts.
+ List<Integer> dataArrayFlushedToBookie = new ArrayList<>();
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+ byteBuf.skipBytes(4);
+ dataArrayFlushedToBookie.add(byteBuf.readInt());
+ AsyncCallbacks.AddEntryCallback callback =
+ (AsyncCallbacks.AddEntryCallback)
invocation.getArguments()[1];
+ callback.addComplete(PositionImpl.get(1,1), byteBuf,
+ invocation.getArguments()[2]);
+ return null;
+ }
+ }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class),
Mockito.any(), Mockito.any());
+ // Start tests.
+ TxnLogBufferedWriter txnLogBufferedWriter = new
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+ scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100,
true);
+ TxnLogBufferedWriter.AddDataCallback callback =
Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
+ // Test threshold: writeMaxDelayInMillis.
+ txnLogBufferedWriter.asyncAddData(100, callback, 100);
+ Thread.sleep(101);
+ // Test threshold: batchedWriteMaxRecords.
+ for (int i = 0; i < 32; i++){
+ txnLogBufferedWriter.asyncAddData(1, callback, 1);
+ }
+ // Test threshold: batchedWriteMaxSize.
+ TxnLogBufferedWriter txnLogBufferedWriter2 = new
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+ scheduledExecutorService, dataSerializer, 1024, 64 * 4, 100,
true);
+ for (int i = 0; i < 64; i++){
+ txnLogBufferedWriter2.asyncAddData(1, callback, 1);
+ }
+ // Assert 4 flush.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
dataArrayFlushedToBookie.size() == 3);
+ Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+ Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
+ Assert.assertEquals(dataArrayFlushedToBookie.get(2).intValue(), 64);
+ // Assert all resources released
+ dataSerializer.assertAllByteBufHasBeenReleased();
+ // clean up.
+ dataSerializer.cleanup();
+ scheduledExecutorService.shutdown();
+ orderedExecutor.shutdown();
+ }
+
+ private static class JsonDataSerializer implements
TxnLogBufferedWriter.DataSerializer<Integer>{
+
+ private static ObjectMapper objectMapper = new ObjectMapper();
+
+ private ArrayList<ByteBuf> generatedByteBufArray = new ArrayList<>();
+
+ @Getter
+ private ArrayList<String> generatedJsonArray = new ArrayList<>();
+
+ private int eachDataBytesLen = 4;
+
+ public JsonDataSerializer(){
+
+ }
+
+ public JsonDataSerializer(int eachDataBytesLen){
+ this.eachDataBytesLen = eachDataBytesLen;
+ }
+
+ @Override
+ public int getSerializedSize(Integer data) {
+ return eachDataBytesLen;
+ }
+ @Override
+ public ByteBuf serialize(Integer data) {
+ ByteBuf byteBuf = Unpooled.buffer(4);
+ byteBuf.writeInt(data == null ? 0 : data.intValue());
+ holdsByteBuf(byteBuf);
+ return byteBuf;
+ }
+ @Override
+ public ByteBuf serialize(ArrayList<Integer> dataArray) {
+ try {
+ String str = objectMapper.writeValueAsString(dataArray);
+ generatedJsonArray.add(str);
+ ByteBuf byteBuf =
Unpooled.copiedBuffer(str.getBytes(Charset.defaultCharset()));
+ holdsByteBuf(byteBuf);
+ return byteBuf;
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static List<Integer> deserializeMergedData(ByteBuf byteBuf){
+ byte[] bytes = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(bytes);
+ return deserializeMergedData(new String(bytes,
Charset.defaultCharset()));
+ }
+
+ public static List<Integer> deserializeMergedData(String json){
+ try {
+ return objectMapper.readValue(json, ArrayList.class);
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void holdsByteBuf(ByteBuf byteBuf){
+ generatedByteBufArray.add(byteBuf);
+ }
+
+ protected void cleanup(){
+ // Just for GC.
+ generatedByteBufArray = new ArrayList<>();
+ generatedJsonArray = new ArrayList<>();
+ }
+
+ protected void assertAllByteBufHasBeenReleased(){
+ for (ByteBuf byteBuf : generatedByteBufArray){
+ Assert.assertEquals(byteBuf.refCnt(), 0);
+ }
+ }
+ }
+
+ private static class SumStrDataSerializer extends JsonDataSerializer {
+
+ @Override
+ public ByteBuf serialize(ArrayList<Integer> dataArray) {
+ int sum = CollectionUtils.isEmpty(dataArray) ? 0 :
dataArray.stream().reduce((a, b) -> a+b).get();
+ ByteBuf byteBuf = Unpooled.buffer(4);
+ byteBuf.writeInt(sum);
+ holdsByteBuf(byteBuf);
+ return byteBuf;
+ }
+ }
+
+ public enum BookieErrorType{
+ NO_ERROR,
+ ALWAYS_ERROR,
+ SOMETIMES_ERROR;
+ }
+}