This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 666d4bb3444 [feat][txn] Implement the AbortedTxnProcessor for
TransactionBuffer (#17847)
666d4bb3444 is described below
commit 666d4bb34441942d20ddac1e9a1c094284067778
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Nov 1 01:33:57 2022 +0800
[feat][txn] Implement the AbortedTxnProcessor for TransactionBuffer (#17847)
Master Issue: https://github.com/apache/pulsar/issues/16913
### Motivation
Implement an abortedTxnProcessor to handle the storage of the aborted
transaction ID.
### Modifications
The structure overview:

The main idea is to move the logic of the operation of checking and
persistent aborted transaction IDs(take snapshots) and the operation of
updating maxReadPosition into the AbortedTxnProcessor.
And the AbortedTxnProcessor can be implemented in different designs.
**Add `persistentWorker` to handle snapshot persistenting** :
<img width="1003" alt="image"
src="https://user-images.githubusercontent.com/55571188/198528131-3cde19bc-2034-4693-a8b1-4d6345e6db36.png">
The first four items below are the corresponding four tasks in the figure.
The fifth item is not strictly a task, but a part of the first two tasks.
* takeSnapshotSegmentAsync -> writeSnapshotSegmentAsync
* These two method is used to persist the snapshot segment.
* deleteSnapshotSegment
* This method is used to delete the snapshot segment.
* updateIndexMetadataForTheLastSnapshot
* Using to update index metadata (the latest snapshot).
* clearSnapshotSegmentAndIndexes
* Delete all segments and then delete the index of this topic.
* updateSnapshotIndex
* Called by the deleteSnapshotSegment and writeSnapshotSegmentAsync. Do
update the index after writing snapshot segment.
* Called by recovery as a compensation mechanism for updating the index.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [x] `doc-not-needed`
(Please explain why)
### Matching PR in the forked repository
PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/7
---
.../broker/service/AbstractBaseDispatcher.java | 3 +-
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../transaction/buffer/AbortedTxnProcessor.java | 77 ++++++
.../transaction/buffer/TransactionBuffer.java | 3 +-
.../buffer/impl/InMemTransactionBuffer.java | 2 +-
.../SingleSnapshotAbortedTxnProcessorImpl.java | 182 +++++++++++++++
.../buffer/impl/TopicTransactionBuffer.java | 259 ++++++---------------
.../TopicTransactionBufferRecoverCallBack.java | 8 -
.../buffer/impl/TransactionBufferDisable.java | 2 +-
.../v2/TransactionBufferSnapshotIndexes.java | 2 +-
... TransactionBufferSnapshotIndexesMetadata.java} | 21 +-
.../broker/service/AbstractBaseDispatcherTest.java | 2 +-
.../TopicTransactionBufferRecoverTest.java | 73 +++---
.../pulsar/broker/transaction/TransactionTest.java | 21 +-
14 files changed, 413 insertions(+), 246 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index b19360d5e10..f9e8e61d400 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -163,7 +163,8 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic())
- .isTxnAborted(new
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
+ .isTxnAborted(new
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
+ (PositionImpl) entry.getPosition())) {
individualAcknowledgeMessageIfNeeded(entry.getPosition(),
Collections.emptyMap());
entries.set(i, null);
entry.release();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 94cc0964c37..965c1e164a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3337,8 +3337,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return this.transactionBuffer.getMaxReadPosition();
}
- public boolean isTxnAborted(TxnID txnID) {
- return this.transactionBuffer.isTxnAborted(txnID);
+ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+ return this.transactionBuffer.isTxnAborted(txnID, readPosition);
}
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
new file mode 100644
index 00000000000..e436e1df249
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+ /**
+ * After the transaction buffer writes a transaction aborted marker to the
topic,
+ * the transaction buffer will put the aborted txnID and the aborted
marker position to AbortedTxnProcessor.
+ * @param txnID aborted transaction ID.
+ * @param position the position of the abort txnID
+ */
+ void putAbortedTxnAndPosition(TxnID txnID, PositionImpl position);
+
+ /**
+ * Clean up invalid aborted transactions.
+ */
+ void trimExpiredAbortedTxns();
+
+ /**
+ * Check whether the transaction ID is an aborted transaction ID.
+ * @param txnID the transaction ID that needs to be checked.
+ * @param readPosition the read position of the transaction message, can
be used to find the segment.
+ * @return a boolean, whether the transaction ID is an aborted transaction
ID.
+ */
+ boolean checkAbortedTransaction(TxnID txnID, Position readPosition);
+
+ /**
+ * Recover transaction buffer by transaction buffer snapshot.
+ * @return a Position (startReadCursorPosition) determiner where to start
to recover in the original topic.
+ */
+
+ CompletableFuture<PositionImpl> recoverFromSnapshot();
+
+ /**
+ * Delete the transaction buffer aborted transaction snapshot.
+ * @return a completableFuture.
+ */
+ CompletableFuture<Void> deleteAbortedTxnSnapshot();
+
+ /**
+ * Take aborted transactions snapshot.
+ * @return a completableFuture.
+ */
+ CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl
maxReadPosition);
+
+ /**
+ * Get the lastSnapshotTimestamps.
+ * @return the lastSnapshotTimestamps.
+ */
+ long getLastSnapshotTimestamps();
+
+ CompletableFuture<Void> closeAsync();
+
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index a47af54e32a..99093e42fd7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -141,9 +141,10 @@ public interface TransactionBuffer {
/**
* Close the buffer asynchronously.
* @param txnID {@link TxnID} txnId.
+ * @param readPosition the persitent position of the txn message.
* @return the txnId is aborted.
*/
- boolean isTxnAborted(TxnID txnID);
+ boolean isTxnAborted(TxnID txnID, PositionImpl readPosition);
/**
* Sync max read position for normal publish.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index d843542c8cb..56b49f98efe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -360,7 +360,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
- public boolean isTxnAborted(TxnID txnID) {
+ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
return false;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
new file mode 100644
index 00000000000..a13dd0499a6
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements
AbortedTxnProcessor {
+ private final PersistentTopic topic;
+ private final
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>>
takeSnapshotWriter;
+ /**
+ * Aborts, map for jude message is aborted, linked for remove abort txn in
memory when this
+ * position have been deleted.
+ */
+ private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+ private volatile long lastSnapshotTimestamps;
+
+ public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+ this.topic = topic;
+ this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+ .getTransactionBufferSnapshotServiceFactory()
+
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+ }
+
+ @Override
+ public void putAbortedTxnAndPosition(TxnID abortedTxnId, PositionImpl
position) {
+ aborts.put(abortedTxnId, position);
+ }
+
+ //In this implementation we clear the invalid aborted txn ID one by one.
+ @Override
+ public void trimExpiredAbortedTxns() {
+ while (!aborts.isEmpty() && !((ManagedLedgerImpl)
topic.getManagedLedger())
+ .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic transaction buffer clear aborted
transaction, TxnId : {}, Position : {}",
+ topic.getName(), aborts.firstKey(),
aborts.get(aborts.firstKey()));
+ }
+ aborts.remove(aborts.firstKey());
+ }
+ }
+
+ @Override
+ public boolean checkAbortedTransaction(TxnID txnID, Position readPosition)
{
+ return aborts.containsKey(txnID);
+ }
+
+
+ @Override
+ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+ return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotService()
+
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+ PositionImpl startReadCursorPosition = null;
+ try {
+ while (reader.hasMoreEvents()) {
+ Message<TransactionBufferSnapshot> message =
reader.readNext();
+ if (topic.getName().equals(message.getKey())) {
+ TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
+ if (transactionBufferSnapshot != null) {
+ handleSnapshot(transactionBufferSnapshot);
+ startReadCursorPosition = PositionImpl.get(
+
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ }
+ }
+ }
+ closeReader(reader);
+ return
CompletableFuture.completedFuture(startReadCursorPosition);
+ } catch (Exception ex) {
+ log.error("[{}] Transaction buffer recover fail when
read "
+ + "transactionBufferSnapshot!",
topic.getName(), ex);
+ closeReader(reader);
+ return FutureUtil.failedFuture(ex);
+ }
+
+ },
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+ .getExecutor(this));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAbortedTxnSnapshot() {
+ return this.takeSnapshotWriter.thenCompose(writer -> {
+ TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
+ snapshot.setTopicName(topic.getName());
+ return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+ }).thenRun(() -> {
+ log.info("[{}] Successes to delete the aborted transaction
snapshot", this.topic);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl
maxReadPosition) {
+ return takeSnapshotWriter.thenCompose(writer -> {
+ TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
+ snapshot.setTopicName(topic.getName());
+ snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+ snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+ List<AbortTxnMetadata> list = new ArrayList<>();
+ aborts.forEach((k, v) -> {
+ AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+ abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+ abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+ abortTxnMetadata.setLedgerId(v.getLedgerId());
+ abortTxnMetadata.setEntryId(v.getEntryId());
+ list.add(abortTxnMetadata);
+ });
+ snapshot.setAborts(list);
+ return writer.writeAsync(snapshot.getTopicName(),
snapshot).thenAccept(messageId -> {
+ this.lastSnapshotTimestamps = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}]Transaction buffer take snapshot success! "
+ + "messageId : {}", topic.getName(), messageId);
+ }
+ }).exceptionally(e -> {
+ log.warn("[{}]Transaction buffer take snapshot fail! ",
topic.getName(), e.getCause());
+ return null;
+ });
+ });
+ }
+
+ @Override
+ public long getLastSnapshotTimestamps() {
+ return this.lastSnapshotTimestamps;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+ }
+
+ private void
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+ reader.closeAsync().exceptionally(e -> {
+ log.error("[{}]Transaction buffer reader close error!",
topic.getName(), e);
+ return null;
+ });
+ }
+
+ private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+ if (snapshot.getAborts() != null) {
+ snapshot.getAborts().forEach(abortTxnMetadata ->
+ aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+ abortTxnMetadata.getTxnIdLeastBits()),
+ PositionImpl.get(abortTxnMetadata.getLedgerId(),
+ abortTxnMetadata.getEntryId())));
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index dfebbba7c66..f3bf4f95923 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,23 +37,20 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
-import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
@@ -79,14 +75,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
*/
private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new
LinkedMap<>();
- /**
- * Aborts, map for jude message is aborted, linked for remove abort txn in
memory when this
- * position have been deleted.
- */
- private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
-
- private final
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>>
takeSnapshotWriter;
-
// when add abort or change max read position, the count will +1. Take
snapshot will set 0 into it.
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new
AtomicLong();
@@ -100,8 +88,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final int takeSnapshotIntervalTime;
- private volatile long lastSnapshotTimestamps;
-
private final CompletableFuture<Void> transactionBufferFuture = new
CompletableFuture<>();
/**
@@ -113,18 +99,18 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final Semaphore handleLowWaterMark = new Semaphore(1);
+ private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
+
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
- this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
- .getTransactionBufferSnapshotServiceFactory()
-
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
this.timer =
topic.getBrokerService().getPulsar().getTransactionTimer();
this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
this.maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
+ this.snapshotAbortedTxnProcessor = new
SingleSnapshotAbortedTxnProcessorImpl(topic);
this.recover();
}
@@ -135,9 +121,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void recoverComplete() {
synchronized (TopicTransactionBuffer.this) {
- // sync maxReadPosition change to LAC when
TopicTransaction buffer have not recover
- // completely the normal message have been sent to
broker and state is
- // not Ready can't sync maxReadPosition when no
ongoing transactions
if (ongoingTxns.isEmpty()) {
maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
}
@@ -160,9 +143,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void noNeedToRecover() {
synchronized (TopicTransactionBuffer.this) {
- // sync maxReadPosition change to LAC when
TopicTransaction buffer have not recover
- // completely the normal message have been sent to
broker and state is
- // not NoSnapshot can't sync maxReadPosition
maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
if (!changeToNoSnapshotState()) {
log.error("[{}]Transaction buffer recover
fail", topic.getName());
@@ -172,20 +152,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
}
-
- @Override
- public void handleSnapshot(TransactionBufferSnapshot
snapshot) {
- maxReadPosition =
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
- snapshot.getMaxReadPositionEntryId());
- if (snapshot.getAborts() != null) {
- snapshot.getAborts().forEach(abortTxnMetadata ->
- aborts.put(new
TxnID(abortTxnMetadata.getTxnIdMostBits(),
-
abortTxnMetadata.getTxnIdLeastBits()),
-
PositionImpl.get(abortTxnMetadata.getLedgerId(),
-
abortTxnMetadata.getEntryId())));
- }
- }
-
@Override
public void handleTxnEntry(Entry entry) {
ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -197,7 +163,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
PositionImpl position =
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (Markers.isTxnMarker(msgMetadata)) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
- aborts.put(txnID, position);
+
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
updateMaxReadPosition(txnID);
} else {
@@ -225,7 +191,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
recoverTime.setRecoverEndTime(System.currentTimeMillis());
topic.close(true);
}
- }, this.topic, this, takeSnapshotWriter));
+ }, this.topic, this, snapshotAbortedTxnProcessor));
}
@Override
@@ -241,7 +207,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
transactionBufferFuture.thenRun(() -> {
if (checkIfNoSnapshot()) {
- takeSnapshot().thenRun(() -> {
+
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
if (changeToReadyStateFromNoSnapshot()) {
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
@@ -308,7 +274,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
private void handleTransactionMessage(TxnID txnId, Position position) {
- if (!ongoingTxns.containsKey(txnId) && !aborts.containsKey(txnId)) {
+ if (!ongoingTxns.containsKey(txnId) &&
!this.snapshotAbortedTxnProcessor
+ .checkAbortedTransaction(txnId, position)) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition =
ongoingTxns.get(ongoingTxns.firstKey());
//max read position is less than first ongoing transaction message
position, so entryId -1
@@ -316,7 +283,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
-
@Override
public CompletableFuture<TransactionBufferReader>
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
@@ -339,7 +305,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
synchronized (TopicTransactionBuffer.this) {
updateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
- clearAbortedTransactions();
+
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
}
txnCommittedCounter.increment();
@@ -383,10 +349,9 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
- aborts.put(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
-
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
- clearAbortedTransactions();
+
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, maxReadPosition);
+
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
}
txnAbortedCounter.increment();
@@ -454,60 +419,20 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private void takeSnapshotByChangeTimes() {
if (changeMaxReadPositionAndAddAbortTimes.get() >=
takeSnapshotIntervalNumber) {
- takeSnapshot();
+ this.changeMaxReadPositionAndAddAbortTimes.set(0);
+
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
}
private void takeSnapshotByTimeout() {
if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
- takeSnapshot();
+ this.changeMaxReadPositionAndAddAbortTimes.set(0);
+
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
}
this.timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}
- private CompletableFuture<Void> takeSnapshot() {
- changeMaxReadPositionAndAddAbortTimes.set(0);
- return takeSnapshotWriter.thenCompose(writer -> {
- TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
- synchronized (TopicTransactionBuffer.this) {
- snapshot.setTopicName(topic.getName());
-
snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
-
snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
- List<AbortTxnMetadata> list = new ArrayList<>();
- aborts.forEach((k, v) -> {
- AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
- abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
- abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
- abortTxnMetadata.setLedgerId(v.getLedgerId());
- abortTxnMetadata.setEntryId(v.getEntryId());
- list.add(abortTxnMetadata);
- });
- snapshot.setAborts(list);
- }
- return writer.writeAsync(snapshot.getTopicName(),
snapshot).thenAccept(messageId-> {
- this.lastSnapshotTimestamps = System.currentTimeMillis();
- if (log.isDebugEnabled()) {
- log.debug("[{}]Transaction buffer take snapshot success! "
- + "messageId : {}", topic.getName(), messageId);
- }
- }).exceptionally(e -> {
- log.warn("[{}]Transaction buffer take snapshot fail! ",
topic.getName(), e);
- return null;
- });
- });
- }
- private void clearAbortedTransactions() {
- while (!aborts.isEmpty() && !((ManagedLedgerImpl)
topic.getManagedLedger())
- .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
- if (log.isDebugEnabled()) {
- aborts.firstKey();
- log.debug("[{}] Topic transaction buffer clear aborted
transaction, TxnId : {}, Position : {}",
- topic.getName(), aborts.firstKey(),
aborts.get(aborts.firstKey()));
- }
- aborts.remove(aborts.firstKey());
- }
- }
void updateMaxReadPosition(TxnID txnID) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
ongoingTxns.remove(txnID);
@@ -530,22 +455,18 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public CompletableFuture<Void> clearSnapshot() {
- return this.takeSnapshotWriter.thenCompose(writer -> {
- TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
- snapshot.setTopicName(topic.getName());
- return writer.deleteAsync(snapshot.getTopicName(), snapshot);
- }).thenCompose(__ -> CompletableFuture.completedFuture(null));
+ return snapshotAbortedTxnProcessor.deleteAbortedTxnSnapshot();
}
@Override
public CompletableFuture<Void> closeAsync() {
changeToCloseState();
- return
this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+ return this.snapshotAbortedTxnProcessor.closeAsync();
}
@Override
- public boolean isTxnAborted(TxnID txnID) {
- return aborts.containsKey(txnID);
+ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+ return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID,
readPosition);
}
@Override
@@ -554,7 +475,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
// thread is the same tread, in this time the lastAddConfirm don't
content transaction message.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
- maxReadPosition = position;
+ this.maxReadPosition = position;
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
@@ -576,7 +497,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
TransactionInBufferStats transactionInBufferStats = new
TransactionInBufferStats();
- transactionInBufferStats.aborted = isTxnAborted(txnID);
+ transactionInBufferStats.aborted = isTxnAborted(txnID, null);
if (ongoingTxns.containsKey(txnID)) {
transactionInBufferStats.startPosition =
ongoingTxns.get(txnID).toString();
}
@@ -586,7 +507,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public TransactionBufferStats getStats(boolean lowWaterMarks) {
TransactionBufferStats transactionBufferStats = new
TransactionBufferStats();
- transactionBufferStats.lastSnapshotTimestamps =
this.lastSnapshotTimestamps;
+ transactionBufferStats.lastSnapshotTimestamps =
this.snapshotAbortedTxnProcessor.getLastSnapshotTimestamps();
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition =
this.maxReadPosition.toString();
if (lowWaterMarks) {
@@ -602,7 +523,9 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void run(Timeout timeout) {
if (checkIfReady()) {
- takeSnapshotByTimeout();
+ synchronized (TopicTransactionBuffer.this) {
+ takeSnapshotByTimeout();
+ }
}
}
@@ -625,108 +548,76 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final TopicTransactionBuffer topicTransactionBuffer;
- private final
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>>
takeSnapshotWriter;
+ private final AbortedTxnProcessor abortedTxnProcessor;
private
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack,
PersistentTopic topic,
- TopicTransactionBuffer
transactionBuffer, CompletableFuture<
- SystemTopicClient.Writer<TransactionBufferSnapshot>>
takeSnapshotWriter) {
+ TopicTransactionBuffer
transactionBuffer,
+ AbortedTxnProcessor
abortedTxnProcessor) {
this.topic = topic;
this.callBack = callBack;
this.entryQueue = new SpscArrayQueue<>(2000);
this.topicTransactionBuffer = transactionBuffer;
- this.takeSnapshotWriter = takeSnapshotWriter;
+ this.abortedTxnProcessor = abortedTxnProcessor;
}
@SneakyThrows
@Override
public void run() {
- this.takeSnapshotWriter.thenRunAsync(() -> {
- if (!this.topicTransactionBuffer.changeToInitializingState()) {
- log.warn("TransactionBuffer {} of topic {} can not change
state to Initializing",
- this, topic.getName());
+ if (!this.topicTransactionBuffer.changeToInitializingState()) {
+ log.warn("TransactionBuffer {} of topic {} can not change
state to Initializing",
+ this, topic.getName());
+ return;
+ }
+
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
-> {
+ //Transaction is not use for this topic, so just make
maxReadPosition as LAC.
+ if (startReadCursorPosition == null) {
+ callBack.noNeedToRecover();
+ return;
+ } else {
+ this.startReadCursorPosition = startReadCursorPosition;
+ }
+ ManagedCursor managedCursor;
+ try {
+ managedCursor = topic.getManagedLedger()
+ .newNonDurableCursor(this.startReadCursorPosition,
SUBSCRIPTION_NAME);
+ } catch (ManagedLedgerException e) {
+ callBack.recoverExceptionally(e);
+ log.error("[{}]Transaction buffer recover fail when open
cursor!", topic.getName(), e);
return;
}
-
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-
.getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName()))
- .thenAcceptAsync(reader -> {
+ PositionImpl lastConfirmedEntry =
+ (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
+ PositionImpl currentLoadPosition = (PositionImpl)
this.startReadCursorPosition;
+ FillEntryQueueCallback fillEntryQueueCallback = new
FillEntryQueueCallback(entryQueue,
+ managedCursor, TopicTransactionBufferRecover.this);
+ if (lastConfirmedEntry.getEntryId() != -1) {
+ while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
+ && fillEntryQueueCallback.fillQueue()) {
+ Entry entry = entryQueue.poll();
+ if (entry != null) {
try {
- boolean hasSnapshot = false;
- while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshot> message
= reader.readNext();
- if
(topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
- if (transactionBufferSnapshot != null)
{
- hasSnapshot = true;
-
callBack.handleSnapshot(transactionBufferSnapshot);
- this.startReadCursorPosition =
PositionImpl.get(
-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-
transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- if (!hasSnapshot) {
- closeReader(reader);
- callBack.noNeedToRecover();
- return;
- }
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover
fail when read "
- + "transactionBufferSnapshot!",
topic.getName(), ex);
- callBack.recoverExceptionally(ex);
- closeReader(reader);
- return;
+ currentLoadPosition =
PositionImpl.get(entry.getLedgerId(),
+ entry.getEntryId());
+ callBack.handleTxnEntry(entry);
+ } finally {
+ entry.release();
}
- closeReader(reader);
-
- ManagedCursor managedCursor;
+ } else {
try {
- managedCursor = topic.getManagedLedger()
-
.newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
- } catch (ManagedLedgerException e) {
- callBack.recoverExceptionally(e);
- log.error("[{}]Transaction buffer recover fail
when open cursor!", topic.getName(), e);
- return;
- }
- PositionImpl lastConfirmedEntry =
- (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
- PositionImpl currentLoadPosition = (PositionImpl)
this.startReadCursorPosition;
- FillEntryQueueCallback fillEntryQueueCallback =
new FillEntryQueueCallback(entryQueue,
- managedCursor,
TopicTransactionBufferRecover.this);
- if (lastConfirmedEntry.getEntryId() != -1) {
- while
(lastConfirmedEntry.compareTo(currentLoadPosition) > 0
- && fillEntryQueueCallback.fillQueue())
{
- Entry entry = entryQueue.poll();
- if (entry != null) {
- try {
- currentLoadPosition =
PositionImpl.get(entry.getLedgerId(),
- entry.getEntryId());
- callBack.handleTxnEntry(entry);
- } finally {
- entry.release();
- }
- } else {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- //no-op
- }
- }
- }
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ //no-op
}
+ }
+ }
+ }
- closeCursor(SUBSCRIPTION_NAME);
- callBack.recoverComplete();
- },
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this)).exceptionally(e -> {
- callBack.recoverExceptionally(e.getCause());
- log.error("[{}]Transaction buffer new snapshot
reader fail!", topic.getName(), e);
- return null;
- });
+ closeCursor(SUBSCRIPTION_NAME);
+ callBack.recoverComplete();
},
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this)).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
- log.error("[{}]Transaction buffer create snapshot writer
fail!",
- topic.getName(), e);
+ log.error("[{}]Transaction buffer failed to recover
snapshot!", topic.getName(), e);
return null;
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 32ff0ebe16b..7dedb2cb6b7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.transaction.buffer.impl;
import org.apache.bookkeeper.mledger.Entry;
-import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
public interface TopicTransactionBufferRecoverCallBack {
@@ -34,13 +33,6 @@ public interface TopicTransactionBufferRecoverCallBack {
*/
void noNeedToRecover();
- /**
- * Handle transactionBufferSnapshot.
- *
- * @param snapshot the transaction buffer snapshot
- */
- void handleSnapshot(TransactionBufferSnapshot snapshot);
-
/**
* Handle transaction entry beyond the snapshot.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index c1d26b2eaec..22ba8e2d2e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -79,7 +79,7 @@ public class TransactionBufferDisable implements
TransactionBuffer {
}
@Override
- public boolean isTxnAborted(TxnID txnID) {
+ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
return false;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
index 613083e5fb7..d548835ed8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
@@ -35,6 +35,6 @@ public class TransactionBufferSnapshotIndexes {
private List<TransactionBufferSnapshotIndex> indexList;
- private TransactionBufferSnapshotSegment snapshot;
+ private TransactionBufferSnapshotIndexesMetadata snapshot;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
similarity index 75%
copy from
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
index 613083e5fb7..9a468d250bb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
@@ -18,23 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
-import java.util.List;
+import java.util.Set;
import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
+import lombok.Data;
import lombok.NoArgsConstructor;
-import lombok.Setter;
+@Data
@AllArgsConstructor
@NoArgsConstructor
-@Getter
-@Setter
-@Builder
-public class TransactionBufferSnapshotIndexes {
- private String topicName;
-
- private List<TransactionBufferSnapshotIndex> indexList;
-
- private TransactionBufferSnapshotSegment snapshot;
-
+public class TransactionBufferSnapshotIndexesMetadata {
+ private long maxReadPositionLedgerId;
+ private long maxReadPositionEntryId;
+ private Set<TxnIDData> aborts;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index a4f2b6ec282..554ef1c3f96 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -130,7 +130,7 @@ public class AbstractBaseDispatcherTest {
PersistentTopic mockTopic = mock(PersistentTopic.class);
when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
- when(mockTopic.isTxnAborted(any(TxnID.class))).thenReturn(true);
+ when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true);
List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c1e347aa8c7..39c324d92f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -30,16 +30,12 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import java.io.IOException;
import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -61,6 +57,8 @@ import
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -128,6 +126,14 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
};
}
+ @DataProvider(name = "enableSnapshotSegment")
+ public Object[] testSnapshot() {
+ return new Boolean[] {
+ true,
+ false
+ };
+ }
+
@Test(dataProvider = "testTopic")
private void recoverTest(String testTopic) throws Exception {
PulsarClient pulsarClient = this.pulsarClient;
@@ -169,7 +175,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
assertNull(message);
- tnx1.commit();
+ tnx1.commit().get();
// only can receive message 1
message = consumer.receive(2, TimeUnit.SECONDS);
@@ -242,9 +248,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
}
- @Test
- private void testTakeSnapshot() throws IOException, ExecutionException,
InterruptedException {
-
+ private void testTakeSnapshot() throws Exception {
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
@@ -314,8 +318,9 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
}
- @Test
- private void testTopicTransactionBufferDeleteAbort() throws Exception {
+ @Test(dataProvider = "enableSnapshotSegment")
+ private void testTopicTransactionBufferDeleteAbort(Boolean
enableSnapshotSegment) throws Exception {
+
getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
@@ -390,22 +395,34 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
field.setAccessible(true);
TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer)
field.get(persistentTopic);
- field =
TopicTransactionBuffer.class.getDeclaredField("aborts");
+ field =
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
field.setAccessible(true);
- LinkedMap<TxnID, PositionImpl> linkedMap =
- (LinkedMap<TxnID, PositionImpl>)
field.get(topicTransactionBuffer);
- assertEquals(linkedMap.size(), 1);
-
assertEquals(linkedMap.get(linkedMap.firstKey()).getLedgerId(),
- ((MessageIdImpl)
message.getMessageId()).getLedgerId());
- exist = true;
+ AbortedTxnProcessor abortedTxnProcessor =
(AbortedTxnProcessor) field.get(topicTransactionBuffer);
+
+ if (enableSnapshotSegment) {
+ //TODO
+ exist = true;
+ } else {
+ Field abortsField =
SingleSnapshotAbortedTxnProcessorImpl.class.getDeclaredField("aborts");
+ abortsField.setAccessible(true);
+
+ LinkedMap<TxnID, PositionImpl> linkedMap =
+ (LinkedMap<TxnID, PositionImpl>)
abortsField.get(abortedTxnProcessor);
+ assertEquals(linkedMap.size(), 1);
+
assertEquals(linkedMap.get(linkedMap.firstKey()).getLedgerId(),
+ ((MessageIdImpl)
message.getMessageId()).getLedgerId());
+ exist = true;
+ }
+
}
}
}
assertTrue(exist);
}
- @Test
- public void clearTransactionBufferSnapshotTest() throws Exception {
+ @Test(dataProvider = "enableSnapshotSegment")
+ public void clearTransactionBufferSnapshotTest(Boolean
enableSnapshotSegment) throws Exception {
+
getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
String topic = NAMESPACE1 + "/tb-snapshot-delete-" +
RandomUtils.nextInt();
Producer<byte[]> producer = pulsarClient
@@ -426,9 +443,11 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
PersistentTopic originalTopic = (PersistentTopic)
getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(),
false).get().get();
TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer) originalTopic.getTransactionBuffer();
- Method takeSnapshotMethod =
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
- takeSnapshotMethod.setAccessible(true);
- takeSnapshotMethod.invoke(topicTransactionBuffer);
+ Field abortedTxnProcessorField =
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+ abortedTxnProcessorField.setAccessible(true);
+ AbortedTxnProcessor abortedTxnProcessor =
+ (AbortedTxnProcessor)
abortedTxnProcessorField.get(topicTransactionBuffer);
+
abortedTxnProcessor.takeAbortedTxnsSnapshot(topicTransactionBuffer.getMaxReadPosition());
TopicName transactionBufferTopicName =
NamespaceEventsSystemTopicFactory.getSystemTopicName(
@@ -667,8 +686,9 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
snapshot.setSequenceId(1L);
snapshot.setMaxReadPositionLedgerId(2L);
snapshot.setMaxReadPositionEntryId(3L);
- snapshot.setAborts(Collections.singletonList(
- new TxnIDData(1, 1)));
+ LinkedList<TxnIDData> txnIDSet = new LinkedList<>();
+ txnIDSet.add(new TxnIDData(1, 1));
+ snapshot.setAborts(txnIDSet );
segmentWriter.write(buildKey(snapshot), snapshot);
snapshot.setSequenceId(2L);
@@ -711,14 +731,15 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
ByteBuf headersAndPayload = entry.getDataBuffer();
//skip metadata
MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
- snapshot =
Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
+ snapshot = Schema.AVRO(TransactionBufferSnapshotSegment.class)
+ .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
//verify snapshot
assertEquals(snapshot.getTopicName(), snapshotTopic);
assertEquals(snapshot.getSequenceId(), 2L);
assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
- assertEquals(snapshot.getAborts().get(0), new TxnIDData(1, 1));
+ assertEquals(snapshot.getAborts().toArray()[0], new TxnIDData(1, 1));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 64dca965267..33305a2b8df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -91,6 +91,7 @@ import
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
@@ -942,9 +943,11 @@ public class TransactionTest extends TransactionTestBase {
filed1.set(persistentTopic, managedLedger);
TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer) field2.get(persistentTopic);
- Method method =
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
- method.setAccessible(true);
- CompletableFuture<Void> completableFuture = (CompletableFuture<Void>)
method.invoke(topicTransactionBuffer);
+ Field processorField =
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+ processorField.setAccessible(true);
+ AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor)
processorField.get(topicTransactionBuffer);
+ CompletableFuture<Void> completableFuture =
abortedTxnProcessor.takeAbortedTxnsSnapshot(
+ topicTransactionBuffer.getMaxReadPosition());
completableFuture.get();
doReturn(PositionImpl.LATEST).when(managedLedger).getLastConfirmedEntry();
@@ -1026,9 +1029,15 @@ public class TransactionTest extends TransactionTestBase
{
.getTopic(NAMESPACE1 +
"/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
.get().get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
- Field field =
TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
- field.setAccessible(true);
- AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong)
field.get(buffer);
+ Field processorField =
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+ processorField.setAccessible(true);
+
+ AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor)
processorField.get(buffer);
+ Field changeTimeField = TopicTransactionBuffer
+
.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+ changeTimeField.setAccessible(true);
+ AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong)
changeTimeField.get(buffer);
+
Field field1 =
TopicTransactionBufferState.class.getDeclaredField("state");
field1.setAccessible(true);