This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 944905363fb5c030616ade0f8ee3a3b899ea5dcd Author: Yijie Shen <[email protected]> AuthorDate: Mon Feb 10 15:55:18 2020 +0800 [Broker]Reset cursor with a non-exists position (#6120) `ManagedCursorImpl.asyncResetCursor` is used in three kinds of circumstances: - REST API: create a subscription with messageId. Per the document: Reset subscription to message position closest to given position. - REST API: reset subscription to a given position. Per the document: Reset subscription to message position closest to given position. - Consumer seek command. In all the cases above, when the user provides a MessageId, we should make the best effort to find the closest position, instead of throwing an InvalidCursorPositionException. This is because if a user provides an invalid position, it is not possible for him or her to obtain a valid one, since ledger ids for a given topic may not be continuous and only brokers are aware of the order. Therefore, we should avoid throwing an InvalidCursorPositionException and instead find the nearest valid position and reset the cursor to it. 
--- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 21 ++++--- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 71 ++++++++++++++++++++++ .../apache/pulsar/broker/admin/AdminApiTest2.java | 3 +- .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 3 +- 4 files changed, 87 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d765538..158a2ce 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -948,14 +948,21 @@ public class ManagedCursorImpl implements ManagedCursor { // order trim and reset operations on a ledger ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { - if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest) - || newPosition.equals(PositionImpl.latest)) { - internalResetCursor(newPosition, callback); - } else { - // caller (replay) should handle this error and retry cursor reset - callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()), - newPosition); + PositionImpl actualPosition = newPosition; + + if (!ledger.isValidPosition(actualPosition) && + !actualPosition.equals(PositionImpl.earliest) && + !actualPosition.equals(PositionImpl.latest)) { + actualPosition = ledger.getNextValidPosition(actualPosition); + + if (actualPosition == null) { + // next valid position would only return null when newPos + // is larger than all available positions, then it's latest in effect. 
+ actualPosition = PositionImpl.latest; + } } + + internalResetCursor(actualPosition, callback); })); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index b0db926..ddb8da4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -553,6 +553,77 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } @Test(timeOut = 20000) + void testResetCursor1() throws Exception { + ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", + new ManagedLedgerConfig().setMaxEntriesPerLedger(2)); + ManagedCursor cursor = ledger.openCursor("trc1"); + PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-2".getBytes(Encoding)); + ledger.addEntry("dummy-entry-3".getBytes(Encoding)); + PositionImpl lastInPrev = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding)); + PositionImpl firstInNext = (PositionImpl) ledger.addEntry("dummy-entry-5".getBytes(Encoding)); + ledger.addEntry("dummy-entry-6".getBytes(Encoding)); + ledger.addEntry("dummy-entry-7".getBytes(Encoding)); + ledger.addEntry("dummy-entry-8".getBytes(Encoding)); + ledger.addEntry("dummy-entry-9".getBytes(Encoding)); + PositionImpl last = (PositionImpl) ledger.addEntry("dummy-entry-10".getBytes(Encoding)); + + final AtomicBoolean moveStatus = new AtomicBoolean(false); + + // reset to earliest + PositionImpl earliest = PositionImpl.earliest; + try { + cursor.resetCursor(earliest); + moveStatus.set(true); + } catch (Exception e) { + log.warn("error in reset cursor", e.getCause()); + } + assertTrue(moveStatus.get()); + PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1); + assertEquals(earliestPos, 
cursor.getReadPosition()); + moveStatus.set(false); + + // reset to one after last entry in a ledger should point to the first entry in the next ledger + PositionImpl resetPosition = new PositionImpl(lastInPrev.getLedgerId(), lastInPrev.getEntryId() + 1); + try { + cursor.resetCursor(resetPosition); + moveStatus.set(true); + } catch (Exception e) { + log.warn("error in reset cursor", e.getCause()); + } + assertTrue(moveStatus.get()); + assertEquals(firstInNext, cursor.getReadPosition()); + moveStatus.set(false); + + // reset to a non exist larger ledger should point to the first non-exist entry in the last ledger + PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0); + try { + cursor.resetCursor(latest); + moveStatus.set(true); + } catch (Exception e) { + log.warn("error in reset cursor", e.getCause()); + } + assertTrue(moveStatus.get()); + PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1); + assertEquals(lastPos, cursor.getReadPosition()); + moveStatus.set(false); + + // reset to latest should point to the first non-exist entry in the last ledger + PositionImpl anotherLast = PositionImpl.latest; + try { + cursor.resetCursor(anotherLast); + moveStatus.set(true); + } catch (Exception e) { + log.warn("error in reset cursor", e.getCause()); + } + assertTrue(moveStatus.get()); + assertEquals(lastPos, cursor.getReadPosition()); + + cursor.close(); + ledger.close(); + } + + @Test(timeOut = 20000) void testasyncResetCursor() throws Exception { ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 8f90c35..4a54e2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
@@ -509,9 +509,8 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { try { messageId = new MessageIdImpl(0, 0, -1); admin.topics().resetCursor(topicName, "my-sub", messageId); - fail("It should have failed due to invalid subscription name"); } catch (PulsarAdminException.PreconditionFailedException e) { - // Ok + fail("It shouldn't fail for a invalid position"); } consumer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java index 75ef9e8..eaf7a2d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java @@ -501,9 +501,8 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { try { messageId = new MessageIdImpl(0, 0, -1); admin.topics().resetCursor(topicName, "my-sub", messageId); - fail("It should have failed due to invalid subscription name"); } catch (PulsarAdminException.PreconditionFailedException e) { - // Ok + fail("It shouldn't fail for a invalid position"); } consumer.close();
