merlimat commented on code in PR #16389:
URL: https://github.com/apache/pulsar/pull/16389#discussion_r914262023
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -647,18 +647,7 @@ void initialize(PositionImpl position, Map<String, Long>
properties, Map<String,
ledger.getName(), name, messagesConsumedCounter,
markDeletePosition, readPosition);
}
- createNewMetadataLedger(new VoidCallback() {
- @Override
- public void operationComplete() {
- STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
- callback.operationComplete();
- }
-
- @Override
- public void operationFailed(ManagedLedgerException exception) {
- callback.operationFailed(exception);
- }
- });
+ callback.operationComplete();
Review Comment:
@codelipenghui There is a problem here.
The cursor z-node is not yet created at this point, so we are leaving the
cursor as "ephemeral". It also means that the updates on the cursor z-node are
then also failing because of the z-node version mismatch with the expectation.
In the current code, the cursor z-node is only created after:
1. Cursor ledger is created
2. We have written current mark-delete position to that ledger
3. Now we are going to create the cursor z-node (referencing the ledger id)
Instead of calling `operationComplete()` here we need to call
`persistPositionMetaStore()` which will create the z-node and include the
mark-delete position in the protobuf.
eg: sample test:
```java
@Test
public void testLazyCursorLedgerCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
Position p1 = ledger.addEntry("test".getBytes());
ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor("test");
assertEquals(cursor.getMarkDeletedPosition(), p1);
ManagedLedgerFactory factory2 = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = (ManagedLedgerImpl)
factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
assertNotNull(ledger.getCursors().get("test"));
ManagedCursorImpl cursor1 = (ManagedCursorImpl)
ledger.openCursor("test");
assertEquals(cursor1.getMarkDeletedPosition(), p1);
factory2.shutdown();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]