This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 35e6c17aa55f88c0685f369a27b44d042a2a2c78 Author: ran <[email protected]> AuthorDate: Sun Dec 8 18:28:55 2024 +0800 [fix][broker] Catch exception for entry payload interceptor processor (#23683) (cherry picked from commit 24c337f7a586e30a244739a4100697534fbd9f37) --- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 11 ++++- .../intercept/MangedLedgerInterceptorImplTest.java | 49 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 36671e5956a..bd64a901ec6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -138,8 +138,15 @@ public class OpAddEntry implements AddCallback, CloseCallback, Runnable { lastInitTime = System.nanoTime(); if (ml.getManagedLedgerInterceptor() != null) { long originalDataLen = data.readableBytes(); - payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, - duplicateBuffer); + try { + payloadProcessorHandle = ml.getManagedLedgerInterceptor() + .processPayloadBeforeLedgerWrite(this, duplicateBuffer); + } catch (Exception e) { + ReferenceCountUtil.safeRelease(duplicateBuffer); + log.error("[{}] Error processing payload before ledger write", ml.getName(), e); + this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e)); + return; + } if (payloadProcessorHandle != null) { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); // If data len of entry changes, correct "dataLength" and "currentLedgerSize". diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java index 7d164b68147..863ab3b9532 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -18,8 +18,12 @@ */ package org.apache.pulsar.broker.intercept; +import static org.testng.Assert.fail; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; @@ -436,4 +440,49 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase { } } + @Test(timeOut = 3000) + public void testManagedLedgerPayloadInputProcessorFailure() throws Exception { + var config = new ManagedLedgerConfig(); + final String failureMsg = "failed to process input payload"; + config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl( + Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() { + @Override + public Processor inputProcessor() { + return new Processor() { + @Override + public ByteBuf process(Object contextObj, ByteBuf inputPayload) { + throw new RuntimeException(failureMsg); + } + + @Override + public void release(ByteBuf processedPayload) { + // no-op + fail("the release method can't be reached"); + } + }; + } + }))); + + var ledger = factory.open("testManagedLedgerPayloadProcessorFailure", config); + var countDownLatch = new CountDownLatch(1); + var expectedException = new ArrayList<Exception>(); + ledger.asyncAddEntry("test".getBytes(), 1, 1, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + entryData.release(); + countDownLatch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + // expected + expectedException.add(exception); + countDownLatch.countDown(); + } + }, null); + countDownLatch.await(); + assertEquals(expectedException.size(), 1); + assertEquals(expectedException.get(0).getCause().getMessage(), failureMsg); + } + }
