zymap commented on a change in pull request #4879: [Transaction][Buffer]make
the transaction index to store in the ledger
URL: https://github.com/apache/pulsar/pull/4879#discussion_r310909167
##########
File path:
pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
##########
@@ -123,4 +204,377 @@ private void addTxnToCommittedIndex(TxnID txnID, long
committedAtLedgerId) {
return removeFuture;
}
+
+ // Take a snapshot of all indexes. Persisting only the transaction meta is
enough because the indexes can be rebuilt from it.
+ // a. Create a begin block and put the current transaction log position
into it.
+ // b. Create the middle blocks to store the transaction meta and the
snapshot start position.
+ // c. Create the end block to mark the end of the snapshot and put the
snapshot start position into it, so that the number of
+ // snapshot blocks can be determined when recovering.
+    /**
+     * Persists a snapshot of the transaction index: a start block, then one middle block
+     * per value of {@code txnIndex}, then an end block.
+     *
+     * @param txnBufferPosition position handed to the start block
+     * @return a future that completes once the end block has been recorded
+     */
+    public CompletableFuture<Void> takeSnapshot(Position txnBufferPosition) {
+        CompletableFuture<Position> started = startSnapshot(txnBufferPosition);
+        CompletableFuture<Position> indexed =
+                started.thenCompose(startPos -> indexSnapshot(startPos, txnIndex.values()));
+        return indexed.thenCompose(this::endSnapshot);
+    }
+
+    /**
+     * Records the start block of a snapshot.
+     *
+     * @param position position stored inside the start block
+     * @return the position returned by {@code record} for the start block
+     */
+    private CompletableFuture<Position> startSnapshot(Position position) {
+        StoredTxn startBlock = DataFormat.startStore(position);
+        return record(startBlock);
+    }
+
+    /**
+     * Records one middle block per transaction meta, each carrying the snapshot start position.
+     *
+     * @param startSnapshotPos position stored in every middle block
+     * @param snapshotsMeta    the transaction metas to persist
+     * @return a future yielding {@code startSnapshotPos} once every middle block is recorded
+     */
+    private CompletableFuture<Position> indexSnapshot(Position startSnapshotPos,
+                                                      Collection<TransactionMetaImpl> snapshotsMeta) {
+        List<CompletableFuture<Position>> middleBlocks = new ArrayList<>();
+        for (TransactionMetaImpl meta : snapshotsMeta) {
+            middleBlocks.add(record(DataFormat.middleStore(startSnapshotPos, meta)));
+        }
+
+        return FutureUtil.waitForAll(middleBlocks).thenApply(ignore -> startSnapshotPos);
+    }
+
+    /**
+     * Records the end block of a snapshot.
+     *
+     * @param startPos snapshot start position stored inside the end block
+     * @return a future that completes (with {@code null}) once the end block is recorded
+     */
+    private CompletableFuture<Void> endSnapshot(Position startPos) {
+        CompletableFuture<Position> endBlock = record(DataFormat.endStore(startPos));
+        return endBlock.thenApply(ignore -> null);
+    }
+
+    /**
+     * Appends the serialized {@code storedTxn} through {@code indexCursor} and exposes the
+     * outcome of the asynchronous add as a future.
+     *
+     * @param storedTxn the block to persist
+     * @return a future completed with the position reported by the add callback, or completed
+     *         exceptionally with the {@link ManagedLedgerException} reported on failure
+     */
+    private CompletableFuture<Position> record(StoredTxn storedTxn) {
+        CompletableFuture<Position> recordFuture = new CompletableFuture<>();
+
+        indexCursor.get().asyncAddEntry(storedTxn.toByteArray(), new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                // Log at the level the guard checks for; the original logged at INFO behind a DEBUG guard.
+                if (log.isDebugEnabled()) {
+                    log.debug("Success to record the txn [{} - {}:{}] at [{}]", storedTxn.getStoredStatus(),
+                            storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                            storedTxn.getTxnMeta().getTxnId().getLeastSigBits(), position);
+                }
+                recordFuture.complete(position);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    // The trailing exception argument is logged with its stack trace.
+                    log.debug("Failed to record the txn [{} : {}:{}]", storedTxn.getStoredStatus(),
+                            storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                            storedTxn.getTxnMeta().getTxnId().getLeastSigBits(), exception);
+                }
+                recordFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return recordFuture;
+    }
+
+ // Recover the index.
+ // i. Read the last entry of the transaction cursor ledger.
+ // a. If the last entry is the beginning of a snapshot, move
backward and recover the index by c.
+ // b. If the last entry is in the middle of a snapshot, get the
snapshot beginning position and recover the index
+ // by a.
+ // c. If the last entry is the ending of a snapshot, get the
snapshot beginning position and recover the index from the
+ // middle entries.
+ public CompletableFuture<Void> recover() {
+ CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+
+ LedgerHandle lh = indexCursor.get().getCurrentCursorLedger();
+ long ledgerId = lh.getId();
+ long entryId = lh.getLastAddConfirmed();
+ PositionImpl currentPosition = PositionImpl.get(ledgerId, entryId);
+
+ readSpecifiedPosEntry(currentPosition)
+ .thenApply(entry -> new
PersistentTxnIndexSnapshot(entry.getData()))
+ .whenComplete((snapshot, throwable) -> {
+ if (throwable != null) {
+ recoverFuture.completeExceptionally(throwable);
+ } else {
+ if (snapshot.status == null) {
+ recoverFuture.complete(null);
+ } else {
+ switch (snapshot.status) {
+ case START:
+
recoverFromStart(currentPosition).whenComplete((ignore, error) -> {
+ checkComplete(error, recoverFuture);
+ });
+ break;
+ case MIDDLE:
+
recoverFromMiddle(snapshot).whenComplete((ignore, error) -> {
+ checkComplete(error, recoverFuture);
+ });
+ break;
+ case END:
+ recoverFromEnd(snapshot).whenComplete((ignore,
error) -> {
+ checkComplete(error, recoverFuture);
+ });
+ }
+ }
+ }
+ });
+
+ return recoverFuture;
+ }
+
+    /**
+     * Propagates the outcome of a finished stage to {@code future}.
+     *
+     * @param error  the failure of the finished stage, or {@code null} if it succeeded
+     * @param future the future to complete with {@code null} or to fail with {@code error}
+     */
+    private void checkComplete(Throwable error, CompletableFuture<Void> future) {
+        if (error == null) {
+            future.complete(null);
+            return;
+        }
+        future.completeExceptionally(error);
+    }
+
+    /**
+     * Decoded form of a single snapshot block: its status, the position it carries and the
+     * transaction meta recovered from it.
+     */
+    @Getter
+    static final class PersistentTxnIndexSnapshot {
+        enum SnapshotStatus {
+            START,
+            MIDDLE,
+            END,
+        }
+
+        // Stays null when the stored status is none of START, MIDDLE or END.
+        SnapshotStatus status;
+        // If the status is START, the position is the transaction log position at which the snapshot was taken.
+        // For the other statuses, the position is the snapshot beginning position on the cursor ledger.
+        Position position;
+        TransactionMetaImpl meta;
+
+        PersistentTxnIndexSnapshot(byte[] entry) {
+            StoredTxn decoded = DataFormat.parseStoredTxn(entry);
+            this.status = statusOf(decoded);
+            this.meta = DataFormat.recoverMeta(decoded.getTxnMeta());
+            this.position = DataFormat.recoverPosition(decoded.getPosition());
+        }
+
+        // Maps the stored status of a block to the snapshot status; null if nothing matches.
+        private static SnapshotStatus statusOf(StoredTxn txn) {
+            switch (txn.getStoredStatus()) {
+                case START:
+                    return SnapshotStatus.START;
+                case MIDDLE:
+                    return SnapshotStatus.MIDDLE;
+                case END:
+                    return SnapshotStatus.END;
+                default:
+                    return null;
+            }
+        }
+
+    }
+
+    /**
+     * Recovers when {@code currentPosition} holds a START block: the entry right before it is
+     * decoded and handed to {@code recoverFromEnd}.
+     *
+     * @param currentPosition position of the START block
+     * @return a future completed when the recovery started from the previous entry finishes
+     */
+    private CompletableFuture<Void> recoverFromStart(Position currentPosition) {
+        CompletableFuture<Entry> previousEntry = readPrevEntry(currentPosition);
+        return previousEntry
+                .thenApply(entry -> new PersistentTxnIndexSnapshot(entry.getData()))
+                .thenCompose(previous -> recoverFromEnd(previous));
+    }
+
+    /**
+     * Reads the entry immediately preceding {@code position} in the same ledger.
+     *
+     * @param position the position whose predecessor is wanted; must be a {@code PositionImpl}
+     * @return the previous entry, or a future failed with {@code TransactionIndexRecoveringError}
+     *         when {@code position} is entry 0 and therefore has no predecessor
+     */
+    private CompletableFuture<Entry> readPrevEntry(Position position) {
+        PositionImpl current = (PositionImpl) position;
+        long entryId = current.getEntryId();
+        if (entryId != 0) {
+            return readSpecifiedPosEntry(PositionImpl.get(current.getLedgerId(), entryId - 1));
+        }
+
+        return FutureUtil.failedFuture(
+                new TransactionIndexRecoveringError(
+                        "Not found the prev position of the current position " + position));
+    }
+
+    /**
+     * Recovers when the last entry is a MIDDLE block: the block at the snapshot beginning
+     * position is read and decoded, then recovery continues through {@code recoverFromStart}
+     * at that position. The decoded block itself is not used further.
+     *
+     * @param snapshot the decoded MIDDLE block; its position is the snapshot beginning
+     * @return a future completed when {@code recoverFromStart} finishes
+     */
+    private CompletableFuture<Void> recoverFromMiddle(PersistentTxnIndexSnapshot snapshot) {
+        Position snapshotStart = snapshot.position;
+        return readSpecifiedPosEntry(snapshotStart)
+                .thenApply(entry -> new PersistentTxnIndexSnapshot(entry.getData()))
+                .thenCompose(ignored -> recoverFromStart(snapshotStart));
+    }
+
+    /**
+     * Reads exactly one entry, identified by the entry id of {@code position}, from the
+     * current cursor ledger.
+     *
+     * @param position the position to read; must be a {@code PositionImpl}
+     * @return the entry, or a future failed with the {@code BKException} for a non-OK return
+     *         code, or with {@link NoSuchElementException} when the read yields no entry
+     */
+    private CompletableFuture<Entry> readSpecifiedPosEntry(Position position) {
+        CompletableFuture<Entry> result = new CompletableFuture<>();
+
+        PositionImpl target = (PositionImpl) position;
+        long entryId = target.getEntryId();
+
+        indexCursor.get().getCurrentCursorLedger().asyncReadEntries(entryId, entryId, (rc, handle, entries, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                result.completeExceptionally(BKException.create(rc));
+                return;
+            }
+            if (!entries.hasMoreElements()) {
+                result.completeExceptionally(new NoSuchElementException(
+                        "No such entry " + target.getEntryId() + " in ledger " + handle.getId()));
+                return;
+            }
+
+            LedgerEntry ledgerEntry = entries.nextElement();
+            result.complete(
+                    EntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), ledgerEntry.getEntry()));
+        }, null);
+
+        return result;
+    }
+
+    /**
+     * Recovers from a complete snapshot described by its END block.
+     *
+     * @param snapshot the decoded END block; its position is the snapshot beginning
+     * @return a future completed when the index is rebuilt and the transaction log replayed
+     */
+    private CompletableFuture<Void> recoverFromEnd(PersistentTxnIndexSnapshot snapshot) {
+        return recoverFromLedger(snapshot);
+    }
+
+    /**
+     * Reads the cursor ledger from the snapshot beginning position, rebuilds the index from
+     * the blocks of that snapshot and then replays the transaction log from the position
+     * stored in its START block.
+     *
+     * <p>The read runs up to the last confirmed entry, so it may also return blocks of a later,
+     * unfinished snapshot (this happens when recovery falls back from a trailing START or
+     * MIDDLE block to the previous snapshot). Only the blocks before the first END block are
+     * used; otherwise the extra START block would make {@code findStart} reject the snapshot.
+     *
+     * @param snapshot a decoded block whose position is the snapshot beginning on the cursor ledger
+     * @return a future completed when {@code replayTxnLogEntries} finishes
+     */
+    private CompletableFuture<Void> recoverFromLedger(PersistentTxnIndexSnapshot snapshot) {
+        return readEntryFromCursorLedger(snapshot.position, indexCursor.get().getCurrentCursorLedger())
+                .thenApply(this::collectSnapshotBlocks)
+                .thenCompose(this::rebuildIndex)
+                .thenCompose(beginning -> replayTxnLogEntries(beginning.position));
+    }
+
+    // Decodes the blocks of one snapshot: every entry up to, but excluding, the first END block.
+    private List<PersistentTxnIndexSnapshot> collectSnapshotBlocks(List<Entry> entries) {
+        List<PersistentTxnIndexSnapshot> blocks = new ArrayList<>();
+        for (Entry entry : entries) {
+            PersistentTxnIndexSnapshot block = new PersistentTxnIndexSnapshot(entry.getData());
+            if (block.getStatus().equals(PersistentTxnIndexSnapshot.SnapshotStatus.END)) {
+                // Anything after the END block belongs to a later snapshot.
+                break;
+            }
+            blocks.add(block);
+        }
+        return blocks;
+    }
+
+    /**
+     * Rebuilds the index from every MIDDLE block in {@code snapshots}, then resolves the
+     * START block of the snapshot.
+     *
+     * @param snapshots the decoded snapshot blocks
+     * @return the START block as returned by {@code findStart}, once all rebuilds are done
+     */
+    private CompletableFuture<PersistentTxnIndexSnapshot> rebuildIndex(List<PersistentTxnIndexSnapshot> snapshots) {
+        List<CompletableFuture<Void>> pendingRebuilds = new ArrayList<>();
+
+        for (PersistentTxnIndexSnapshot snapshot : snapshots) {
+            if (snapshot.getStatus().equals(PersistentTxnIndexSnapshot.SnapshotStatus.MIDDLE)) {
+                pendingRebuilds.add(rebuildIndexByEntry(snapshot.getMeta()));
+            }
+        }
+
+        return FutureUtil.waitForAll(pendingRebuilds).thenCompose(ignore -> findStart(snapshots));
+    }
+
+    /**
+     * Picks the single START block out of {@code snapshots}.
+     *
+     * @param snapshots the decoded snapshot blocks
+     * @return the START block, or a future failed with {@code TransactionIndexRecoveringError}
+     *         when the list holds no START block or more than one
+     */
+    private CompletableFuture<PersistentTxnIndexSnapshot> findStart(List<PersistentTxnIndexSnapshot> snapshots) {
+        List<PersistentTxnIndexSnapshot> beginning = snapshots.stream()
+                .filter(snapshot -> snapshot.getStatus().equals(PersistentTxnIndexSnapshot.SnapshotStatus.START))
+                .collect(Collectors.toList());
+
+        // Elements cannot be null here: the filter above already dereferenced each of them.
+        if (beginning.size() != 1) {
+            // Report the real count; the failure covers a missing START as well as duplicates.
+            return FutureUtil.failedFuture(new TransactionIndexRecoveringError(
+                    "Expected exactly one START but found " + beginning.size()
+                            + " when recovering transaction index on cursor ledger: "
+                            + indexCursor.get().getCurrentCursorLedger().getId()));
+        }
+
+        return CompletableFuture.completedFuture(beginning.get(0));
+    }
+
+    /**
+     * Re-inserts one transaction meta into the in-memory indexes.
+     *
+     * @param meta the transaction meta recovered from a snapshot block
+     * @return an already completed future
+     */
+    private CompletableFuture<Void> rebuildIndexByEntry(TransactionMetaImpl meta) {
+        TxnID txnId = meta.id();
+
+        // Transaction index: an existing mapping for this id is kept.
+        txnIndex.putIfAbsent(txnId, meta);
+
+        // Committed-ledger index: only committed transactions are added, under the index lock.
+        synchronized (committedLedgerTxnIndex) {
+            if (meta.isCommitted()) {
+                addTxnToCommittedIndex(txnId, meta.committedAtLedgerId());
+            }
+        }
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Reads every entry from the entry id of {@code startSnapshotPos} up to the last confirmed
+     * entry of {@code cursorLedger}.
+     *
+     * @param startSnapshotPos first position to read; must be a {@code PositionImpl}
+     * @param cursorLedger     the ledger to read from
+     * @return the entries in read order, or a future failed with the {@code BKException} for a
+     *         non-OK return code, or with {@link NoSuchElementException} when nothing was read
+     */
+    private CompletableFuture<List<Entry>> readEntryFromCursorLedger(Position startSnapshotPos,
+                                                                     LedgerHandle cursorLedger) {
+        CompletableFuture<List<Entry>> result = new CompletableFuture<>();
+        long startEntryId = ((PositionImpl) startSnapshotPos).getEntryId();
+        long endEntryId = cursorLedger.getLastAddConfirmed();
+
+        cursorLedger.asyncReadEntries(startEntryId, endEntryId, (rc, handle, entries, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                result.completeExceptionally(BKException.create(rc));
+                return;
+            }
+            if (!entries.hasMoreElements()) {
+                result.completeExceptionally(new NoSuchElementException(
+                        "No more entry can read from ledger: " + handle.getId() + ", entry: " + startEntryId));
+                return;
+            }
+
+            List<Entry> entryList = new ArrayList<>();
+            while (entries.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entries.nextElement();
+                entryList.add(
+                        EntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), ledgerEntry.getEntry()));
+            }
+            result.complete(entryList);
+        }, null);
+
+        return result;
+    }
+
+ private CompletableFuture<List<Entry>> readEntryFromLedger(Position
startSnapshotPos, ManagedLedger managedLedger) {
+ CompletableFuture<List<Entry>> readFuture = new CompletableFuture<>();
+
+ List<CompletableFuture<Void>> readAllEntryFuture = new ArrayList<>();
+ List<Entry> entryList = new ArrayList<>();
+ ManagedLedger cursorLedger = managedLedger;
+ ManagedCursor readCursor = null;
+ try {
+ readCursor = cursorLedger.newNonDurableCursor(startSnapshotPos);
+
+ while (readCursor.hasMoreEntries()) {
+ CompletableFuture<Void> readEntries = new
CompletableFuture<>();
+ readCursor.asyncReadEntries(100, new
AsyncCallbacks.ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries,
Object ctx) {
+ synchronized (entryList) {
+ entryList.addAll(entries);
+ }
+ readEntries.complete(null);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
+ readEntries.completeExceptionally(exception);
+ }
+ }, null);
+ readAllEntryFuture.add(readEntries);
+ }
+
+ FutureUtil.waitForAll(readAllEntryFuture).whenComplete((ignore,
error) -> {
+ if (error != null) {
+ readFuture.completeExceptionally(error);
+ } else {
+ readFuture.complete(entryList);
+ }
+ });
+
+ } catch (ManagedLedgerException e) {
Review comment:
the `newNonDurableCursor` throws the exception.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services