This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c337f7a58 [fix][broker] Catch exception for entry payload interceptor processor (#23683)
24c337f7a58 is described below

commit 24c337f7a586e30a244739a4100697534fbd9f37
Author: ran <[email protected]>
AuthorDate: Sun Dec 8 18:28:55 2024 +0800

    [fix][broker] Catch exception for entry payload interceptor processor (#23683)
---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 11 ++++-
 .../intercept/MangedLedgerInterceptorImplTest.java | 49 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 036ce9223e8..2c00fd2cdf7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -139,8 +139,15 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
             lastInitTime = System.nanoTime();
             if (ml.getManagedLedgerInterceptor() != null) {
                 long originalDataLen = data.readableBytes();
-                payloadProcessorHandle = ml.getManagedLedgerInterceptor()
-                        .processPayloadBeforeLedgerWrite(this.getCtx(), 
duplicateBuffer);
+                try {
+                    payloadProcessorHandle = ml.getManagedLedgerInterceptor()
+                            .processPayloadBeforeLedgerWrite(this.getCtx(), 
duplicateBuffer);
+                } catch (Exception e) {
+                    ReferenceCountUtil.safeRelease(duplicateBuffer);
+                    log.error("[{}] Error processing payload before ledger 
write", ml.getName(), e);
+                    this.failed(new 
ManagedLedgerException.ManagedLedgerInterceptException(e));
+                    return;
+                }
                 if (payloadProcessorHandle != null) {
                     duplicateBuffer = 
payloadProcessorHandle.getProcessedPayload();
                     // If data len of entry changes, correct "dataLength" and 
"currentLedgerSize".
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 74a88382b0e..26b2d52c194 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.broker.intercept;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -431,4 +435,49 @@ public class MangedLedgerInterceptorImplTest  extends 
MockedBookKeeperTestCase {
         }
     }
 
+    @Test(timeOut = 3000)
+    public void testManagedLedgerPayloadInputProcessorFailure() throws 
Exception {
+        var config = new ManagedLedgerConfig();
+        final String failureMsg = "failed to process input payload";
+        config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(
+                Collections.emptySet(), Set.of(new 
ManagedLedgerPayloadProcessor() {
+            @Override
+            public Processor inputProcessor() {
+                return new Processor() {
+                    @Override
+                    public ByteBuf process(Object contextObj, ByteBuf 
inputPayload) {
+                        throw new RuntimeException(failureMsg);
+                    }
+
+                    @Override
+                    public void release(ByteBuf processedPayload) {
+                        // no-op
+                        fail("the release method can't be reached");
+                    }
+                };
+            }
+        })));
+
+        var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", 
config);
+        var countDownLatch = new CountDownLatch(1);
+        var expectedException = new ArrayList<Exception>();
+        ledger.asyncAddEntry("test".getBytes(), 1, 1, new 
AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
+                entryData.release();
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                // expected
+                expectedException.add(exception);
+                countDownLatch.countDown();
+            }
+        }, null);
+        countDownLatch.await();
+        assertEquals(expectedException.size(), 1);
+        assertEquals(expectedException.get(0).getCause().getMessage(), 
failureMsg);
+    }
+
 }

Reply via email to