This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a605ea32c7e [cleanup] Cleanup some duplicated code (#23204)
a605ea32c7e is described below

commit a605ea32c7e6813bd37ef73198ed8706d88d4b1a
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Aug 21 03:43:38 2024 +0800

    [cleanup] Cleanup some duplicated code (#23204)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 46 +---------------------
 .../broker/service/persistent/PersistentTopic.java |  8 +---
 .../pulsar/broker/service/PersistentTopicTest.java | 19 ++++-----
 .../pulsar/broker/service/ServerCnxTest.java       |  6 +--
 .../service/persistent/MessageDuplicationTest.java | 21 +++++-----
 5 files changed, 28 insertions(+), 72 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2f60eeff2fb..5756d6e9524 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -687,37 +687,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public Position addEntry(byte[] data, int offset, int length) throws 
InterruptedException, ManagedLedgerException {
-        final CountDownLatch counter = new CountDownLatch(1);
-        // Result list will contain the status exception and the resulting
-        // position
-        class Result {
-            ManagedLedgerException status = null;
-            Position position = null;
-        }
-        final Result result = new Result();
-
-        asyncAddEntry(data, offset, length, new AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
-                result.position = position;
-                counter.countDown();
-            }
-
-            @Override
-            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
-                result.status = exception;
-                counter.countDown();
-            }
-        }, null);
-
-        counter.await();
-
-        if (result.status != null) {
-            log.error("[{}] Error adding entry", name, result.status);
-            throw result.status;
-        }
-
-        return result.position;
+        return addEntry(data, 1, offset, length);
     }
 
     @Override
@@ -777,19 +747,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, 
Object ctx) {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
-        }
-
-        // retain buffer in this thread
-        buffer.retain();
-
-        // Jump to specific thread to avoid contention from writers writing 
from different threads
-        executor.execute(() -> {
-            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, callback, ctx,
-                    currentLedgerTimeoutTriggered);
-            internalAsyncAddEntry(addOperation);
-        });
+        asyncAddEntry(buffer, 1, callback, ctx);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c26725deaea..146ac05d695 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -683,12 +683,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext 
publishContext) {
-        if (brokerService.isBrokerEntryMetadataEnabled()) {
-            ledger.asyncAddEntry(headersAndPayload,
-                    (int) publishContext.getNumberOfMessages(), this, 
publishContext);
-        } else {
-            ledger.asyncAddEntry(headersAndPayload, this, publishContext);
-        }
+        ledger.asyncAddEntry(headersAndPayload,
+            (int) publishContext.getNumberOfMessages(), this, publishContext);
     }
 
     public void asyncReadEntry(Position position, 
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f2ed015bd1e..f9171e88361 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.atLeast;
@@ -294,11 +295,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         doAnswer(invocationOnMock -> {
             final ByteBuf payload = (ByteBuf) 
invocationOnMock.getArguments()[0];
-            final AddEntryCallback callback = (AddEntryCallback) 
invocationOnMock.getArguments()[1];
-            final Topic.PublishContext ctx = (Topic.PublishContext) 
invocationOnMock.getArguments()[2];
+            final AddEntryCallback callback = (AddEntryCallback) 
invocationOnMock.getArguments()[2];
+            final Topic.PublishContext ctx = (Topic.PublishContext) 
invocationOnMock.getArguments()[3];
             callback.addComplete(PositionFactory.LATEST, payload, ctx);
             return null;
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), 
any(AddEntryCallback.class), any());
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         long lastMaxReadPositionMovedForwardTimestamp = 
topic.getLastMaxReadPositionMovedForwardTimestamp();
@@ -377,10 +378,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         // override asyncAddEntry callback to return error
         doAnswer((Answer<Object>) invocationOnMock -> {
-            ((AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(
-                    new ManagedLedgerException("Managed ledger failure"), 
invocationOnMock.getArguments()[2]);
+            ((AddEntryCallback) invocationOnMock.getArguments()[2]).addFailed(
+                    new ManagedLedgerException("Managed ledger failure"), 
invocationOnMock.getArguments()[3]);
             return null;
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), 
any(AddEntryCallback.class), any());
 
         topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
             if (exception == null) {
@@ -1421,11 +1422,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         // call addComplete on ledger asyncAddEntry
         doAnswer(invocationOnMock -> {
-            ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(PositionFactory.create(1, 1),
+            ((AddEntryCallback) 
invocationOnMock.getArguments()[2]).addComplete(PositionFactory.create(1, 1),
                     null,
-                    invocationOnMock.getArguments()[2]);
+                    invocationOnMock.getArguments()[3]);
             return null;
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), 
any(AddEntryCallback.class), any());
 
         // call openCursorComplete on cursor asyncOpen
         doAnswer(invocationOnMock -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 03115d79af0..42b52d901e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2943,12 +2943,12 @@ public class ServerCnxTest {
 
         // call addComplete on ledger asyncAddEntry
         doAnswer((Answer<Object>) invocationOnMock -> {
-            ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(
+            ((AddEntryCallback) 
invocationOnMock.getArguments()[2]).addComplete(
                     PositionFactory.create(-1, -1),
                     null,
-                    invocationOnMock.getArguments()[2]);
+                    invocationOnMock.getArguments()[3]);
             return null;
-        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), 
any(AddEntryCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> 
true).when(cursorMock).isDurable();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 4957cc998e3..e7dcbc60213 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -284,7 +285,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
 
         persistentTopic.publishMessage(byteBuf1, publishContext1);
         persistentTopic.addComplete(PositionFactory.create(0, 1), null, 
publishContext1);
-        verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         Long lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 0);
@@ -294,7 +295,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
 
         persistentTopic.publishMessage(byteBuf2, publishContext2);
         persistentTopic.addComplete(PositionFactory.create(0, 2), null, 
publishContext2);
-        verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName2);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
@@ -306,7 +307,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         publishContext1 = getPublishContext(producerName1, 1);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
         persistentTopic.addComplete(PositionFactory.create(0, 3), null, 
publishContext1);
-        verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 1);
@@ -318,7 +319,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         publishContext1 = getPublishContext(producerName1, 5);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
         persistentTopic.addComplete(PositionFactory.create(0, 4), null, 
publishContext1);
-        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 5);
@@ -330,7 +331,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 0);
         publishContext1 = getPublishContext(producerName1, 0);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 5);
@@ -341,7 +342,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         publishContext1 = getPublishContext(producerName1, 6);
         // don't complete message
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
         assertEquals(lastSequenceIdPushed.longValue(), 6);
@@ -353,7 +354,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 6);
         publishContext1 = getPublishContext(producerName1, 6);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         verify(publishContext1, 
times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class), 
eq(-1L), eq(-1L));
 
         // complete seq 6 message eventually
@@ -363,7 +364,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 7);
         publishContext1 = getPublishContext(producerName1, 7);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
 
         persistentTopic.addFailed(new ManagedLedgerException("test"), 
publishContext1);
         // check highestSequencedPushed is reset
@@ -383,7 +384,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 6);
         publishContext1 = getPublishContext(producerName1, 6);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         verify(publishContext1, times(1)).completed(eq(null), eq(-1L), 
eq(-1L));
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);
@@ -393,7 +394,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
         byteBuf1 = getMessage(producerName1, 8);
         publishContext1 = getPublishContext(producerName1, 8);
         persistentTopic.publishMessage(byteBuf1, publishContext1);
-        verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), 
any(), any());
+        verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class), 
anyInt(), any(), any());
         persistentTopic.addComplete(PositionFactory.create(0, 5), null, 
publishContext1);
         lastSequenceIdPushed = 
messageDeduplication.highestSequencedPushed.get(producerName1);
         assertNotNull(lastSequenceIdPushed);

Reply via email to