eolivelli commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r924087799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -893,7 +893,18 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handleAsync((result, ex) -> {
+                            if (ex != null) {
+                                int rc = BKException.getExceptionCode(ex);
+                                if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                    return null;

Review Comment:
   I think we should log something at INFO level here, so that we can see 
when (or if) this happens in production.
   This should not flood the logs, because I expect it to happen only rarely.
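
   To illustrate the suggestion, here is a minimal self-contained sketch. It does not use the BookKeeper API: `LedgerMissingException` and `ignoreMissing` are hypothetical stand-ins for the two "no such ledger" error codes and the `handleAsync` callback in the diff, and `java.util.logging` stands in for whatever logger the class already uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Logger;

// Sketch only: LedgerMissingException is a hypothetical stand-in for the
// BookKeeper "no such ledger" error codes checked in the diff.
final class IgnoreMissingLedgerSketch {
    private static final Logger log = Logger.getLogger("IgnoreMissingLedgerSketch");

    static final class LedgerMissingException extends RuntimeException {
        LedgerMissingException(long ledgerId) {
            super("ledger " + ledgerId + " does not exist");
        }
    }

    // Treat "ledger already gone" as success, but leave a trace at INFO level.
    static CompletableFuture<Void> ignoreMissing(long ledgerId, CompletableFuture<Void> deleteOp) {
        return deleteOp.handle((result, ex) -> {
            if (ex == null) {
                return result;
            }
            // Dependent stages may wrap the original failure in a CompletionException.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (cause instanceof LedgerMissingException) {
                log.info("Ledger " + ledgerId + " was already deleted, skipping");
                return null;
            }
            // Any other failure still fails the returned future.
            throw new CompletionException(cause);
        });
    }
}
```

   Only the "already deleted" case is swallowed and logged; every other error keeps failing the delete.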



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {

Review Comment:
   Why aren't we failing the test when createTenant/createNamespace fails?
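
   One possible shape, as a hedged sketch: swallow only the "already exists" case and let everything else fail the test. `AlreadyExistsException` and `createIfAbsent` are hypothetical names used for illustration, not Pulsar admin API.

```java
// Sketch only: AlreadyExistsException is a hypothetical stand-in for whatever
// "conflict" error the admin client reports for an existing tenant or namespace.
final class CreateIfAbsentSketch {
    static final class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String name) {
            super(name + " already exists");
        }
    }

    // Returns true if the resource was created, false if it already existed.
    // Every other failure propagates and fails the test immediately.
    static boolean createIfAbsent(Runnable create) {
        try {
            create.run();
            return true;
        } catch (AlreadyExistsException e) {
            return false;
        }
    }
}
```

   With something like this, a broken setup shows up at the create call instead of as a confusing failure later in the test.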



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {
+
+        }
+        final String ns1 = tenant + "/crash-broker";
+        try {
+            admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+        } catch (Exception e) {
+
+        }
+
+        final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+        // Create subscription
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+                .receiverQueueSize(5).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // Create multiple data-ledger
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Create multiple data-ledger
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 10 data-ledgers each with 20 entries under managed-ledger
+        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+        long firstLedgerToDelete = lastLedger.getKey();
+
+        // (2) delete first 4 data-ledgers
+        ledgerInfo.entrySet().forEach(entry -> {
+            if (!entry.equals(lastLedger)) {
+                assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+                try {
+                    bookKeeper.deleteLedger(entry.getKey());
+                } catch (Exception e) {
+                    e.printStackTrace();

Review Comment:
   nit: use a logger instead of e.printStackTrace()
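
   A minimal sketch of what that could look like. It uses `java.util.logging` only to stay self-contained; the test class would use whatever logger it already has. `deleteQuietly` is a hypothetical helper, not part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: deleteQuietly is a hypothetical helper standing in for the
// try/catch around bookKeeper.deleteLedger(...) in the test.
final class LogInsteadOfPrintSketch {
    private static final Logger log = Logger.getLogger("LogInsteadOfPrintSketch");

    // Runs the delete and reports failures through the logger, with the
    // ledger id and the full stack trace attached to a single log record.
    static boolean deleteQuietly(long ledgerId, Runnable delete) {
        try {
            delete.run();
            return true;
        } catch (RuntimeException e) {
            log.log(Level.WARNING, "Failed to delete ledger " + ledgerId, e);
            return false;
        }
    }
}
```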



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java:
##########
@@ -61,15 +85,18 @@ public InvalidImplementationException(String msg) {
      */
     public static class NotFoundException extends MetadataStoreException {
         public NotFoundException() {
-            super((Throwable) null);
+            super(makeBkFriendlyException(

Review Comment:
   My understanding is that MetadataStoreException covers the 
ZooKeeper/etcd/RocksDB metadata stores,
   so NotFound means something like "znode does not exist".
   
   Why do we need to always inject a BKException as the cause?
   
   Can we do it only when we are using PulsarLedgerManager/ManagedLedger?
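
   As a rough sketch of the alternative, the generic exception stays free of BookKeeper details and only the ledger-facing layer translates it. All names below (`NotFoundException`, `NoSuchLedgerException`, `MetadataStore`, `readLedgerMetadata`, the `/ledgers/` path) are simplified, hypothetical stand-ins, not the real Pulsar or BookKeeper classes.

```java
// Sketch only: every type here is a simplified, hypothetical stand-in.
final class BoundaryTranslationSketch {
    // Generic metadata-store error: knows nothing about BookKeeper.
    static final class NotFoundException extends RuntimeException {
        NotFoundException(String path) {
            super("no node at " + path);
        }
    }

    // BookKeeper-flavoured error that ledger callers expect to see.
    static final class NoSuchLedgerException extends RuntimeException {
        NoSuchLedgerException(long ledgerId, Throwable cause) {
            super("no such ledger " + ledgerId, cause);
        }
    }

    interface MetadataStore {
        byte[] get(String path);
    }

    // Only the ledger-manager layer translates; other users of the store
    // keep seeing the plain NotFoundException.
    static byte[] readLedgerMetadata(MetadataStore store, long ledgerId) {
        try {
            return store.get("/ledgers/" + ledgerId);
        } catch (NotFoundException e) {
            throw new NoSuchLedgerException(ledgerId, e);
        }
    }
}
```

   This keeps the translation in one place and leaves NotFoundException meaningful for callers that have nothing to do with ledgers.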


