This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8a472955c9cd60043f14471c8735078c5672ec4f
Author: ran <[email protected]>
AuthorDate: Sun Dec 8 18:28:55 2024 +0800

    [fix][broker] Catch exception for entry payload interceptor processor (#23683)
    
    (cherry picked from commit 24c337f7a586e30a244739a4100697534fbd9f37)
---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 11 ++++-
 .../intercept/MangedLedgerInterceptorImplTest.java | 49 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index acbb0da5a4e..28bbba42558 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -138,8 +138,15 @@ public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
             lastInitTime = System.nanoTime();
             if (ml.getManagedLedgerInterceptor() != null) {
                 long originalDataLen = data.readableBytes();
-                payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
-                        duplicateBuffer);
+                try {
+                    payloadProcessorHandle = ml.getManagedLedgerInterceptor()
+                            .processPayloadBeforeLedgerWrite(this, duplicateBuffer);
+                } catch (Exception e) {
+                    ReferenceCountUtil.safeRelease(duplicateBuffer);
+                    log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
+                    this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));
+                    return;
+                }
                 if (payloadProcessorHandle != null) {
                     duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
                     // If data len of entry changes, correct "dataLength" and "currentLedgerSize".
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 7d164b68147..863ab3b9532 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pulsar.broker.intercept;
 
+import static org.testng.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
@@ -436,4 +440,49 @@ public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase {
         }
     }
 
+    @Test(timeOut = 3000)
+    public void testManagedLedgerPayloadInputProcessorFailure() throws Exception {
+        var config = new ManagedLedgerConfig();
+        final String failureMsg = "failed to process input payload";
+        config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(
+                Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() {
+            @Override
+            public Processor inputProcessor() {
+                return new Processor() {
+                    @Override
+                    public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
+                        throw new RuntimeException(failureMsg);
+                    }
+
+                    @Override
+                    public void release(ByteBuf processedPayload) {
+                        // no-op
+                        fail("the release method can't be reached");
+                    }
+                };
+            }
+        })));
+
+        var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config);
+        var countDownLatch = new CountDownLatch(1);
+        var expectedException = new ArrayList<Exception>();
+        ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                entryData.release();
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                // expected
+                expectedException.add(exception);
+                countDownLatch.countDown();
+            }
+        }, null);
+        countDownLatch.await();
+        assertEquals(expectedException.size(), 1);
+        assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg);
+    }
+
 }

Reply via email to