This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 24c337f7a58 [fix][broker] Catch exception for entry payload
interceptor processor (#23683)
24c337f7a58 is described below
commit 24c337f7a586e30a244739a4100697534fbd9f37
Author: ran <[email protected]>
AuthorDate: Sun Dec 8 18:28:55 2024 +0800
[fix][broker] Catch exception for entry payload interceptor processor
(#23683)
---
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 11 ++++-
.../intercept/MangedLedgerInterceptorImplTest.java | 49 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 036ce9223e8..2c00fd2cdf7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -139,8 +139,15 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
lastInitTime = System.nanoTime();
if (ml.getManagedLedgerInterceptor() != null) {
long originalDataLen = data.readableBytes();
- payloadProcessorHandle = ml.getManagedLedgerInterceptor()
- .processPayloadBeforeLedgerWrite(this.getCtx(),
duplicateBuffer);
+ try {
+ payloadProcessorHandle = ml.getManagedLedgerInterceptor()
+ .processPayloadBeforeLedgerWrite(this.getCtx(),
duplicateBuffer);
+ } catch (Exception e) {
+ ReferenceCountUtil.safeRelease(duplicateBuffer);
+ log.error("[{}] Error processing payload before ledger
write", ml.getName(), e);
+ this.failed(new
ManagedLedgerException.ManagedLedgerInterceptException(e));
+ return;
+ }
if (payloadProcessorHandle != null) {
duplicateBuffer =
payloadProcessorHandle.getProcessedPayload();
// If data len of entry changes, correct "dataLength" and
"currentLedgerSize".
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 74a88382b0e..26b2d52c194 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.broker.intercept;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -431,4 +435,49 @@ public class MangedLedgerInterceptorImplTest extends
MockedBookKeeperTestCase {
}
}
+ @Test(timeOut = 3000)
+ public void testManagedLedgerPayloadInputProcessorFailure() throws
Exception {
+ var config = new ManagedLedgerConfig();
+ final String failureMsg = "failed to process input payload";
+ config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(
+ Collections.emptySet(), Set.of(new
ManagedLedgerPayloadProcessor() {
+ @Override
+ public Processor inputProcessor() {
+ return new Processor() {
+ @Override
+ public ByteBuf process(Object contextObj, ByteBuf
inputPayload) {
+ throw new RuntimeException(failureMsg);
+ }
+
+ @Override
+ public void release(ByteBuf processedPayload) {
+ // no-op
+ fail("the release method can't be reached");
+ }
+ };
+ }
+ })));
+
+ var ledger = factory.open("testManagedLedgerPayloadProcessorFailure",
config);
+ var countDownLatch = new CountDownLatch(1);
+ var expectedException = new ArrayList<Exception>();
+ ledger.asyncAddEntry("test".getBytes(), 1, 1, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ entryData.release();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ // expected
+ expectedException.add(exception);
+ countDownLatch.countDown();
+ }
+ }, null);
+ countDownLatch.await();
+ assertEquals(expectedException.size(), 1);
+ assertEquals(expectedException.get(0).getCause().getMessage(),
failureMsg);
+ }
+
}