This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 6610c0297f6 [fix][storage]fix OpAddEntry release error when exception
in ManagedLedgerInterceptor (#17394)
6610c0297f6 is described below
commit 6610c0297f6bfa7e5e253631dfc95d643d6db115
Author: Qiang Huang <[email protected]>
AuthorDate: Thu Sep 8 11:08:49 2022 +0800
[fix][storage]fix OpAddEntry release error when exception in
ManagedLedgerInterceptor (#17394)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -
.../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a2282ee0f92..845aa3670f2 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
@@ -839,7 +838,6 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
} catch (Exception e) {
addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed
ledger before add to bookie failed."));
- ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to intercept adding an entry to bookie.",
name, e);
return false;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index e42db517f8d..37c49b79ed0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testBeforeAddEntryWithException() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+ ManagedLedgerInterceptor interceptor =
+ new
MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ countDownLatch.countDown();
+ }
+ }, null);
+ countDownLatch.await();
+ assertEquals(buffer.refCnt(), 1);
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+ }
+
+ private class MockManagedLedgerInterceptorImpl extends
ManagedLedgerInterceptorImpl {
+ private final Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
+
+ public MockManagedLedgerInterceptorImpl(
+ Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors,
+ Set<ManagedLedgerPayloadProcessor>
brokerEntryPayloadProcessors) {
+ super(brokerEntryMetadataInterceptors,
brokerEntryPayloadProcessors);
+ this.brokerEntryMetadataInterceptors =
brokerEntryMetadataInterceptors;
+ }
+
+ @Override
+ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+ if (op == null || numberOfMessages <= 0) {
+ return op;
+ }
+ op.setData(Commands.addBrokerEntryMetadata(op.getData(),
brokerEntryMetadataInterceptors,
+ numberOfMessages));
+ if (op != null) {
+ throw new RuntimeException("throw exception before add entry
for test");
+ }
+ return op;
+ }
+ }
+
public static Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");