This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4a2286c7e6b [improve][txn] PIP-160: Pending ack log store enables the batch feature (#16707)
4a2286c7e6b is described below
commit 4a2286c7e6b8294cf2eb325fa9b3d6e6be71ed58
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 27 12:21:37 2022 +0800
[improve][txn] PIP-160: Pending ack log store enables the batch feature (#16707)
Master Issue: #15370
### Motivation
See #15370.
### Modifications
I will complete proposal #15370 with these pull requests (*the current pull request is step 4*):
1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature (see the sketch after this list).
5. Support dynamic configuration.
6. Add an admin API for the transaction batch log, plus docs (admin and configuration docs).
7. Add metrics support for the transaction batch log.
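
For step 4, the key on-disk change is that a ledger Entry may now hold either a single `PendingAckMetadataEntry` or a batch of them, distinguished by a magic-number prefix written by `TxnLogBufferedWriter`. Below is a condensed sketch of the replay-side branching, adapted from the `deserializeEntry` method added in this patch (entry release and error handling omitted):

```java
// Peek the first two bytes: batched entries start with a magic number,
// followed by a version field, then the batched payload.
private List<PendingAckMetadataEntry> deserializeEntry(Entry entry) {
    ByteBuf buffer = entry.getDataBuffer();
    buffer.markReaderIndex();
    short magicNum = buffer.readShort();
    buffer.resetReaderIndex();
    if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER) {
        // Batched entry: skip the [magic number][version] prefix, then parse
        // the container that holds many PendingAckMetadataEntry records.
        buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
        BatchedPendingAckMetadataEntry batch = new BatchedPendingAckMetadataEntry();
        batch.parseFrom(buffer, buffer.readableBytes());
        return batch.getPendingAckLogsList();
    }
    // Legacy (non-batched) entry: a single record, parsed exactly as before.
    PendingAckMetadataEntry single = new PendingAckMetadataEntry();
    single.parseFrom(buffer, buffer.readableBytes());
    return Collections.singletonList(single);
}
```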
---
.../bookkeeper/mledger/impl/PositionImpl.java | 12 +
.../pendingack/impl/MLPendingAckStore.java | 223 +++++++++++++----
.../pendingack/impl/MLPendingAckStoreProvider.java | 26 +-
.../pulsar/broker/transaction/TransactionTest.java | 10 +-
.../pendingack/PendingAckMetadataTest.java | 14 +-
.../pendingack/impl/MLPendingAckStoreTest.java | 263 +++++++++++++++++++++
.../coordinator/impl/TxnLogBufferedWriter.java | 2 +-
7 files changed, 506 insertions(+), 44 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index dedb7beb0ce..b70836f433d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -115,6 +115,18 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
return 0;
}
+ public int compareTo(long ledgerId, long entryId) {
+ if (this.ledgerId != ledgerId) {
+ return (this.ledgerId < ledgerId ? -1 : 1);
+ }
+
+ if (this.entryId != entryId) {
+ return (this.entryId < entryId ? -1 : 1);
+ }
+
+ return 0;
+ }
+
@Override
public int hashCode() {
int result = (int) (ledgerId ^ (ledgerId >>> 32));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index dd58fe774a8..3ff4ed8e768 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -18,8 +18,14 @@
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER;
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN;
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -28,17 +34,21 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.broker.transaction.pendingack.proto.BatchedPendingAckMetadataEntry;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
@@ -48,6 +58,9 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
@@ -76,6 +89,14 @@ public class MLPendingAckStore implements PendingAckStore {
protected PositionImpl maxAckPosition = PositionImpl.EARLIEST;
private final LogIndexLagBackoff logIndexBackoff;
+    /**
+     * If the batch feature is enabled by {@link #bufferedWriter}, {@link #handleMetadataEntry(PositionImpl, List)}
+     * is executed once after all records in a batch have been written, instead of
+     * {@link #handleMetadataEntry(PositionImpl, PendingAckMetadataEntry)} after each record. This is because
+     * {@link #clearUselessLogData()} deletes data at the granularity of an Entry.
+     */
+    private final ArrayList<PendingAckMetadataEntry> batchedPendingAckLogsWaitingForHandle;
+
/**
* The map is for pending ack store clear useless data.
* <p>
@@ -89,12 +110,16 @@ public class MLPendingAckStore implements PendingAckStore {
     *      If the max position (key) is smaller than the subCursor mark delete position,
     *      the log cursor will mark delete the position before log position (value).
*/
-    private final ConcurrentSkipListMap<PositionImpl, PositionImpl> pendingAckLogIndex;
+    final ConcurrentSkipListMap<PositionImpl, PositionImpl> pendingAckLogIndex;
    private final ManagedCursor subManagedCursor;
+    private TxnLogBufferedWriter<PendingAckMetadataEntry> bufferedWriter;
+
    public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
-                             ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag) {
+                             ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag,
+                             TxnLogBufferedWriterConfig bufferedWriterConfig,
+                             Timer timer) {
this.managedLedger = managedLedger;
this.cursor = cursor;
        this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition();
@@ -104,6 +129,11 @@ public class MLPendingAckStore implements PendingAckStore {
this.subManagedCursor = subManagedCursor;
        this.logIndexBackoff = new LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
this.maxIndexLag = logIndexBackoff.next(0);
+        this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(),
+                timer, PendingAckLogSerializer.INSTANCE,
+                bufferedWriterConfig.getBatchedWriteMaxRecords(), bufferedWriterConfig.getBatchedWriteMaxSize(),
+                bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled());
+        this.batchedPendingAckLogsWaitingForHandle = new ArrayList<>();
}
@Override
@@ -131,6 +161,7 @@ public class MLPendingAckStore implements PendingAckStore {
if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] MLPendingAckStore closed successfully!", managedLedger.getName(), ctx);
}
+ bufferedWriter.close();
completableFuture.complete(null);
}
@@ -213,22 +244,34 @@ public class MLPendingAckStore implements PendingAckStore {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
pendingAckMetadataEntry.setTxnidLeastBits(txnID.getLeastSigBits());
pendingAckMetadataEntry.setTxnidMostBits(txnID.getMostSigBits());
-        int transactionMetadataEntrySize = pendingAckMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
-        pendingAckMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(pendingAckMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {
@Override
-            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+            public void addComplete(Position position, Object ctx) {
if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] MLPendingAckStore message append success at {} txnId: {}, operation : {}",
                            managedLedger.getName(), ctx, position, txnID, pendingAckMetadataEntry.getPendingAckOp());
}
currentIndexLag.incrementAndGet();
-                    handleMetadataEntry((PositionImpl) position, pendingAckMetadataEntry);
-                    buf.release();
+                    /**
+                     * If the batch feature is enabled by {@link #bufferedWriter},
+                     * {@link #handleMetadataEntry(PositionImpl, List)} is executed once after all records in the
+                     * batch have been written, instead of
+                     * {@link #handleMetadataEntry(PositionImpl, PendingAckMetadataEntry)} after each record.
+                     * This is because {@link #clearUselessLogData()} deletes data at the granularity of an Entry.
+                     * {@link TxnLogBufferedWriter.AddDataCallback#addComplete} is executed for the records of a
+                     * batch sequentially and in strict order, so when the last record of a batch completes, the
+                     * whole batch is complete.
+                     */
+                    if (position instanceof TxnBatchedPositionImpl batchedPosition){
+                        batchedPendingAckLogsWaitingForHandle.add(pendingAckMetadataEntry);
+                        if (batchedPosition.getBatchIndex() == batchedPosition.getBatchSize() - 1){
+                            handleMetadataEntry((PositionImpl) position, batchedPendingAckLogsWaitingForHandle);
+                            batchedPendingAckLogsWaitingForHandle.clear();
+                        }
+                    } else {
+                        handleMetadataEntry((PositionImpl) position, pendingAckMetadataEntry);
+                    }
completableFuture.complete(null);
-
clearUselessLogData();
}
@@ -240,40 +283,66 @@ public class MLPendingAckStore implements PendingAckStore {
                if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
- buf.release();
                completableFuture.completeExceptionally(new PersistenceException(exception));
}
}, null);
return completableFuture;
}
-    private void handleMetadataEntry(PositionImpl logPosition, PendingAckMetadataEntry pendingAckMetadataEntry) {
+    /**
+     * Build the index mapping between the transaction pending ack log (aka t-log) and the topic message log
+     * (aka m-log). Once an m-log entry has been acknowledged, the t-log entry that holds it is no longer useful;
+     * this method builds the mapping between them.
+     *
+     * If a Ledger Entry contains many t-log records, we only need to care about the record that carries the
+     * largest acknowledgement info, because every Commit/Abort record that follows it refers to that
+     * acknowledgement: once the acknowledgement has been handled correctly, those Commit/Abort records are no
+     * longer useful.
+     * @param logPosition The position of the batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+ private void handleMetadataEntry(PositionImpl logPosition,
+ List<PendingAckMetadataEntry> logList) {
+ Stream<PendingAckMetadata> pendingAckMetaStream = logList.stream()
+ .filter(log -> bothNotAbortAndCommitPredicate.test(log))
+ .flatMap(log -> log.getPendingAckMetadatasList().stream());
+ handleMetadataEntry(logPosition, pendingAckMetaStream);
+ }
+
+    private final Predicate<PendingAckMetadataEntry> bothNotAbortAndCommitPredicate = (pendingAckLog) ->
+            pendingAckLog.getPendingAckOp() != PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() != PendingAckOp.COMMIT;
+
+ private void handleMetadataEntry(PositionImpl logPosition,
+                                     PendingAckMetadataEntry pendingAckMetadataEntry) {
// store the persistent position in to memory
// store the max position of this entry retain
-        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
-                && pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.COMMIT) {
-            Optional<PendingAckMetadata> optional = pendingAckMetadataEntry.getPendingAckMetadatasList()
-                    .stream().max((o1, o2) -> ComparisonChain.start().compare(o1.getLedgerId(),
-                            o2.getLedgerId()).compare(o1.getEntryId(), o2.getEntryId()).result());
-
-            optional.ifPresent(pendingAckMetadata -> {
-                PositionImpl nowPosition = PositionImpl.get(pendingAckMetadata.getLedgerId(),
-                        pendingAckMetadata.getEntryId());
-
-                if (nowPosition.compareTo(maxAckPosition) > 0) {
-                    maxAckPosition = nowPosition;
-                }
-                if (currentIndexLag.get() >= maxIndexLag) {
-                    pendingAckLogIndex.compute(maxAckPosition,
-                            (thisPosition, otherPosition) -> logPosition);
-                    maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());
-                    currentIndexLag.set(0);
-                }
-            });
+        if (bothNotAbortAndCommitPredicate.test(pendingAckMetadataEntry)) {
+            handleMetadataEntry(logPosition, pendingAckMetadataEntry.getPendingAckMetadatasList().stream());
}
}
- private void clearUselessLogData() {
+    private void handleMetadataEntry(PositionImpl logPosition, Stream<PendingAckMetadata> pendingAckListStream) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        Optional<PendingAckMetadata> optional = pendingAckListStream
+                .max((o1, o2) -> ComparisonChain.start().compare(o1.getLedgerId(),
+                        o2.getLedgerId()).compare(o1.getEntryId(), o2.getEntryId()).result());
+        optional.ifPresent(pendingAckMetadata -> {
+            PositionImpl nowPosition = PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                    pendingAckMetadata.getEntryId());
+            if (nowPosition.compareTo(maxAckPosition) > 0) {
+                maxAckPosition = nowPosition;
+            }
+            if (currentIndexLag.get() >= maxIndexLag) {
+                pendingAckLogIndex.compute(maxAckPosition,
+                        (thisPosition, otherPosition) -> logPosition);
+                maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());
+                currentIndexLag.set(0);
+            }
+        });
+    }
+
+ @VisibleForTesting
+    void clearUselessLogData() {
if (!pendingAckLogIndex.isEmpty()) {
PositionImpl deletePosition = null;
while (!pendingAckLogIndex.isEmpty()
@@ -332,14 +401,24 @@ public class MLPendingAckStore implements PendingAckStore {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
- ByteBuf buffer = entry.getDataBuffer();
                    currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                    PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
-                    pendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                    currentIndexLag.incrementAndGet();
-                    handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()),
-                            pendingAckMetadataEntry);
-                    pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
+                    List<PendingAckMetadataEntry> logs = deserializeEntry(entry);
+                    if (logs.isEmpty()){
+                        continue;
+                    } else if (logs.size() == 1){
+                        currentIndexLag.incrementAndGet();
+                        PendingAckMetadataEntry log = logs.get(0);
+                        handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), log);
+                        pendingAckReplyCallBack.handleMetadataEntry(log);
+                    } else {
+                        int batchSize = logs.size();
+                        for (int batchIndex = 0; batchIndex < batchSize; batchIndex++){
+                            PendingAckMetadataEntry log = logs.get(batchIndex);
+                            pendingAckReplyCallBack.handleMetadataEntry(log);
+                        }
+                        currentIndexLag.addAndGet(batchSize);
+                        handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), logs);
+                    }
entry.release();
clearUselessLogData();
} else {
@@ -362,6 +441,25 @@ public class MLPendingAckStore implements PendingAckStore {
}
}
+    private List<PendingAckMetadataEntry> deserializeEntry(Entry entry){
+        ByteBuf buffer = entry.getDataBuffer();
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
+            BatchedPendingAckMetadataEntry batchedPendingAckMetadataEntry = new BatchedPendingAckMetadataEntry();
+            batchedPendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes());
+            return batchedPendingAckMetadataEntry.getPendingAckLogsList();
+        } else {
+            PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry();
+            pendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(pendingAckMetadataEntry);
+        }
+    }
+
    class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
private volatile boolean isReadable = true;
@@ -421,4 +519,49 @@ public class MLPendingAckStore implements PendingAckStore {
}
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStore.class);
+
+    /**
+     * Used only by the buffered writer. Since all writes in the buffered writer happen on the same thread, we can
+     * use a thread-local variable here. Why must they be on the same thread? Because
+     * {@link BatchedPendingAckMetadataEntry#clear()} modifies the elements of
+     * {@link BatchedPendingAckMetadataEntry#getPendingAckLogsList()}, which would cause problems with
+     * multi-threaded writes.
+     */
+    private static final FastThreadLocal<BatchedPendingAckMetadataEntry> batchedMetaThreadLocalForBufferedWriter =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedPendingAckMetadataEntry initialValue() throws Exception {
+                    return new BatchedPendingAckMetadataEntry();
+                }
+            };
+
+    private static class PendingAckLogSerializer
+            implements TxnLogBufferedWriter.DataSerializer<PendingAckMetadataEntry>{
+
+        private static final PendingAckLogSerializer INSTANCE = new PendingAckLogSerializer();
+
+ @Override
+ public int getSerializedSize(PendingAckMetadataEntry data) {
+ return data.getSerializedSize();
+ }
+
+ @Override
+ public ByteBuf serialize(PendingAckMetadataEntry data) {
+ int batchSize = data.getSerializedSize();
+            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize);
+ data.writeTo(buf);
+ return buf;
+ }
+
+ @Override
+        public ByteBuf serialize(ArrayList<PendingAckMetadataEntry> dataArray) {
+            // Since all writes happen on the same thread, we can use a thread-local variable here.
+            BatchedPendingAckMetadataEntry batch = batchedMetaThreadLocalForBufferedWriter.get();
+            batch.clear();
+            batch.addAllPendingAckLogs(dataArray);
+            int batchSize = batch.getSerializedSize();
+            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize);
+ batch.writeTo(buf);
+ return buf;
+ }
+ }
}
\ No newline at end of file
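
Side note on the index logic touched above: `MLPendingAckStore` keeps only a sparse index from acknowledged message positions to pending-ack-log positions, so that `clearUselessLogData()` can trim the log once the subscription's mark-delete position advances. A self-contained sketch of that idea, simplified so positions are plain longs and the lag is a fixed constant rather than driven by `LogIndexLagBackoff` (names here are illustrative, not the broker's):

```java
import java.util.TreeMap;

public class SparseIndexSketch {
    // maxAckPosition -> logPosition, mirroring pendingAckLogIndex.
    private final TreeMap<Long, Long> index = new TreeMap<>();
    private long indexLag = 0;
    private final long maxIndexLag = 500; // stands in for logIndexBackoff.next(...)

    // Called after each pending ack record is persisted.
    void onLogWritten(long ackPosition, long logPosition) {
        if (++indexLag >= maxIndexLag) {
            index.put(ackPosition, logPosition);
            indexLag = 0;
        }
    }

    // Mirrors clearUselessLogData(): once the subscription's mark-delete
    // position passes an indexed ack position, all log entries up to the
    // mapped log position are safe to delete.
    Long logPositionSafeToDelete(long markDeletePosition) {
        Long safe = null;
        while (!index.isEmpty() && index.firstKey() <= markDeletePosition) {
            safe = index.pollFirstEntry().getValue();
        }
        return safe;
    }
}
```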
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 6b84d6e329a..fdff9f59146 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;
+import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
@@ -31,6 +34,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
/**
@@ -49,7 +53,25 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
                            .TransactionPendingAckStoreProviderException("The subscription is null."));
return pendingAckStoreFuture;
}
+
        PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
+        PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();
+
+        final Timer brokerClientSharedTimer =
+                pulsarService.getBrokerClientSharedTimer();
+        final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
+        final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
+        txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(
+                serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords()
+        );
+        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(
+                serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize()
+        );
+        txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(
+                serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis()
+        );
+
        String pendingAckTopicName = MLPendingAckStore
                .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
@@ -81,7 +103,9 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
.getBrokerService()
.getPulsar()
.getConfiguration()
-                                                .getTransactionPendingAckLogIndexMinLag()));
+                                                .getTransactionPendingAckLogIndexMinLag(),
+                                                txnLogBufferedWriterConfig,
+                                                brokerClientSharedTimer));
                                        if (log.isDebugEnabled()) {
                                            log.debug("{},{} open MLPendingAckStore cursor success",
                                                    originPersistentTopic.getName(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 2758a5f5819..8ef53590fde 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -612,6 +612,10 @@ public class TransactionTest extends TransactionTestBase {
@Test
    public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+
        String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
@@ -643,7 +647,8 @@ public class TransactionTest extends TransactionTestBase {
        TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
-                new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, 500)))
+                new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
+                        500, bufferedWriterConfig, transactionTimer)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
@@ -676,6 +681,9 @@ public class TransactionTest extends TransactionTestBase {
        Awaitility.await().untilAsserted(() ->
                assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));
+
+ // cleanup
+ transactionTimer.stop();
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
index 14dbcdb8897..fe5c7fa1969 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -31,6 +34,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
@@ -48,6 +52,10 @@ public class PendingAckMetadataTest extends MockedBookKeeperTestCase {
@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
+        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+
        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);
@@ -72,7 +80,8 @@ public class PendingAckMetadataTest extends MockedBookKeeperTestCase {
ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
-                new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500);
+                new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500,
+                        bufferedWriterConfig, transactionTimer);
        Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
@@ -90,9 +99,12 @@ public class PendingAckMetadataTest extends MockedBookKeeperTestCase {
}
        pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
+ // cleanup.
+ pendingAckStore.closeAsync();
completableFuture.get().close();
cursor.close();
subCursor.close();
+ transactionTimer.stop();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
new file mode 100644
index 00000000000..86eb14d78fc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class MLPendingAckStoreTest extends TransactionTestBase {
+
+ private PersistentSubscription persistentSubscriptionMock;
+
+ private ManagedCursor managedCursorMock;
+
+ private ExecutorService internalPinnedExecutor;
+
+ private int pendingAckLogIndexMinLag = 1;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ setUpBase(1, 1, NAMESPACE1 + "/test", 0);
+ String topic = NAMESPACE1 + "/test-txn-topic";
+ admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(pendingAckLogIndexMinLag);
+        CompletableFuture<Subscription> subscriptionFuture = persistentTopic.createSubscription("test",
+                CommandSubscribe.InitialPosition.Earliest, false, null);
+        PersistentSubscription subscription = (PersistentSubscription) subscriptionFuture.get();
+ ManagedCursor managedCursor = subscription.getCursor();
+ this.managedCursorMock = spy(managedCursor);
+ this.persistentSubscriptionMock = spy(subscription);
+
+        when(this.persistentSubscriptionMock.getCursor()).thenReturn(managedCursorMock);
+ this.internalPinnedExecutor = this.persistentSubscriptionMock
+ .getTopic()
+ .getBrokerService()
+ .getPulsar()
+ .getTransactionExecutorProvider()
+ .getExecutor(this);
+ }
+
+ @AfterMethod
+ public void cleanup(){
+ super.internalCleanup();
+ }
+
+    private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig)
+            throws Exception {
+        MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
+        ServiceConfiguration serviceConfiguration =
+                persistentSubscriptionMock.getTopic().getBrokerService().getPulsar().getConfiguration();
+        serviceConfiguration.setTransactionPendingAckBatchedWriteMaxRecords(
+                txnLogBufferedWriterConfig.getBatchedWriteMaxRecords()
+        );
+        serviceConfiguration.setTransactionPendingAckBatchedWriteMaxSize(
+                txnLogBufferedWriterConfig.getBatchedWriteMaxSize()
+        );
+        serviceConfiguration.setTransactionPendingAckBatchedWriteMaxDelayInMillis(
+                txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis()
+        );
+        serviceConfiguration.setTransactionPendingAckBatchedWriteEnabled(txnLogBufferedWriterConfig.isBatchEnabled());
+        return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
+    }
+
+    /**
+     * Covered cases:
+     * 1. Batched write and replay with the batch feature.
+     * 2. Non-batched write and replay without the batch feature.
+     * 3. Batched write and replay without the batch feature.
+     * 4. Non-batched write and replay with the batch feature.
+     */
+ @DataProvider(name = "mainProcessArgs")
+ public Object[][] mainProcessArgsProvider(){
+ Object[][] args = new Object[4][];
+ args[0] = new Object[]{true, true};
+ args[1] = new Object[]{false, false};
+ args[2] = new Object[]{true, false};
+ args[3] = new Object[]{false, true};
+ return args;
+ }
+
+    /**
+     * This method executes the following validation steps:
+     * 1. Write some data, and verify that indexes are built correctly after the write.
+     * 2. Replay the data that has been written, and verify that indexes are built correctly after the replay.
+     * 3. Verify that position deletion stays in sync with {@link PersistentSubscription}.
+     * @param writeWithBatch Whether to enable the batch feature when writing data.
+     * @param readWithBatch Whether to enable the batch feature when replaying.
+     */
+ @Test(dataProvider = "mainProcessArgs")
+    public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throws Exception {
+ // Write some data.
+        TxnLogBufferedWriterConfig configForWrite = new TxnLogBufferedWriterConfig();
+        configForWrite.setBatchEnabled(writeWithBatch);
+        configForWrite.setBatchedWriteMaxRecords(2);
+        // Disable scheduled flush.
+        configForWrite.setBatchedWriteMaxDelayInMillis(1000 * 3600);
+        MLPendingAckStore mlPendingAckStoreForWrite = createPendingAckStore(configForWrite);
+        List<CompletableFuture<Void>> futureList = new ArrayList<>();
+        for (int i = 0; i < 20; i++){
+            TxnID txnID = new TxnID(i, i);
+            PositionImpl position = PositionImpl.get(i, i);
+            futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position));
+        }
+        for (int i = 0; i < 10; i++){
+            TxnID txnID = new TxnID(i, i);
+            futureList.add(mlPendingAckStoreForWrite.appendCommitMark(txnID, CommandAck.AckType.Cumulative));
+        }
+        for (int i = 10; i < 20; i++){
+            TxnID txnID = new TxnID(i, i);
+            futureList.add(mlPendingAckStoreForWrite.appendAbortMark(txnID, CommandAck.AckType.Cumulative));
+        }
+        for (int i = 40; i < 50; i++){
+            TxnID txnID = new TxnID(i, i);
+            PositionImpl position = PositionImpl.get(i, i);
+            futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position));
+        }
+ FutureUtil.waitForAll(futureList).get();
+        // Verify that the sparse indexes are built correctly after adding many cmd-acks.
+ ArrayList<Long> positionList = new ArrayList<>();
+ for (long i = 0; i < 50; i++){
+ positionList.add(i);
+ }
+        // The indexes do not contain the records that are commit or abort marks.
+ LinkedHashSet<Long> skipSet = new LinkedHashSet<>();
+ for (long i = 20; i < 40; i++){
+ skipSet.add(i);
+ }
+        if (writeWithBatch) {
+            for (long i = 0; i < 50; i++){
+                if (i % 2 == 0){
+                    // The indexes contain only the last position in each batch.
+                    skipSet.add(i);
+                }
+            }
+        }
+        LinkedHashSet<Long> expectedPositions = calculatePendingAckIndexes(positionList, skipSet);
+        Assert.assertEquals(
+                mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().stream()
+                        .map(PositionImpl::getEntryId).collect(Collectors.toList()),
+                new ArrayList<>(expectedPositions)
+        );
+        // Replay.
+        TxnLogBufferedWriterConfig configForReplay = new TxnLogBufferedWriterConfig();
+        configForReplay.setBatchEnabled(readWithBatch);
+        configForReplay.setBatchedWriteMaxRecords(2);
+        // Disable scheduled flush.
+        configForReplay.setBatchedWriteMaxDelayInMillis(1000 * 3600);
+        MLPendingAckStore mlPendingAckStoreForRead = createPendingAckStore(configForReplay);
+        PendingAckHandleImpl pendingAckHandle = mock(PendingAckHandleImpl.class);
+        when(pendingAckHandle.getInternalPinnedExecutor()).thenReturn(internalPinnedExecutor);
+ when(pendingAckHandle.changeToReadyState()).thenReturn(true);
+        // Process controller: marks when the replay task has finished.
+        final AtomicInteger processController = new AtomicInteger();
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                processController.incrementAndGet();
+                return null;
+            }
+        }).when(pendingAckHandle).completeHandleFuture();
+        mlPendingAckStoreForRead.replayAsync(pendingAckHandle, internalPinnedExecutor);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> processController.get() == 1);
+        // Verify that the sparse indexes are built correctly after replay.
+        Assert.assertEquals(mlPendingAckStoreForRead.pendingAckLogIndex.size(),
+                mlPendingAckStoreForWrite.pendingAckLogIndex.size());
+        Iterator<Map.Entry<PositionImpl, PositionImpl>> iteratorReplay =
+                mlPendingAckStoreForRead.pendingAckLogIndex.entrySet().iterator();
+        Iterator<Map.Entry<PositionImpl, PositionImpl>> iteratorWrite =
+                mlPendingAckStoreForWrite.pendingAckLogIndex.entrySet().iterator();
+        while (iteratorReplay.hasNext()){
+            Map.Entry<PositionImpl, PositionImpl> replayEntry = iteratorReplay.next();
+            Map.Entry<PositionImpl, PositionImpl> writeEntry = iteratorWrite.next();
+            Assert.assertEquals(replayEntry.getKey(), writeEntry.getKey());
+            Assert.assertEquals(replayEntry.getValue().getLedgerId(), writeEntry.getValue().getLedgerId());
+            Assert.assertEquals(replayEntry.getValue().getEntryId(), writeEntry.getValue().getEntryId());
+        }
+        // Verify that deletion works correctly.
+        when(managedCursorMock.getPersistentMarkDeletedPosition()).thenReturn(PositionImpl.get(19, 19));
+        mlPendingAckStoreForWrite.clearUselessLogData();
+        mlPendingAckStoreForRead.clearUselessLogData();
+        Assert.assertTrue(mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);
+        Assert.assertTrue(mlPendingAckStoreForRead.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);
+
+ // cleanup.
+ mlPendingAckStoreForWrite.closeAsync().get();
+ mlPendingAckStoreForRead.closeAsync().get();
+ }
+
+    /**
+     * Build a sparse index from {@param positionList}; the logic is the same as in {@link MLPendingAckStore}.
+     * @param positionList the positions added to the pending ack log.
+     * @param skipSet the positions which should increment the count but not be marked in the indexes,
+     *                i.e. commit & abort marks.
+     */
+    private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> positionList, LinkedHashSet<Long> skipSet){
+        LogIndexLagBackoff logIndexLagBackoff = new LogIndexLagBackoff(pendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
+ long nextCount = logIndexLagBackoff.next(0);
+ long recordCountInCurrentLoop = 0;
+ LinkedHashSet<Long> indexes = new LinkedHashSet<>();
+ for (int i = 0; i < positionList.size(); i++){
+            recordCountInCurrentLoop++;
+ long value = positionList.get(i);
+ if (skipSet.contains(value)){
+ continue;
+ }
+ if (recordCountInCurrentLoop >= nextCount){
+ indexes.add(value);
+ nextCount = logIndexLagBackoff.next(indexes.size());
+ recordCountInCurrentLoop = 0;
+ }
+ }
+ return indexes;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index 49b93db0f9c..90116047402 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -545,7 +545,7 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
- interface AddDataCallback {
+ public interface AddDataCallback {
void addComplete(Position position, Object context);
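
The last hunk widens `AddDataCallback` from package-private to public, since `MLPendingAckStore` now implements it from outside the coordinator package. A minimal caller sketch follows; the failure method name `addFailed(ManagedLedgerException, Object)` is assumed here to match the handler shape visible in the MLPendingAckStore hunk above, and `writer`, `record`, and `log` are illustrative names:

```java
// Hypothetical caller; writer, record, and log are assumed to exist in scope.
writer.asyncAddData(record, new TxnLogBufferedWriter.AddDataCallback() {
    @Override
    public void addComplete(Position position, Object context) {
        // With batching enabled, position may be a TxnBatchedPositionImpl that
        // also carries the record's batchIndex/batchSize within the Entry.
        log.info("append success at {}", position);
    }

    @Override
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        log.error("append failed", exception);
    }
}, null);
```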