This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b659cdce4871b1553b134e2e7e668e9dc1513318 Author: Qiang Zhao <[email protected]> AuthorDate: Mon Mar 28 16:43:29 2022 +0800 [fix][broker] Fix wrong state for non-durable cursor (#14869) ### Motivation A non-durable cursor never reports the correct state. For example, after a reader is created, the ``getInternalStats`` method always shows the cursor state as ``Uninitialized``. ```json { "reader-xxxxx" : { "markDeletePosition" : "19785:18718", "readPosition" : "19807:42735", "waitingReadOp" : false, "pendingReadOps" : 0, "messagesConsumedCounter" : -2257, "cursorLedger" : -1, "cursorLedgerLastEntry" : -1, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-03-24T20:03:51.85Z", "state" : "Uninitialized", "numberOfEntriesSinceFirstNotAckedMessage" : 744993, "totalNonContiguousDeletedMessagesRange" : 0, "subscriptionHavePendingRead" : true, "subscriptionHavePendingReplayRead" : false, "properties" : { } } } ``` ### Modifications - Set the non-durable cursor state to ``Open`` when the cursor is created and to ``Closed`` in ``asyncClose``. - Make ``STATE_UPDATER`` in ``ManagedCursorImpl`` ``protected`` so that ``NonDurableCursorImpl`` can use it. - Add a test that checks the reader cursor state reported by ``getInternalStats``. 
(cherry picked from commit b477557a6f6d36ae18f09d4edfa076e9092a17fe) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../mledger/impl/NonDurableCursorImpl.java | 4 ++-- .../org/apache/pulsar/client/impl/ReaderTest.java | 21 ++++++++++++++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8c96f0e..f882444 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -236,7 +236,7 @@ public class ManagedCursorImpl implements ManagedCursor { Closed // The managed cursor has been closed } - private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER = + protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state"); protected volatile State state = null; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 1d545bd..4625f5b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -61,7 +61,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { // read-position recoverCursor(startCursorPosition); } - + STATE_UPDATER.set(this, State.Open); log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(), readPosition, markDeletePosition); } @@ -110,7 +110,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { @Override public void 
asyncClose(CloseCallback callback, Object ctx) { - // No-Op + STATE_UPDATER.set(this, State.Closed); callback.closeComplete(ctx); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2052646..53ae8f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -55,6 +54,8 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; @@ -602,4 +603,22 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { }); } + @Test + public void testReaderCursorStatsCorrect() throws Exception { + final String readerNotAckTopic = "persistent://my-property/my-ns/testReaderCursorStatsCorrect"; + @Cleanup + Reader<byte[]> reader = pulsarClient.newReader() + .topic(readerNotAckTopic) + .startMessageId(MessageId.earliest) + .create(); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(readerNotAckTopic); + Assert.assertEquals(internalStats.cursors.size(), 1); + String key = new ArrayList<>(internalStats.cursors.keySet()).get(0); + 
ManagedLedgerInternalStats.CursorStats cursor = internalStats.cursors.get(key); + Assert.assertEquals(cursor.state, "Open"); + reader.close(); + internalStats = admin.topics().getInternalStats(readerNotAckTopic); + Assert.assertEquals(internalStats.cursors.size(), 0); + } + }
