This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 9381b8552d9 [branch-2.10][fix][broker] Fix index generator is not 
rollback after entries are failed added (#19980)
9381b8552d9 is described below

commit 9381b8552d9f25330df552465ea09ca2b84e0fd6
Author: Zhangao <[email protected]>
AuthorDate: Mon Apr 3 14:37:32 2023 +0800

    [branch-2.10][fix][broker] Fix index generator is not rollback after 
entries are failed added (#19980)
    
    Co-authored-by: gavingaozhangmin <[email protected]>
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 ++++
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  1 +
 .../intercept/ManagedLedgerInterceptor.java        |  8 ++++
 .../intercept/ManagedLedgerInterceptorImpl.java    |  9 +++++
 .../intercept/MangedLedgerInterceptorImplTest.java | 43 ++++++++++++++++++++++
 .../intercept/AppendIndexMetadataInterceptor.java  |  4 ++
 6 files changed, 72 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 14967728214..b6872dfd02e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -840,6 +840,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         lastAddEntryTimeMs = System.currentTimeMillis();
     }
 
+    protected void afterFailedAddEntry(int numOfMessages) {
+        if (managedLedgerInterceptor == null) {
+            return;
+        }
+        managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
+    }
+
     private boolean beforeAddEntry(OpAddEntry addOperation) {
         // if no interceptor, just return true to make sure addOperation will 
be initiate()
         if (managedLedgerInterceptor == null) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ada391f657d..1b6c6e96175 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -150,6 +150,7 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
 
     public void failed(ManagedLedgerException e) {
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+        ml.afterFailedAddEntry(this.getNumberOfMessages());
         if (cb != null) {
             ReferenceCountUtil.release(data);
             cb.addFailed(e, ctx);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index cb5ba0cbfee..9b1cd4cf0fb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -41,6 +41,14 @@ public interface ManagedLedgerInterceptor {
      */
     OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);
 
+    /**
+     * Intercept When add entry failed.
+     * @param numberOfMessages
+     */
+    default void afterFailedAddEntry(int numberOfMessages){
+
+    }
+
     /**
      * Intercept when ManagedLedger is initialized.
      * @param propertiesMap map of properties.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index 0f634e0553e..11cfaf9f8bc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -78,6 +78,15 @@ public class ManagedLedgerInterceptorImpl implements 
ManagedLedgerInterceptor {
         return op;
     }
 
+    @Override
+    public void afterFailedAddEntry(int numberOfMessages) {
+        for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+            if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                ((AppendIndexMetadataInterceptor) 
interceptor).decreaseWithNumberOfMessages(numberOfMessages);
+            }
+        }
+    }
+
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap) {
         if (propertiesMap == null || propertiesMap.size() == 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 22562602e62..57cf446c02a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -304,6 +304,49 @@ public class MangedLedgerInterceptorImplTest  extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testAddEntryFailed() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final String ledgerAndCursorName = "testAddEntryFailed";
+
+        ManagedLedgerInterceptor interceptor =
+                new 
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+
+        ledger.terminate();
+
+        ManagedLedgerInterceptorImpl interceptor1 =
+                (ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor();
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new 
AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                    countDownLatch.countDown();
+                }
+            }, null);
+
+            countDownLatch.await();
+            assertEquals(interceptor1.getIndex(), -1);
+        } finally {
+            ledger.close();
+            factory.shutdown();
+        }
+
+    }
+
     @Test
     public void testBeforeAddEntryWithException() throws Exception {
         final int MOCK_BATCH_SIZE = 2;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 6dfed6a7448..dc8ea827d4d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -50,4 +50,8 @@ public class AppendIndexMetadataInterceptor implements 
BrokerEntryMetadataInterc
     public long getIndex() {
         return indexGenerator.get();
     }
+
+    public void decreaseWithNumberOfMessages(int numberOfMessages) {
+        indexGenerator.addAndGet(-numberOfMessages);
+    }
 }

Reply via email to