This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new 493ab7bf0ff [fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (#16878) 493ab7bf0ff is described below commit 493ab7bf0ffda064f3c4251e82b390aa1f83057c Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Wed Aug 3 12:08:44 2022 +0800 [fix][flaky-test]ManagedCursorMetricsTest.testManagedCursorMetrics (#16878) (cherry picked from commit a8231a4f821b360d7469685c77268d9591fb072d) --- .../broker/stats/ManagedCursorMetricsTest.java | 150 +++++++++++++++++---- 1 file changed, 122 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index 4648ae2fb8f..812a6b2e047 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -18,14 +18,20 @@ */ package org.apache.pulsar.broker.stats; +import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.bookkeeper.client.PulsarMockLedgerHandle; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursorMXBean; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -34,21 +40,22 @@ 
import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarTestClient; import org.apache.pulsar.common.stats.Metrics; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker") public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest { - @BeforeMethod(alwaysRun = true) + @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); } - @AfterMethod(alwaysRun = true) + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -59,21 +66,28 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest { return PulsarTestClient.create(clientBuilder); } + /*** + * This method covers these cases: + * brk_ml_cursor_persistLedgerSucceed + * brk_ml_cursor_persistLedgerErrors + * brk_ml_cursor_persistZookeeperSucceed + * brk_ml_cursor_nonContiguousDeletedMessagesRange + * But does not cover "brk_ml_cursor_persistZookeeperErrors". + */ @Test public void testManagedCursorMetrics() throws Exception { final String subName = "my-sub"; final String topicName = "persistent://my-namespace/use/my-ns/my-topic1"; - final int messageSize = 10; - + /** Before create cursor. Verify metrics will not be generated. **/ + // Create ManagedCursorMetrics and verify empty. ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar); - List<Metrics> metricsList = metrics.generate(); Assert.assertTrue(metricsList.isEmpty()); - - metricsList = metrics.generate(); - Assert.assertTrue(metricsList.isEmpty()); - - PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient; + /** + * Trigger the cursor ledger creation. + * After create cursor. Verify all metrics is zero. 
+ */ + // Trigger cursor creation. @Cleanup ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) this.pulsarClient.newConsumer() .topic(topicName) @@ -82,30 +96,108 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest { .subscriptionName(subName) .isAckReceiptEnabled(true) .subscribe(); - @Cleanup Producer<byte[]> producer = this.pulsarClient.newProducer() .topic(topicName) + .enableBatching(false) .create(); - - for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) { - ledgerHandle.close(); - } - - for (int i = 0; i < messageSize; i++) { + final PersistentSubscription persistentSubscription = + (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topicName, false).get().get().getSubscription(subName); + final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); + ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats(); + // Assert. + metricsList = metrics.generate(); + Assert.assertFalse(metricsList.isEmpty()); + /* + see: https://github.com/apache/pulsar/pull/17504 + "createNewMetadataLedger" triggers once BK write, and "initialize" triggers the execution of + "createNewMetadataLedger". The logic of the branch master has been changed, so this line is different. + */ + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + /** + * 1. Send many messages, and only ack half. 
After the cursor data is written to BK, + * verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange". + * 2. Ack another half, verify "brk_ml_cursor_nonContiguousDeletedMessagesRange" is zero. + */ + // Send many message and ack half. + List<MessageId> keepsMessageIdList = new ArrayList<>(); + for (int i = 0; i < 30; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); - consumer.acknowledge(consumer.receive().getMessageId()); + if (i < 10 || i > 20) { + consumer.acknowledge(consumer.receive().getMessageId()); + } else { + keepsMessageIdList.add(consumer.receive().getMessageId()); + } } - - Awaitility.await().until(() -> pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0); + // Wait persistent. + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .until(() -> managedCursorMXBean.getPersistLedgerSucceed() > 0); + // Assert. + metricsList = metrics.generate(); + Assert.assertFalse(metricsList.isEmpty()); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), + 0L); + // Ack another half. + for (MessageId messageId : keepsMessageIdList){ + consumer.acknowledge(messageId); + } + // Wait persistent. + Awaitility.await().atMost(2, TimeUnit.SECONDS) + .until(() -> managedCursor.getTotalNonContiguousDeletedMessagesRange() == 0); + // Assert. 
+ metricsList = metrics.generate(); + Assert.assertFalse(metricsList.isEmpty()); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); + Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + /** + * Make BK fail, send many messages, then wait for the cursor persistence to finish. + * After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and + * "brk_ml_cursor_persistZookeeperSucceed". + */ + // Send and ack messages while making BK fail. + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> { + String message = UUID.randomUUID().toString(); + producer.send(message.getBytes()); + consumer.acknowledge(consumer.receive().getMessageId()); + // Make BK error. + LedgerHandle ledgerHandle = Whitebox.getInternalState(managedCursor, "cursorLedger"); + ledgerHandle.close(); + return managedCursorMXBean.getPersistLedgerErrors() > 0 + && managedCursorMXBean.getPersistZookeeperSucceed() > 0; + }); + // Assert. 
metricsList = metrics.generate(); Assert.assertFalse(metricsList.isEmpty()); - Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 1L); Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L); Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + /** + * TODO verify "brk_ml_cursor_persistZookeeperErrors". + * This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether + * the request is triggered by the "create new ledger then write ZK" or the "persist cursor then write ZK". + * The cursor path is "/managed-ledgers/my-namespace/use/my-ns/persistent/my-topic1/my-sub". Maybe we can + * mock/spy ManagedCursorImpl to cover this case in another PR. 
+ */ + mockZooKeeper.unsetAlwaysFail(); + producer.close(); + consumer.close(); + managedCursor.close(); + admin.topics().delete(topicName, true); } @Test @@ -143,10 +235,6 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest { .topic(topicName) .create(); - for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) { - ledgerHandle.close(); - } - for (int i = 0; i < messageSize; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -165,5 +253,11 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L); Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L); Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + + // cleanup. + consumer.close(); + consumer2.close(); + producer.close(); + admin.topics().delete(topicName, true); } }