This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch congbo/cherry-pick-2.9/fix_#17736_problem_MessageCumulativeAckTest_mock in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 52fd58a69322d1675beebc4e94d39fdeb9dbbb40 Author: congbobo184 <[email protected]> AuthorDate: Sat Nov 26 20:51:40 2022 +0800 [cherry-pick][branch-2.9] cherry-pick #17736 problem --- .../broker/service/BrokerBkEnsemblesTests.java | 142 --------------------- 1 file changed, 142 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index d5480e84a0a..aa63b224a9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -276,148 +276,6 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase { consumer.close(); } -<<<<<<< HEAD -======= - @Test - public void testTruncateCorruptDataLedger() throws Exception { - // Ensure intended state for autoSkipNonRecoverableData - admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false"); - - @Cleanup - PulsarClient client = PulsarClient.builder() - .serviceUrl(pulsar.getWebServiceAddress()) - .statsInterval(0, TimeUnit.SECONDS) - .build(); - - final int totalMessages = 100; - final int totalDataLedgers = 5; - final int entriesPerLedger = totalMessages / totalDataLedgers; - - final String tenant = "prop"; - try { - admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"), - Sets.newHashSet(config.getClusterName()))); - } catch (Exception e) { - - } - final String ns1 = tenant + "/crash-broker"; - try { - admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName())); - } catch (Exception e) { - - } - - final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis(); - - // Create subscription - Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") - .receiverQueueSize(5).subscribe(); - - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); - ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - Field configField = ManagedCursorImpl.class.getDeclaredField("config"); - configField.setAccessible(true); - // Create multiple data-ledger - ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); - config.setMaxEntriesPerLedger(entriesPerLedger); - config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - // bookkeeper client - Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); - bookKeeperField.setAccessible(true); - // Create multiple data-ledger - BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); - - // (1) publish messages in 10 data-ledgers each with 20 entries under managed-ledger - Producer<byte[]> producer = client.newProducer().topic(topic1).create(); - for (int i = 0; i < totalMessages; i++) { - String message = "my-message-" + i; - producer.send(message.getBytes()); - } - - // validate: consumer is able to consume msg and close consumer after reading 1 entry - Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS)); - consumer.close(); - - NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo(); - Assert.assertEquals(ledgerInfo.size(), totalDataLedgers); - Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry(); - long firstLedgerToDelete = lastLedger.getKey(); - - // (2) delete first 4 data-ledgers - ledgerInfo.entrySet().forEach(entry -> { - if (!entry.equals(lastLedger)) { - assertEquals(entry.getValue().getEntries(), entriesPerLedger); - try { - bookKeeper.deleteLedger(entry.getKey()); - } catch (Exception e) { - log.warn("failed to delete ledger {}", entry.getKey(), e); - } - } - }); - - // create 5 more ledgers - for (int i = 0; i < totalMessages; i++) { - String message = "my-message2-" + i; - producer.send(message.getBytes()); - } - - // Admin should be able to truncate the topic - admin.topics().truncate(topic1); - - ledgerInfo.entrySet().forEach(entry -> { - log.warn("found ledger: {}", entry.getKey()); - assertNotEquals(firstLedgerToDelete, entry.getKey()); - }); - - // Currently, ledger deletion is async and failed deletion - // does not actually fail truncation but logs an exception - // and creates scheduled task to retry - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { - LedgerMetadata meta = bookKeeper - .getLedgerMetadata(firstLedgerToDelete) - .exceptionally(e -> null) - .get(); - assertEquals(null, meta, "ledger should be deleted " + firstLedgerToDelete); - }); - - // Should not throw, deleting absent ledger must be a noop - // unless PulsarManager returned a wrong error which - // got translated to BKUnexpectedConditionException - try { - bookKeeper.deleteLedger(firstLedgerToDelete); - } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) { - // pass - } - - producer.close(); - consumer.close(); - } - - @Test - public void testDeleteLedgerFactoryCorruptLedger() throws Exception { - ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test"); - - // bookkeeper client - Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); - bookKeeperField.setAccessible(true); - // Create multiple data-ledger - BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); - - ml.addEntry("dummy-entry-1".getBytes()); - - NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo(); - long lastLedger = ledgerInfo.lastEntry().getKey(); - - ml.close(); - bookKeeper.deleteLedger(lastLedger); - - // BK ledger is deleted, factory should not throw on delete - factory.delete("test"); - } - ->>>>>>> 63d4cf20e7... ManagedLedger: move to FENCED state in case of BadVersionException (#17736) @Test(timeOut = 20000) public void testTopicWithWildCardChar() throws Exception { @Cleanup
