This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 40c8c23123e [improve][broker] Reduce the CPU pressure from the
transaction buffer in rolling restarts (#23062)
40c8c23123e is described below
commit 40c8c23123e6c4e1c403445cc98e8fed6997f5b0
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jul 29 20:48:15 2024 +0800
[improve][broker] Reduce the CPU pressure from the transaction buffer in
rolling restarts (#23062)
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../SystemTopicTxnBufferSnapshotService.java | 15 +-
.../TransactionBufferSnapshotServiceFactory.java | 26 +-
.../SingleSnapshotAbortedTxnProcessorImpl.java | 73 +---
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 394 +++++++++------------
.../broker/transaction/buffer/impl/TableView.java | 97 +++++
.../buffer/impl/TopicTransactionBuffer.java | 5 +-
.../java/org/apache/pulsar/utils/SimpleCache.java | 83 +++++
.../TopicTransactionBufferRecoverTest.java | 46 ++-
.../org/apache/pulsar/utils/SimpleCacheTest.java | 81 +++++
10 files changed, 486 insertions(+), 336 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c623f5d4e5b..b23851a5ec4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -992,7 +992,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
- this.transactionBufferSnapshotServiceFactory = new
TransactionBufferSnapshotServiceFactory(getClient());
+ this.transactionBufferSnapshotServiceFactory = new
TransactionBufferSnapshotServiceFactory(this);
this.transactionTimer =
new HashedWheelTimer(new
DefaultThreadFactory("pulsar-transaction-timer"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index bd1b9098169..ba6cbee3557 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -22,12 +22,16 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService<T> {
protected final EventType systemTopicType;
private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>>
refCountedWriterMap;
+ @Getter
+ private final TableView<T> tableView;
// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from
writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public class SystemTopicTxnBufferSnapshotService<T> {
}
- public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType
systemTopicType,
- Class<T> schemaType) {
+ public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType
systemTopicType,
+ Class<T> schemaType) throws
PulsarServerException {
+ final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new
NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
+ this.tableView = new TableView<>(this::createReader,
+ client.getConfiguration().getOperationTimeoutMs(),
pulsar.getExecutor());
}
public CompletableFuture<SystemTopicClient.Reader<T>>
createReader(TopicName topicName) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
index 4b8548fae47..d54f65572f5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;
+import lombok.Getter;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;
+@Getter
public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot>
txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
private
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
txnBufferSnapshotIndexService;
- public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
- this.txnBufferSnapshotSegmentService = new
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ public TransactionBufferSnapshotServiceFactory(PulsarService pulsar)
throws PulsarServerException {
+ this.txnBufferSnapshotSegmentService = new
SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
- this.txnBufferSnapshotIndexService = new
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotIndexService = new
SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES,
TransactionBufferSnapshotIndexes.class);
- this.txnBufferSnapshotService = new
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotService = new
SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT,
TransactionBufferSnapshot.class);
}
- public
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
getTxnBufferSnapshotIndexService() {
- return this.txnBufferSnapshotIndexService;
- }
-
- public
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
- getTxnBufferSnapshotSegmentService() {
- return this.txnBufferSnapshotSegmentService;
- }
-
- public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot>
getTxnBufferSnapshotService() {
- return this.txnBufferSnapshotService;
- }
-
public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 5c9075e9a38..1649349e3e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,26 +21,19 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
-import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements
AbortedTxnProcessor {
@@ -91,48 +84,27 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
return aborts.containsKey(txnID);
}
- private long getSystemClientOperationTimeoutMs() throws Exception {
- PulsarClientImpl pulsarClient = (PulsarClientImpl)
topic.getBrokerService().getPulsar().getClient();
- return pulsarClient.getConfiguration().getOperationTimeoutMs();
- }
-
@Override
public CompletableFuture<Position> recoverFromSnapshot() {
- return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
-
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- Position startReadCursorPosition = null;
- while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshot> message =
reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
- if (transactionBufferSnapshot != null) {
- handleSnapshot(transactionBufferSnapshot);
- startReadCursorPosition =
PositionFactory.create(
-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-
transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- return
CompletableFuture.completedFuture(startReadCursorPosition);
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction
buffer recover fail by read "
- + "transactionBufferSnapshot timeout!",
topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when
read "
- + "transactionBufferSnapshot!",
topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- },
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ final var future = new CompletableFuture<Position>();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(()
-> {
+ try {
+ final var snapshot =
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot != null) {
+ handleSnapshot(snapshot);
+ final var startReadCursorPosition =
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ future.complete(startReadCursorPosition);
+ } else {
+ future.complete(null);
+ }
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
}
@Override
@@ -191,13 +163,6 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
return CompletableFuture.completedFuture(null);
}
- private void
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
- reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer reader close error!",
topic.getName(), e);
- return null;
- });
- }
-
private void handleSnapshot(TransactionBufferSnapshot snapshot) {
if (snapshot.getAborts() != null) {
snapshot.getAborts().forEach(abortTxnMetadata ->
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index e94e7a04779..4ca27f77a87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -24,11 +24,11 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
@@ -54,7 +54,6 @@ import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBuffer
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -228,220 +227,129 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
@Override
public CompletableFuture<Position> recoverFromSnapshot() {
- return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotIndexService()
-
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- Position startReadCursorPosition = null;
- TransactionBufferSnapshotIndexes persistentSnapshotIndexes
= null;
- try {
- /*
- Read the transaction snapshot segment index.
- <p>
- The processor can get the sequence ID, unsealed
transaction IDs,
- segment index list and max read position in the
snapshot segment index.
- Then we can traverse the index list to read all
aborted transaction IDs
- in segments to aborts.
- </p>
- */
- while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshotIndexes> message
= reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshotIndexes
transactionBufferSnapshotIndexes = message.getValue();
- if (transactionBufferSnapshotIndexes != null) {
- persistentSnapshotIndexes =
transactionBufferSnapshotIndexes;
- startReadCursorPosition =
PositionFactory.create(
-
transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(),
-
transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction
buffer recover fail by read "
- + "transactionBufferSnapshot timeout!",
topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when
read "
- + "transactionBufferSnapshot!",
topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- Position finalStartReadCursorPosition =
startReadCursorPosition;
- TransactionBufferSnapshotIndexes
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
- if (persistentSnapshotIndexes == null) {
- return recoverOldSnapshot();
- } else {
- this.unsealedTxnIds =
convertTypeToTxnID(persistentSnapshotIndexes
- .getSnapshot().getAborts());
- }
- //Read snapshot segment to recover aborts.
- ArrayList<CompletableFuture<Void>> completableFutures =
new ArrayList<>();
- CompletableFuture<Void>
openManagedLedgerAndHandleSegmentsFuture = new CompletableFuture<>();
- AtomicBoolean hasInvalidIndex = new AtomicBoolean(false);
- AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback
= new AsyncCallbacks
- .OpenReadOnlyManagedLedgerCallback() {
- @Override
- public void
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl
readOnlyManagedLedger,
- Object
ctx) {
-
finalPersistentSnapshotIndexes.getIndexList().forEach(index -> {
- CompletableFuture<Void> handleSegmentFuture =
new CompletableFuture<>();
- completableFutures.add(handleSegmentFuture);
- readOnlyManagedLedger.asyncReadEntry(
-
PositionFactory.create(index.getSegmentLedgerID(),
- index.getSegmentEntryID()),
- new AsyncCallbacks.ReadEntryCallback()
{
- @Override
- public void
readEntryComplete(Entry entry, Object ctx) {
-
handleSnapshotSegmentEntry(entry);
-
indexes.put(PositionFactory.create(
-
index.abortedMarkLedgerID,
-
index.abortedMarkEntryID),
- index);
- entry.release();
-
handleSegmentFuture.complete(null);
- }
-
- @Override
- public void
readEntryFailed(ManagedLedgerException exception, Object ctx) {
- /*
- The logic flow of deleting
expired segment is:
- <p>
- 1. delete segment
- 2. update segment index
- </p>
- If the worker delete segment
successfully
- but failed to update segment
index,
- the segment can not be read
according to the index.
- We update index again if
there are invalid indexes.
- */
- if (((ManagedLedgerImpl)
topic.getManagedLedger())
-
.ledgerExists(index.getAbortedMarkLedgerID())) {
- log.error("[{}] Failed to
read snapshot segment [{}:{}]",
- topic.getName(),
index.segmentLedgerID,
-
index.segmentEntryID, exception);
-
handleSegmentFuture.completeExceptionally(exception);
- } else {
- hasInvalidIndex.set(true);
- }
- }
-
- @Override
- public String toString() {
- return
String.format("Transaction buffer [%s] recover from snapshot",
-
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
- }
- }, null);
- });
-
openManagedLedgerAndHandleSegmentsFuture.complete(null);
- }
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var future = new CompletableFuture<Position>();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(()
-> {
+ try {
+ final var indexes =
pulsar.getTransactionBufferSnapshotServiceFactory()
+
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+ if (indexes == null) {
+ // Try recovering from the old format snapshot
+ future.complete(recoverOldSnapshot());
+ return;
+ }
+ final var snapshot = indexes.getSnapshot();
+ final var startReadCursorPosition =
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ this.unsealedTxnIds = convertTypeToTxnID(snapshot.getAborts());
+ // Read snapshot segment to recover aborts
+ final var snapshotSegmentTopicName =
TopicName.get(TopicDomain.persistent.toString(),
+ TopicName.get(topic.getName()).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ readSegmentEntries(snapshotSegmentTopicName, indexes);
+ if (!this.indexes.isEmpty()) {
+ // If there is no segment index, the persistent worker
will write segment begin from 0.
+
persistentWorker.sequenceID.set(this.indexes.get(this.indexes.lastKey()).sequenceID
+ 1);
+ }
+ unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
+ future.complete(startReadCursorPosition);
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
- @Override
- public void
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
- log.error("[{}] Failed to open readOnly managed
ledger", topic, exception);
-
openManagedLedgerAndHandleSegmentsFuture.completeExceptionally(exception);
- }
- };
-
- TopicName snapshotSegmentTopicName =
TopicName.get(TopicDomain.persistent.toString(),
-
TopicName.get(topic.getName()).getNamespaceObject(),
-
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
-
this.topic.getBrokerService().getPulsar().getManagedLedgerFactory()
-
.asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName
- .getPersistenceNamingEncoding(),
callback,
- topic.getManagedLedger().getConfig(),
- null);
- /*
- Wait the processor recover completely and then allow TB
- to recover the messages after the
startReadCursorPosition.
- */
- return openManagedLedgerAndHandleSegmentsFuture
- .thenCompose((ignore) ->
FutureUtil.waitForAll(completableFutures))
- .thenCompose((i) -> {
- /*
- Update the snapshot segment index if there
exist invalid indexes.
- */
- if (hasInvalidIndex.get()) {
-
persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
- () ->
persistentWorker.updateSnapshotIndex(
-
finalPersistentSnapshotIndexes.getSnapshot()));
- }
- /*
- If there is no segment index, the
persistent worker will write segment begin from 0.
- */
- if (indexes.size() != 0) {
-
persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
- }
- /*
- Append the aborted txn IDs in the index
metadata
- can keep the order of the aborted txn in the
aborts.
- So that we can trim the expired snapshot
segment in aborts
- according to the latest transaction IDs in
the segmentIndex.
- */
- unsealedTxnIds.forEach(txnID ->
aborts.put(txnID, txnID));
- return
CompletableFuture.completedFuture(finalStartReadCursorPosition);
- }).exceptionally(ex -> {
- log.error("[{}] Failed to recover snapshot
segment", this.topic.getName(), ex);
- return null;
- });
-
- },
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ private void readSegmentEntries(TopicName topicName,
TransactionBufferSnapshotIndexes indexes) throws Exception {
+ final var managedLedger = openReadOnlyManagedLedger(topicName);
+ boolean hasInvalidIndex = false;
+ for (var index : indexes.getIndexList()) {
+ final var position =
PositionFactory.create(index.getSegmentLedgerID(), index.getSegmentEntryID());
+ final var abortedPosition =
PositionFactory.create(index.abortedMarkLedgerID, index.abortedMarkEntryID);
+ try {
+ final var entry = readEntry(managedLedger, position);
+ try {
+ handleSnapshotSegmentEntry(entry);
+ this.indexes.put(abortedPosition, index);
+ } finally {
+ entry.release();
+ }
+ } catch (Throwable throwable) {
+ if (((ManagedLedgerImpl) topic.getManagedLedger())
+ .ledgerExists(index.getAbortedMarkLedgerID())) {
+ log.error("[{}] Failed to read snapshot segment [{}:{}]",
+ topic.getName(), index.segmentLedgerID,
+ index.segmentEntryID, throwable);
+ throw throwable;
+ } else {
+ hasInvalidIndex = true;
+ }
+ }
+ }
+ if (hasInvalidIndex) {
+ // Update the snapshot segment index if there exist invalid
indexes.
+
persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
+ () ->
persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
+ }
+ }
+
+ private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName
topicName) throws Exception {
+ final var future = new CompletableFuture<ReadOnlyManagedLedgerImpl>();
+ final var callback = new
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+ @Override
+ public void
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger,
Object ctx) {
+ future.complete(managedLedger);
+ }
+
+ @Override
+ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException
exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Transaction buffer [%s] recover from
snapshot",
+
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+ }
+ };
+
topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
+ topicName.getPersistenceNamingEncoding(), callback,
topic.getManagedLedger().getConfig(), null);
+ return wait(future, "open read only ml for " + topicName);
+ }
+
+ private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, Position
position) throws Exception {
+ final var future = new CompletableFuture<Entry>();
+ managedLedger.asyncReadEntry(position, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ future.complete(entry);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ return wait(future, "read entry from " + position);
}
// This method will be deprecated and removed in version 4.x.0
- private CompletableFuture<Position> recoverOldSnapshot() {
- return
topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
-
.listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
- .thenCompose(topics -> {
- if (!topics.contains(TopicDomain.persistent + "://"
- + TopicName.get(topic.getName()).getNamespace() +
"/"
- + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
- return CompletableFuture.completedFuture(null);
- } else {
- return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
-
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader
-> {
- Position
startReadCursorPositionInOldSnapshot = null;
- try {
- while (snapshotReader.hasMoreEvents())
{
- Message<TransactionBufferSnapshot>
message = snapshotReader.readNextAsync()
-
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if
(topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot
transactionBufferSnapshot =
- message.getValue();
- if (transactionBufferSnapshot
!= null) {
-
handleOldSnapshot(transactionBufferSnapshot);
-
startReadCursorPositionInOldSnapshot = PositionFactory.create(
-
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-
transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t =
FutureUtil.unwrapCompletionException(ex);
- String errorMessage =
String.format("[%s] Transaction buffer recover fail by "
- + "read
transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(new
BrokerServiceException
-
.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer
recover fail when read "
- +
"transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- assert snapshotReader != null;
- closeReader(snapshotReader);
- }
- return
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
- },
-
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
- }
- });
+ private Position recoverOldSnapshot() throws Exception {
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var topicName = TopicName.get(topic.getName());
+ final var topics =
wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
+ NamespaceName.get(topicName.getNamespace())), "list persistent
topics");
+ if (!topics.contains(TopicDomain.persistent + "://" +
topicName.getNamespace() + "/"
+ + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
+ return null;
+ }
+ final var snapshot =
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot == null) {
+ return null;
+ }
+ handleOldSnapshot(snapshot);
+ return PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
snapshot.getMaxReadPositionEntryId());
}
// This method will be deprecated and removed in version 4.x.0
@@ -509,9 +417,17 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}
- private <T> void closeReader(SystemTopicClient.Reader<T> reader) {
+ private <R> R wait(CompletableFuture<R> future, String msg) throws
Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+
+ private <T> void closeReader(SystemTopicClient.Reader<T> reader) {
reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer snapshot reader close error!",
topic.getName(), e);
+ log.warn("[{}] Failed to close reader: {}", topic.getName(),
e.getMessage());
return null;
});
}
@@ -838,25 +754,37 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
* </p>
*/
private CompletableFuture<Void> clearAllSnapshotSegments() {
- return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotSegmentService()
-
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshotSegment>
message = reader.readNextAsync()
-
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if
(topic.getName().equals(message.getValue().getTopicName())) {
-
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
- }
+ final var future = new CompletableFuture<Void>();
+ final var pulsar = topic.getBrokerService().getPulsar();
+
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var reader =
wait(pulsar.getTransactionBufferSnapshotServiceFactory()
+
.getTxnBufferSnapshotSegmentService().createReader(TopicName.get(topic.getName()))
+ , "create reader");
+ try {
+ while (wait(reader.hasMoreEventsAsync(), "has more
events")) {
+ final var message = wait(reader.readNextAsync(),
"read next");
+ if
(topic.getName().equals(message.getValue().getTopicName())) {
+
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
}
- return CompletableFuture.completedFuture(null);
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer clear snapshot
segments fail!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
}
- });
+ future.complete(null);
+ } finally {
+ closeReader(reader);
+ }
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
+
+ private <R> R wait(CompletableFuture<R> future, String msg) throws
Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg,
e.getCause());
+ }
}
synchronized CompletableFuture<Void> closeAsync() {
@@ -882,4 +810,4 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
return segment;
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
new file mode 100644
index 00000000000..7608a393cc9
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.utils.SimpleCache;
+
+/**
+ * Compared with the more generic {@link
org.apache.pulsar.client.api.TableView}, this table view
+ * - Provides just a single public method that reads the latest value
synchronously.
+ * - Maintains multiple long-lived readers that will be expired after some
time (1 minute by default).
+ */
+@Slf4j
+public class TableView<T> {
+
+ // Remove the cached reader and snapshots if there is no refresh request
in 1 minute
+ private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
+ private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3000L;
+ @VisibleForTesting
+ protected final Function<TopicName, CompletableFuture<Reader<T>>>
readerCreator;
+ private final Map<String, T> snapshots = new ConcurrentHashMap<>();
+ private final long clientOperationTimeoutMs;
+ private final SimpleCache<NamespaceName, Reader<T>> readers;
+
+ public TableView(Function<TopicName, CompletableFuture<Reader<T>>>
readerCreator, long clientOperationTimeoutMs,
+ ScheduledExecutorService executor) {
+ this.readerCreator = readerCreator;
+ this.clientOperationTimeoutMs = clientOperationTimeoutMs;
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS,
CACHE_EXPIRE_CHECK_FREQUENCY_MS);
+ }
+
+ public T readLatest(String topic) throws Exception {
+ final var reader = getReader(topic);
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var msg = wait(reader.readNextAsync(), "read message");
+ if (msg.getKey() != null) {
+ if (msg.getValue() != null) {
+ snapshots.put(msg.getKey(), msg.getValue());
+ } else {
+ snapshots.remove(msg.getKey());
+ }
+ }
+ }
+ return snapshots.get(topic);
+ }
+
+ @VisibleForTesting
+ protected Reader<T> getReader(String topic) {
+ final var topicName = TopicName.get(topic);
+ return readers.get(topicName.getNamespaceObject(), () -> {
+ try {
+ return wait(readerCreator.apply(topicName), "create reader");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, __ -> __.closeAsync().exceptionally(e -> {
+ log.warn("Failed to close reader {}", e.getMessage());
+ return null;
+ }));
+ }
+
+ private <R> R wait(CompletableFuture<R> future, String msg) throws
Exception {
+ try {
+ return future.get(clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b4662e5fa83..7561457d11f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -632,7 +632,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
this, topic.getName());
return;
}
-
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
-> {
+
abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition ->
{
//Transaction is not use for this topic, so just make
maxReadPosition as LAC.
if (startReadCursorPosition == null) {
callBack.noNeedToRecover();
@@ -678,8 +678,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
closeCursor(SUBSCRIPTION_NAME);
callBack.recoverComplete();
- },
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this)).exceptionally(e -> {
+ }).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer failed to recover
snapshot!", topic.getName(), e);
return null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
new file mode 100644
index 00000000000..6a3a6721198
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+
+public class SimpleCache<K, V> {
+
+ private final Map<K, ExpirableValue<V>> cache = new HashMap<>();
+ private final long timeoutMs;
+
+ @RequiredArgsConstructor
+ private class ExpirableValue<V> {
+
+ private final V value;
+ private final Consumer<V> expireCallback;
+ private long deadlineMs;
+
+ boolean tryExpire() {
+ if (System.currentTimeMillis() >= deadlineMs) {
+ expireCallback.accept(value);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void updateDeadline() {
+ deadlineMs = System.currentTimeMillis() + timeoutMs;
+ }
+ }
+
+ public SimpleCache(final ScheduledExecutorService scheduler, final long
timeoutMs, final long frequencyMs) {
+ this.timeoutMs = timeoutMs;
+ scheduler.scheduleAtFixedRate(() -> {
+ synchronized (SimpleCache.this) {
+ final var keys = new HashSet<K>();
+ cache.forEach((key, value) -> {
+ if (value.tryExpire()) {
+ keys.add(key);
+ }
+ });
+ cache.keySet().removeAll(keys);
+ }
+ }, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
+ }
+
+ public synchronized V get(final K key, final Supplier<V> valueSupplier,
final Consumer<V> expireCallback) {
+ final var value = cache.get(key);
+ if (value != null) {
+ value.updateDeadline();
+ return value.value;
+ }
+
+ final var newValue = new ExpirableValue<>(valueSupplier.get(),
expireCallback);
+ newValue.updateDeadline();
+ cache.put(key, newValue);
+ return newValue.value;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index e4240bce700..8ab9d58f570 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -66,6 +66,7 @@ import
org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -90,7 +91,6 @@ import
org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -582,6 +582,19 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
reader.close();
}
+ static class MockTableView extends TableView<TransactionBufferSnapshot> {
+
+ public MockTableView(PulsarService pulsar) {
+ super(topic ->
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .createReader(topic), 30000L, pulsar.getExecutor());
+ }
+
+ @Override
+ public SystemTopicClient.Reader<TransactionBufferSnapshot>
getReader(String topic) {
+ return readerCreator.apply(TopicName.get(topic)).join();
+ }
+ }
+
@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 +
"/testTransactionBufferRecoverThrowPulsarClientException";
@@ -612,6 +625,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
doReturn(CompletableFuture.completedFuture(reader))
.when(systemTopicTxnBufferSnapshotService).createReader(any());
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
+ doReturn(new
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
TransactionBufferSnapshotServiceFactory
transactionBufferSnapshotServiceFactory =
mock(TransactionBufferSnapshotServiceFactory.class);
doReturn(systemTopicTxnBufferSnapshotService)
@@ -663,7 +677,8 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
PersistentTopic originalTopic,
Field field,
Producer<byte[]> producer) throws Exception {
- field.set(getPulsarServiceList().get(0),
transactionBufferSnapshotServiceFactory);
+ final var pulsar = getPulsarServiceList().get(0);
+ field.set(pulsar, transactionBufferSnapshotServiceFactory);
// recover again will throw then close topic
new TopicTransactionBuffer(originalTopic);
@@ -674,7 +689,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
assertTrue((boolean) close.get(originalTopic));
});
- field.set(getPulsarServiceList().get(0),
transactionBufferSnapshotServiceFactoryOriginal);
+ field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -684,29 +699,11 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
txn.commit().get();
}
-
- @Test
- public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
- String topic = NAMESPACE1 + "/test";
- @Cleanup
- Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
- .topic(topic).sendTimeout(0,
TimeUnit.SECONDS).enableBatching(false).create();
-
- admin.topics().unload(topic);
-
- // unload success, all readers have been closed except for the
compaction sub
- producer.send("test");
- TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" +
TRANSACTION_BUFFER_SNAPSHOT);
-
- // except for the compaction sub
- assertEquals(stats.getSubscriptions().size(), 1);
- assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
- }
-
@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
transactionBufferSnapshotIndexService =
- new
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+ new
TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotIndexService();
SystemTopicClient.Writer<TransactionBufferSnapshotIndexes>
indexesWriter =
transactionBufferSnapshotIndexService.getReferenceWriter(
@@ -766,9 +763,10 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
BrokerService brokerService = pulsarService.getBrokerService();
// create snapshot segment writer
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
transactionBufferSnapshotSegmentService =
- new
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+ new
TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotSegmentService();
SystemTopicClient.Writer<TransactionBufferSnapshotSegment>
segmentWriter = transactionBufferSnapshotSegmentService
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
new file mode 100644
index 00000000000..c590eda1718
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public class SimpleCacheTest {
+
+ private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(1);
+
+ @AfterClass
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Test
+ public void testConcurrentUpdate() throws Exception {
+ final var cache = new SimpleCache<Integer, Integer>(executor, 10000L,
10000L);
+ final var pool = Executors.newFixedThreadPool(2);
+ final var latch = new CountDownLatch(2);
+ for (int i = 0; i < 2; i++) {
+ final var value = i + 100;
+ pool.execute(() -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ cache.get(0, () -> value, __ -> {});
+ latch.countDown();
+ });
+ }
+ latch.await();
+ final var value = cache.get(0, () -> -1, __ -> {});
+ Assert.assertTrue(value == 100 || value == 101);
+ pool.shutdown();
+ }
+
+ @Test
+ public void testExpire() throws InterruptedException {
+ final var cache = new SimpleCache<Integer, Integer>(executor, 500L, 5);
+ final var expiredValues = Collections.synchronizedSet(new
HashSet<Integer>());
+
+ final var allKeys = IntStream.range(0,
5).boxed().collect(Collectors.toSet());
+ allKeys.forEach(key -> cache.get(key, () -> key + 100,
expiredValues::add));
+
+ Thread.sleep(400L);
+ final var recentAccessedKey = Set.of(1, 2);
+ recentAccessedKey.forEach(key -> cache.get(key, () -> -1,
expiredValues::add)); // access these keys
+
+ Thread.sleep(300L);
+ recentAccessedKey.forEach(key -> Assert.assertEquals(key + 100,
cache.get(key, () -> -1, __ -> {})));
+ allKeys.stream().filter(key -> !recentAccessedKey.contains(key))
+ .forEach(key -> Assert.assertEquals(-1, cache.get(key, () ->
-1, __ -> {})));
+ }
+}