This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 944905363fb5c030616ade0f8ee3a3b899ea5dcd
Author: Yijie Shen <[email protected]>
AuthorDate: Mon Feb 10 15:55:18 2020 +0800

    [Broker]Reset cursor with a non-exists position (#6120)
    
    `ManagedCursorImpl.asyncResetCursor` is used in three kinds of 
circumstances:
    - REST API: create a subscription with messageId. Per the document: Reset 
subscription to message position closest to given position.
    - REST API: reset subscription to a given position. Per the document: Reset 
subscription to message position closest to given position.
    - Consumer seek command.
    
    In all the cases above, when the user provides a MessageId, we should make 
the best effort to find the closest position, instead of throwing an 
InvalidCursorPosition Exception.
    
    This is because if a user provides an invalid position, it's not possible 
for them to obtain a valid one, since ledger ids for a given topic may not 
be continuous and only brokers are aware of the order. Therefore, we should 
avoid throwing an invalid cursor position exception and instead find the nearest 
valid position and perform the reset.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 21 ++++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 71 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  3 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |  3 +-
 4 files changed, 87 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d765538..158a2ce 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -948,14 +948,21 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         // order trim and reset operations on a ledger
         ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
-            if (ledger.isValidPosition(newPosition) || 
newPosition.equals(PositionImpl.earliest)
-                    || newPosition.equals(PositionImpl.latest)) {
-                internalResetCursor(newPosition, callback);
-            } else {
-                // caller (replay) should handle this error and retry cursor 
reset
-                callback.resetFailed(new 
ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()),
-                        newPosition);
+            PositionImpl actualPosition = newPosition;
+
+            if (!ledger.isValidPosition(actualPosition) &&
+                !actualPosition.equals(PositionImpl.earliest) &&
+                !actualPosition.equals(PositionImpl.latest)) {
+                actualPosition = ledger.getNextValidPosition(actualPosition);
+
+                if (actualPosition == null) {
+                    // next valid position would only return null when newPos
+                    // is larger than all available positions, then it's 
latest in effect.
+                    actualPosition = PositionImpl.latest;
+                }
             }
+
+            internalResetCursor(actualPosition, callback);
         }));
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b0db926..ddb8da4 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -553,6 +553,77 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testResetCursor1() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
+            new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+        ManagedCursor cursor = ledger.openCursor("trc1");
+        PositionImpl actualEarliest = (PositionImpl) 
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        PositionImpl lastInPrev = (PositionImpl) 
ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+        PositionImpl firstInNext = (PositionImpl) 
ledger.addEntry("dummy-entry-5".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-6".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-7".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-8".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-9".getBytes(Encoding));
+        PositionImpl last = (PositionImpl) 
ledger.addEntry("dummy-entry-10".getBytes(Encoding));
+
+        final AtomicBoolean moveStatus = new AtomicBoolean(false);
+
+        // reset to earliest
+        PositionImpl earliest = PositionImpl.earliest;
+        try {
+            cursor.resetCursor(earliest);
+            moveStatus.set(true);
+        } catch (Exception e) {
+            log.warn("error in reset cursor", e.getCause());
+        }
+        assertTrue(moveStatus.get());
+        PositionImpl earliestPos = new 
PositionImpl(actualEarliest.getLedgerId(), -1);
+        assertEquals(earliestPos, cursor.getReadPosition());
+        moveStatus.set(false);
+
+        // reset to one after last entry in a ledger should point to the first 
entry in the next ledger
+        PositionImpl resetPosition = new 
PositionImpl(lastInPrev.getLedgerId(), lastInPrev.getEntryId() + 1);
+        try {
+            cursor.resetCursor(resetPosition);
+            moveStatus.set(true);
+        } catch (Exception e) {
+            log.warn("error in reset cursor", e.getCause());
+        }
+        assertTrue(moveStatus.get());
+        assertEquals(firstInNext, cursor.getReadPosition());
+        moveStatus.set(false);
+
+        // reset to a non exist larger ledger should point to the first 
non-exist entry in the last ledger
+        PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
+        try {
+            cursor.resetCursor(latest);
+            moveStatus.set(true);
+        } catch (Exception e) {
+            log.warn("error in reset cursor", e.getCause());
+        }
+        assertTrue(moveStatus.get());
+        PositionImpl lastPos = new PositionImpl(last.getLedgerId(), 
last.getEntryId() + 1);
+        assertEquals(lastPos, cursor.getReadPosition());
+        moveStatus.set(false);
+
+        // reset to latest should point to the first non-exist entry in the 
last ledger
+        PositionImpl anotherLast = PositionImpl.latest;
+        try {
+            cursor.resetCursor(anotherLast);
+            moveStatus.set(true);
+        } catch (Exception e) {
+            log.warn("error in reset cursor", e.getCause());
+        }
+        assertTrue(moveStatus.get());
+        assertEquals(lastPos, cursor.getReadPosition());
+
+        cursor.close();
+        ledger.close();
+    }
+
+    @Test(timeOut = 20000)
     void testasyncResetCursor() throws Exception {
         ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
                 new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 8f90c35..4a54e2e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -509,9 +509,8 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         try {
             messageId = new MessageIdImpl(0, 0, -1);
             admin.topics().resetCursor(topicName, "my-sub", messageId);
-            fail("It should have failed due to invalid subscription name");
         } catch (PulsarAdminException.PreconditionFailedException e) {
-            // Ok
+            fail("It shouldn't fail for a invalid position");
         }
 
         consumer.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 75ef9e8..eaf7a2d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -501,9 +501,8 @@ public class V1_AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         try {
             messageId = new MessageIdImpl(0, 0, -1);
             admin.topics().resetCursor(topicName, "my-sub", messageId);
-            fail("It should have failed due to invalid subscription name");
         } catch (PulsarAdminException.PreconditionFailedException e) {
-            // Ok
+            fail("It shouldn't fail for a invalid position");
         }
 
         consumer.close();

Reply via email to