zymap commented on a change in pull request #4879: [Transaction][Buffer]make 
the transaction index to store in the ledger
URL: https://github.com/apache/pulsar/pull/4879#discussion_r311012684
 
 

 ##########
 File path: 
pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
 ##########
 @@ -123,4 +204,377 @@ private void addTxnToCommittedIndex(TxnID txnID, long 
committedAtLedgerId) {
 
         return removeFuture;
     }
+
+    // Take a snapshot for all indexes. We can persist the transaction meta 
because the indexes can be rebuilt by it.
+    // a. Create a begin block and put the current transaction log position 
into it.
+    // b. Create the middle  block to store the transaction meta  and the 
snapshot start position.
+    // c. Create the end block to say the snapshot is ending and put the 
snapshot start position to get the number of
+    //    snapshot blocks  when recovering.
+    public CompletableFuture<Void> takeSnapshot(Position txnBufferPosition) {
+        return startSnapshot(txnBufferPosition)
+            .thenCompose(position -> indexSnapshot(position, 
txnIndex.values()))
+            .thenCompose(position -> endSnapshot(position));
+    }
+
+    private CompletableFuture<Position> startSnapshot(Position position) {
+        return record(DataFormat.startStore(position));
+    }
+
+    private CompletableFuture<Position> indexSnapshot(Position 
startSnapshotPos,
+                                               Collection<TransactionMetaImpl> 
snapshotsMeta) {
+        List<CompletableFuture<Position>> snapshot =
+            snapshotsMeta.stream()
+                         .map(meta -> 
record(DataFormat.middleStore(startSnapshotPos, (TransactionMetaImpl) meta)))
+                         .collect(Collectors.toList());
+
+        return FutureUtil.waitForAll(snapshot).thenApply(ignore -> 
startSnapshotPos);
+    }
+
+    private CompletableFuture<Void> endSnapshot(Position startPos) {
+        return record(DataFormat.endStore(startPos)).thenApply(position -> 
null);
+    }
+
+    private CompletableFuture<Position> record(StoredTxn storedTxn) {
+        CompletableFuture<Position> recordFuture = new CompletableFuture<>();
+
+        indexCursor.get().asyncAddEntry(storedTxn.toByteArray(), new 
AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.info("Success to record the txn [{} - {}:{}] at [{}]", 
storedTxn.getStoredStatus(),
+                             
storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                             
storedTxn.getTxnMeta().getTxnId().getLeastSigBits(), position);
+                }
+                recordFuture.complete(position);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                if (log.isDebugEnabled()) {
+                    log.info("Failed to record the txn [{} : {}:{}]", 
storedTxn.getStoredStatus(),
+                             
storedTxn.getTxnMeta().getTxnId().getMostSigBits(),
+                             
storedTxn.getTxnMeta().getTxnId().getLeastSigBits());
+                }
+                recordFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return recordFuture;
+    }
+
+    // Recover the index.
+    // i. Read the last entry of the transaction cursor ledger.
+    //      a. If the end entry is the beginning of the snapshot, move 
backward and recover the index by c.
+    //      b. If the end entry is in the middle of the snapshot, get the 
snapshot beginning position, recover the index
+    //         by a.
+    //      c. If the end entry is the ending of the snapshot, get the 
snapshot beginning position, recover it by the
+    //         middle  entries.
+    public CompletableFuture<Void> recover() {
+        CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+
+        LedgerHandle lh = indexCursor.get().getCurrentCursorLedger();
+        long ledgerId = lh.getId();
+        long entryId = lh.getLastAddConfirmed();
+        PositionImpl currentPosition = PositionImpl.get(ledgerId, entryId);
+
+        readSpecifiedPosEntry(currentPosition)
+            .thenApply(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+            .whenComplete((snapshot, throwable) -> {
+                if (throwable != null) {
+                    recoverFuture.completeExceptionally(throwable);
+                } else {
+                    if (snapshot.status == null) {
+                        recoverFuture.complete(null);
+                    } else {
+                        switch (snapshot.status) {
+                            case START:
+                                
recoverFromStart(currentPosition).whenComplete((ignore, error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                                break;
+                            case MIDDLE:
+                                
recoverFromMiddle(snapshot).whenComplete((ignore, error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                                break;
+                            case END:
+                                recoverFromEnd(snapshot).whenComplete((ignore, 
error) -> {
+                                    checkComplete(error, recoverFuture);
+                                });
+                        }
+                    }
+                }
+            });
+
+        return recoverFuture;
+    }
+
+    private void checkComplete(Throwable error, CompletableFuture<Void> 
future) {
+        if (error != null) {
+            future.completeExceptionally(error);
+        } else {
+            future.complete(null);
+        }
+    }
+
+    @Getter
+    final static class PersistentTxnIndexSnapshot {
+        enum SnapshotStatus  {
+            START,
+            MIDDLE,
+            END,
+        }
+
+        SnapshotStatus status;
+        // If the status is START, the position is the transaction log 
position at which the snapshot was started.
+        //  If the status is others, the position is the snapshot beginning 
position on the cursor ledger.
+        Position position;
+        TransactionMetaImpl meta;
+
+        PersistentTxnIndexSnapshot(byte[] entry) {
+            StoredTxn txn = DataFormat.parseStoredTxn(entry);
+            switch (txn.getStoredStatus()) {
+                case START:
+                    this.status = SnapshotStatus.START;
+                    break;
+                case MIDDLE:
+                    this.status = SnapshotStatus.MIDDLE;
+                    break;
+                case END:
+                    this.status = SnapshotStatus.END;
+                    break;
+            }
+            this.meta = DataFormat.recoverMeta(txn.getTxnMeta());
+            this.position = DataFormat.recoverPosition(txn.getPosition());
+        }
+
+    }
+
+    private CompletableFuture<Void> recoverFromStart(Position currentPosition) 
{
+        return readPrevEntry(currentPosition)
+                   .thenApply(entry -> new 
PersistentTxnIndexSnapshot(entry.getData()))
+                   .thenCompose(this::recoverFromEnd);
+
+    }
+
+    private CompletableFuture<Entry> readPrevEntry(Position position) {
+        PositionImpl currentPos = (PositionImpl) position;
+        if (currentPos.getEntryId() == 0) {
+            return FutureUtil.failedFuture(
+                new TransactionIndexRecoveringError("Not found the prev 
position of the current position " + position));
 
 Review comment:
   > when does this happen?
   
   When reading entries on the cursor ledger, an entry id of 0 
indicates that there are no more entries before the read position.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to