aloyszhang commented on a change in pull request #9091:
URL: https://github.com/apache/pulsar/pull/9091#discussion_r550010559



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -83,9 +83,9 @@ public void onManagedLedgerPropertiesInitialize(Map<String, 
String> propertiesMa
     public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle 
lh) {
         try {
             for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                if (interceptor instanceof AppendIndexMetadataInterceptor && 
lh.getLastAddConfirmed() >= 0) {

Review comment:
       Agree with you that instanceof is not a good practice if we have a 
better choice. 
   There are two class `AppendIndexMetadataInterceptor` and 
`AppendBrokerTimestampMetadataInterceptor` implement 
`BrokerEntryMetadataInterceptor`. 
   If using polymorphism, for  `AppendBrokerTimestampMetadataInterceptor`, its 
`recoveryIndexGenerator` method do nothing like 
   ```java
   public void recoveryIndexGenerator(long index) {
          // do nothing.
       }
   ```
   And the new `onManagedLedgerLastLedgerInitialize` would be like 
   ```java
     @Override
       public void onManagedLedgerLastLedgerInitialize(String name, 
LedgerHandle lh) {
           try {
               for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
                   if (lh.getLastAddConfirmed() >= 0) {
                       LedgerEntries ledgerEntries =
                               lh.read(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed());
                       for (LedgerEntry entry : ledgerEntries) {
                           PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
                                   
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
                           if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
                                       
interceptor.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
                           }
                       }
   
                   }
               }
           } catch (org.apache.bookkeeper.client.api.BKException | 
InterruptedException e) {
               log.error("[{}] Read last entry error.", name, e);
           }
       }
   ```
   So , even if interceptor is a `AppendBrokerTimestampMetadataInterceptor`, we 
still need to read the last entry from BookKeeper and  deserialize 
`BrokerEntryMetadata`  from the entry.
   
https://github.com/aloyszhang/pulsar/blob/recovery-index/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java#L87~L92
   ```java
   LedgerEntries ledgerEntries =
                               lh.read(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed());
                       for (LedgerEntry entry : ledgerEntries) {
                           PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
                                   
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
                           if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to