zymap commented on a change in pull request #4879: [Transaction][Buffer]make 
the transaction index to store in the ledger
URL: https://github.com/apache/pulsar/pull/4879#discussion_r310909167
 
 

 ##########
 File path: 
pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
 ##########
 @@ -123,4 +204,377 @@ private void addTxnToCommittedIndex(TxnID txnID, long 
committedAtLedgerId) {
 
         return removeFuture;
     }
+
+    // Take a snapshot for all indexes. We can persist the transaction meta 
because the indexes can be rebuilt by it.
+    // a. Create a begin block and put the current transaction log position 
into it.
+    // b. Create the middle  block to store the transaction meta  and the 
snapshot start position.
+    // c. Create the end block to say the snapshot is ending and put the 
sanpshot start position to get the number of
+    //    snapshot blocks  when recovering.
+    public CompletableFuture<Void> takeSnapshot(Position txnBufferPosition) {
+        return startSnapshot(txnBufferPosition)
+            .thenCompose(position -> indexSnapshot(position, 
txnIndex.values()))
+            .thenCompose(position -> endSnapshot(position));
+    }
+
+    private CompletableFuture<Position> startSnapshot(Position position) {
+        return record(DataFormat.startStore(position));
+    }
+
+    private CompletableFuture<Position> indexSnapshot(Position 
startSnapshotPos,
+                                               Collection<TransactionMetaImpl> 
snapshotsMeta) {
+        List<CompletableFuture<Position>> snapshot =
+            snapshotsMeta.stream()
+                         .map(meta -> 
record(DataFormat.middleStore(startSnapshotPos, (TransactionMetaImpl) meta)))
+                         .collect(Collectors.toList());
+
+        return FutureUtil.waitForAll(snapshot).thenApply(ignore -> 
startSnapshotPos);
+    }
+
+    private CompletableFuture<Void> endSnapshot(Position startPos) {
+        return record(DataFormat.endStore(startPos)).thenApply(position -> 
null);
+    }
+
+    private CompletableFuture<Position> record(StoredTxn storedTxn) {
+        CompletableFuture<Position> recordFuture = new CompletableFuture<>();
+
+        indexCursor.get().asyncAddEntry(storedTxn.toByteArray(), new 
AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.info("Success to record the txn [{} - {}:{}] at [{}]", 
storedTxn.getStoredStatus(),
+                             
storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                             
storedTxn.getTxnMeta().getTxnId().getLeastSigBits(), position);
+                }
+                recordFuture.complete(position);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                if (log.isDebugEnabled()) {
+                    log.info("Failed to record the txn [{} : {}:{}]", 
storedTxn.getStoredStatus(),
+                             
storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                             
storedTxn.getTxnMeta().getTxnId().getLeastSigBits());
+                }
+                recordFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return recordFuture;
+    }
+
+    // Recover the index.
+    // i. Read the last entry of the transaction cursor ledger.
+    //      a. If the end entry is the beginning of the snapshot, move 
backward and recover the index by c.
+    //      b. If the end entry in the middle of the snapshot, get teh 
snapshot beginning position, recover the index
+    //         by a.
+    //      c. If the end entry is the ending of the snapshot, get the 
snapshot beginning position, recover it by the
+    //         middle  entries.
+    public CompletableFuture<Void> recover() {
+        CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+
+        LedgerHandle lh = indexCursor.get().getCurrentCursorLedger();
+        long ledgerId = lh.getId();
+        long entryId = lh.getLastAddConfirmed();
+        PositionImpl currentPosition = PositionImpl.get(ledgerId, entryId);
+
+        readSpecifiedPosEntry(currentPosition)
+            .thenApply(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+            .whenComplete((snapshot, throwable) -> {
+                if (throwable != null) {
+                    recoverFuture.completeExceptionally(throwable);
+                } else {
+                    if (snapshot.status == null) {
+                        recoverFuture.complete(null);
+                    } else {
+                        switch (snapshot.status) {
+                            case START:
+                                
recoverFromStart(currentPosition).whenComplete((ignore, error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                                break;
+                            case MIDDLE:
+                                
recoverFromMiddle(snapshot).whenComplete((ignore, error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                                break;
+                            case END:
+                                recoverFromEnd(snapshot).whenComplete((ignore, 
error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                        }
+                    }
+                }
+            });
+
+        return recoverFuture;
+    }
+
+    private void checkComplete(Throwable error, CompletableFuture<Void> 
future) {
+        if (error != null) {
+            future.completeExceptionally(error);
+        } else {
+            future.complete(null);
+        }
+    }
+
+    @Getter
+    final static class PersistentTxnIndexSnapshot {
+        enum SnapshotStatus  {
+            START,
+            MIDDLE,
+            END,
+        }
+
+        SnapshotStatus status;
+        // If the status is START, the position is the position which is the 
transaction log doing snapshot.
+        //  If the status is others, the position is the snapshot beginning 
position on the cursor ledger.
+        Position position;
+        TransactionMetaImpl meta;
+
+        PersistentTxnIndexSnapshot(byte[] entry) {
+            StoredTxn txn = DataFormat.parseStoredTxn(entry);
+            switch (txn.getStoredStatus()) {
+                case START:
+                    this.status = SnapshotStatus.START;
+                    break;
+                case MIDDLE:
+                    this.status = SnapshotStatus.MIDDLE;
+                    break;
+                case END:
+                    this.status = SnapshotStatus.END;
+                    break;
+            }
+            this.meta = DataFormat.recoverMeta(txn.getTxnMeta());
+            this.position = DataFormat.recoverPosition(txn.getPosition());
+        }
+
+    }
+
+    private CompletableFuture<Void> recoverFromStart(Position currentPosition) 
{
+        return readPrevEntry(currentPosition)
+                   .thenApply(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+                   .thenCompose(this::recoverFromEnd);
+
+    }
+
+    private CompletableFuture<Entry> readPrevEntry(Position position) {
+        PositionImpl currentPos = (PositionImpl) position;
+        if (currentPos.getEntryId() == 0) {
+            return FutureUtil.failedFuture(
+                new TransactionIndexRecoveringError("Not found the prev 
position of the current position " + position));
+        }
+
+        PositionImpl prevPosition = PositionImpl.get(currentPos.getLedgerId(), 
currentPos.getEntryId() - 1);
+        return readSpecifiedPosEntry(prevPosition);
+    }
+
+    private CompletableFuture<Void> 
recoverFromMiddle(PersistentTxnIndexSnapshot snapshot) {
+        return readSpecifiedPosEntry(snapshot.position)
+                   .thenApply(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+                   .thenCompose(startBlock -> 
recoverFromStart(snapshot.position));
+
+    }
+
+    private CompletableFuture<Entry> readSpecifiedPosEntry(Position position) {
+        CompletableFuture<Entry> readFuture = new CompletableFuture<>();
+
+        PositionImpl readPos = (PositionImpl) position;
+        LedgerHandle ledger = indexCursor.get().getCurrentCursorLedger();
+
+        ledger.asyncReadEntries(readPos.getEntryId(), readPos.getEntryId(), 
(rc, handle, entries, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                readFuture.completeExceptionally(BKException.create(rc));
+            } else {
+                if (entries.hasMoreElements()) {
+                    LedgerEntry ledgerEntry = entries.nextElement();
+                    EntryImpl entry = 
EntryImpl.create(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(),
+                                                       ledgerEntry.getEntry());
+
+                    readFuture.complete(entry);
+                } else {
+                    readFuture.completeExceptionally(new 
NoSuchElementException(
+                        "No such entry " + readPos.getEntryId() + " in ledger 
" + handle.getId()));
+                }
+            }
+        }, null);
+
+        return readFuture;
+    }
+
+    private CompletableFuture<Void> recoverFromEnd(PersistentTxnIndexSnapshot 
snapshot) {
+        return recoverFromLedger(snapshot);
+    }
+
+    private CompletableFuture<Void> 
recoverFromLedger(PersistentTxnIndexSnapshot snapshot) {
+        return readEntryFromCursorLedger(snapshot.position, 
indexCursor.get().getCurrentCursorLedger())
+            .thenApply(entries ->
+                           entries.stream()
+                                  .map(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+                                  .filter(tmpSnapshot ->
+                                              !tmpSnapshot.getStatus()
+                                                         
.equals(PersistentTxnIndexSnapshot.SnapshotStatus.END))
+                                  .collect(Collectors.toList()))
+            .thenCompose(snapshots -> rebuildIndex(snapshots))
+            .thenCompose(beginning -> replayTxnLogEntries(beginning.position));
+    }
+
+    private CompletableFuture<PersistentTxnIndexSnapshot> 
rebuildIndex(List<PersistentTxnIndexSnapshot> snapshots) {
+        List<CompletableFuture<Void>> rebuildFutre = new ArrayList<>();
+
+        snapshots.stream()
+                 .filter(snapshot -> 
snapshot.getStatus().equals(PersistentTxnIndexSnapshot.SnapshotStatus.MIDDLE))
+                 .map(snapshot -> snapshot.getMeta())
+                 .forEach(transactionMeta -> 
rebuildFutre.add(rebuildIndexByEntry(transactionMeta)));
+
+        return FutureUtil.waitForAll(rebuildFutre).thenCompose(ignore -> 
findStart(snapshots));
+    }
+
+    private CompletableFuture<PersistentTxnIndexSnapshot> 
findStart(List<PersistentTxnIndexSnapshot> snapshots) {
+        List<PersistentTxnIndexSnapshot> beginning = snapshots.stream()
+                 .filter(snapshot -> 
snapshot.getStatus().equals(PersistentTxnIndexSnapshot.SnapshotStatus.START))
+                 .collect(Collectors.toList());
+
+        if (beginning.size() != 1 || beginning.get(0) == null) {
+            return FutureUtil.failedFuture(new TransactionIndexRecoveringError(
+                "Found more than one START when recovering transaction index 
on cursor ledger: "
+                + indexCursor.get().getCurrentCursorLedger().getId()));
+        }
+
+        return CompletableFuture.completedFuture(beginning.get(0));
+    }
+
+    private CompletableFuture<Void> rebuildIndexByEntry(TransactionMetaImpl 
meta) {
+        // add to transaction index
+        txnIndex.putIfAbsent(meta.id(), meta);
+
+        // add to committed ledger transaction index
+        synchronized (committedLedgerTxnIndex) {
+            if (meta.isCommitted()) {
+                addTxnToCommittedIndex(meta.id(), meta.committedAtLedgerId());
+            }
+        }
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<List<Entry>> readEntryFromCursorLedger(Position 
startSnapshotPos,
+                                                                     
LedgerHandle cursorLedger) {
+        CompletableFuture<List<Entry>> readFuture = new CompletableFuture<>();
+        PositionImpl startPos = (PositionImpl) startSnapshotPos;
+        long startEntryId = startPos.getEntryId();
+        long endEntryId = cursorLedger.getLastAddConfirmed();
+
+        cursorLedger.asyncReadEntries(startEntryId, endEntryId, (rc, handle, 
entries, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                readFuture.completeExceptionally(BKException.create(rc));
+            } else {
+                if (entries.hasMoreElements()) {
+                    List<Entry> entryList = Collections.list(entries)
+                                                       .stream()
+                                                       .map(ledgerEntry -> 
EntryImpl.create(ledgerEntry.getLedgerId()
+                                                           , 
ledgerEntry.getEntryId(), ledgerEntry.getEntry()))
+                                                       
.collect(Collectors.toList());
+                    readFuture.complete(entryList);
+                } else {
+                    readFuture.completeExceptionally(new 
NoSuchElementException(
+                        "No more entry can read from ledger: " + 
handle.getId() + ", entry: " + startEntryId));
+                }
+            }
+        }, null);
+
+        return readFuture;
+    }
+
+    private CompletableFuture<List<Entry>> readEntryFromLedger(Position 
startSnapshotPos, ManagedLedger managedLedger) {
+        CompletableFuture<List<Entry>> readFuture = new CompletableFuture<>();
+
+        List<CompletableFuture<Void>> readAllEntryFuture = new ArrayList<>();
+        List<Entry> entryList = new ArrayList<>();
+        ManagedLedger cursorLedger = managedLedger;
+        ManagedCursor readCursor = null;
+        try {
+            readCursor = cursorLedger.newNonDurableCursor(startSnapshotPos);
+
+            while (readCursor.hasMoreEntries()) {
+                CompletableFuture<Void> readEntries = new 
CompletableFuture<>();
+                readCursor.asyncReadEntries(100, new 
AsyncCallbacks.ReadEntriesCallback() {
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, 
Object ctx) {
+                        synchronized (entryList) {
+                            entryList.addAll(entries);
+                        }
+                        readEntries.complete(null);
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
+                        readEntries.completeExceptionally(exception);
+                    }
+                }, null);
+                readAllEntryFuture.add(readEntries);
+            }
+
+            FutureUtil.waitForAll(readAllEntryFuture).whenComplete((ignore, 
error) -> {
+                if (error != null) {
+                    readFuture.completeExceptionally(error);
+                } else {
+                    readFuture.complete(entryList);
+                }
+            });
+
+        } catch (ManagedLedgerException e) {
 
 Review comment:
   the `newNonDurableCursor` throws the exception.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to