This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b659cdce4871b1553b134e2e7e668e9dc1513318
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Mar 28 16:43:29 2022 +0800

    [fix][broker] Fix wrong state for non-durable cursor (#14869)
    
    ### Motivation
    
    A non-durable cursor never reports the correct state.
    For example, after a reader is created, the state of its cursor is always
    shown as ``Uninitialized`` in the output of the ``getInternalStats`` method.
    
    ```json
    {
      "reader-xxxxx" : {
          "markDeletePosition" : "19785:18718",
          "readPosition" : "19807:42735",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : -2257,
          "cursorLedger" : -1,
          "cursorLedgerLastEntry" : -1,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-03-24T20:03:51.85Z",
          "state" : "Uninitialized",
          "numberOfEntriesSinceFirstNotAckedMessage" : 744993,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : true,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
    }
    ```
    
    ### Modifications
    
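    - Correct the non-durable cursor state: set it to ``Open`` once the cursor
      has been created and to ``Closed`` in ``asyncClose``.
    - Make ``ManagedCursorImpl.STATE_UPDATER`` ``protected`` so that
      ``NonDurableCursorImpl`` can update the state.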
    
    (cherry picked from commit b477557a6f6d36ae18f09d4edfa076e9092a17fe)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java  |  2 +-
 .../mledger/impl/NonDurableCursorImpl.java          |  4 ++--
 .../org/apache/pulsar/client/impl/ReaderTest.java   | 21 ++++++++++++++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8c96f0e..f882444 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -236,7 +236,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         Closed // The managed cursor has been closed
     }
 
-    private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> 
STATE_UPDATER =
+    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, 
State> STATE_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, 
State.class, "state");
     protected volatile State state = null;
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 1d545bd..4625f5b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -61,7 +61,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // read-position
             recoverCursor(startCursorPosition);
         }
-
+        STATE_UPDATER.set(this, State.Open);
         log.info("[{}] Created non-durable cursor read-position={} 
mark-delete-position={}", ledger.getName(),
                 readPosition, markDeletePosition);
     }
@@ -110,7 +110,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl 
{
 
     @Override
     public void asyncClose(CloseCallback callback, Object ctx) {
-        // No-Op
+        STATE_UPDATER.set(this, State.Closed);
         callback.closeComplete(ctx);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2052646..53ae8f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -55,6 +54,8 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -602,4 +603,22 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
         });
     }
 
+    @Test
+    public void testReaderCursorStatsCorrect() throws Exception {
+        final String readerNotAckTopic = 
"persistent://my-property/my-ns/testReaderCursorStatsCorrect";
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(readerNotAckTopic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(readerNotAckTopic);
+        Assert.assertEquals(internalStats.cursors.size(), 1);
+        String key = new ArrayList<>(internalStats.cursors.keySet()).get(0);
+        ManagedLedgerInternalStats.CursorStats cursor = 
internalStats.cursors.get(key);
+        Assert.assertEquals(cursor.state, "Open");
+        reader.close();
+        internalStats = admin.topics().getInternalStats(readerNotAckTopic);
+        Assert.assertEquals(internalStats.cursors.size(), 0);
+    }
+
 }

Reply via email to