This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 9381b8552d9 [branch-2.10][fix][broker] Fix index generator is not
rollback after entries are failed added (#19980)
9381b8552d9 is described below
commit 9381b8552d9f25330df552465ea09ca2b84e0fd6
Author: Zhangao <[email protected]>
AuthorDate: Mon Apr 3 14:37:32 2023 +0800
[branch-2.10][fix][broker] Fix index generator is not rollback after
entries are failed added (#19980)
Co-authored-by: gavingaozhangmin <[email protected]>
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 ++++
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 1 +
.../intercept/ManagedLedgerInterceptor.java | 8 ++++
.../intercept/ManagedLedgerInterceptorImpl.java | 9 +++++
.../intercept/MangedLedgerInterceptorImplTest.java | 43 ++++++++++++++++++++++
.../intercept/AppendIndexMetadataInterceptor.java | 4 ++
6 files changed, 72 insertions(+)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 14967728214..b6872dfd02e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -840,6 +840,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
lastAddEntryTimeMs = System.currentTimeMillis();
}
+ protected void afterFailedAddEntry(int numOfMessages) {
+ if (managedLedgerInterceptor == null) {
+ return;
+ }
+ managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
+ }
+
private boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ada391f657d..1b6c6e96175 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -150,6 +150,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
public void failed(ManagedLedgerException e) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+ ml.afterFailedAddEntry(this.getNumberOfMessages());
if (cb != null) {
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
index cb5ba0cbfee..9b1cd4cf0fb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
@@ -41,6 +41,14 @@ public interface ManagedLedgerInterceptor {
*/
OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);
+ /**
+ * Intercept When add entry failed.
+ * @param numberOfMessages
+ */
+ default void afterFailedAddEntry(int numberOfMessages){
+
+ }
+
/**
* Intercept when ManagedLedger is initialized.
* @param propertiesMap map of properties.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index 0f634e0553e..11cfaf9f8bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -78,6 +78,15 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
return op;
}
+ @Override
+ public void afterFailedAddEntry(int numberOfMessages) {
+ for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ ((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
+ }
+ }
+ }
+
@Override
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
if (propertiesMap == null || propertiesMap.size() == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 22562602e62..57cf446c02a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -304,6 +304,49 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testAddEntryFailed() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final String ledgerAndCursorName = "testAddEntryFailed";
+
+ ManagedLedgerInterceptor interceptor =
+ new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+
+ ledger.terminate();
+
+ ManagedLedgerInterceptorImpl interceptor1 =
+ (ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ countDownLatch.countDown();
+ }
+ }, null);
+
+ countDownLatch.await();
+ assertEquals(interceptor1.getIndex(), -1);
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+
+ }
+
@Test
public void testBeforeAddEntryWithException() throws Exception {
final int MOCK_BATCH_SIZE = 2;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 6dfed6a7448..dc8ea827d4d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -50,4 +50,8 @@ public class AppendIndexMetadataInterceptor implements BrokerEntryMetadataInterc
public long getIndex() {
return indexGenerator.get();
}
+
+ public void decreaseWithNumberOfMessages(int numberOfMessages) {
+ indexGenerator.addAndGet(-numberOfMessages);
+ }
}