This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a605ea32c7e [cleanup] Cleanup some duplicated code (#23204)
a605ea32c7e is described below
commit a605ea32c7e6813bd37ef73198ed8706d88d4b1a
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Aug 21 03:43:38 2024 +0800
[cleanup] Cleanup some duplicated code (#23204)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 46 +---------------------
.../broker/service/persistent/PersistentTopic.java | 8 +---
.../pulsar/broker/service/PersistentTopicTest.java | 19 ++++-----
.../pulsar/broker/service/ServerCnxTest.java | 6 +--
.../service/persistent/MessageDuplicationTest.java | 21 +++++-----
5 files changed, 28 insertions(+), 72 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2f60eeff2fb..5756d6e9524 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -687,37 +687,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public Position addEntry(byte[] data, int offset, int length) throws
InterruptedException, ManagedLedgerException {
- final CountDownLatch counter = new CountDownLatch(1);
- // Result list will contain the status exception and the resulting
- // position
- class Result {
- ManagedLedgerException status = null;
- Position position = null;
- }
- final Result result = new Result();
-
- asyncAddEntry(data, offset, length, new AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
- result.position = position;
- counter.countDown();
- }
-
- @Override
- public void addFailed(ManagedLedgerException exception, Object
ctx) {
- result.status = exception;
- counter.countDown();
- }
- }, null);
-
- counter.await();
-
- if (result.status != null) {
- log.error("[{}] Error adding entry", name, result.status);
- throw result.status;
- }
-
- return result.position;
+ return addEntry(data, 1, offset, length);
}
@Override
@@ -777,19 +747,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback,
Object ctx) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] asyncAddEntry size={} state={}", name,
buffer.readableBytes(), state);
- }
-
- // retain buffer in this thread
- buffer.retain();
-
- // Jump to specific thread to avoid contention from writers writing
from different threads
- executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx,
- currentLedgerTimeoutTriggered);
- internalAsyncAddEntry(addOperation);
- });
+ asyncAddEntry(buffer, 1, callback, ctx);
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c26725deaea..146ac05d695 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -683,12 +683,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext
publishContext) {
- if (brokerService.isBrokerEntryMetadataEnabled()) {
- ledger.asyncAddEntry(headersAndPayload,
- (int) publishContext.getNumberOfMessages(), this,
publishContext);
- } else {
- ledger.asyncAddEntry(headersAndPayload, this, publishContext);
- }
+ ledger.asyncAddEntry(headersAndPayload,
+ (int) publishContext.getNumberOfMessages(), this, publishContext);
}
public void asyncReadEntry(Position position,
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f2ed015bd1e..f9171e88361 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -24,6 +24,7 @@ import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
@@ -294,11 +295,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
final ByteBuf payload = (ByteBuf)
invocationOnMock.getArguments()[0];
- final AddEntryCallback callback = (AddEntryCallback)
invocationOnMock.getArguments()[1];
- final Topic.PublishContext ctx = (Topic.PublishContext)
invocationOnMock.getArguments()[2];
+ final AddEntryCallback callback = (AddEntryCallback)
invocationOnMock.getArguments()[2];
+ final Topic.PublishContext ctx = (Topic.PublishContext)
invocationOnMock.getArguments()[3];
callback.addComplete(PositionFactory.LATEST, payload, ctx);
return null;
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(),
any(AddEntryCallback.class), any());
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
long lastMaxReadPositionMovedForwardTimestamp =
topic.getLastMaxReadPositionMovedForwardTimestamp();
@@ -377,10 +378,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// override asyncAddEntry callback to return error
doAnswer((Answer<Object>) invocationOnMock -> {
- ((AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(
- new ManagedLedgerException("Managed ledger failure"),
invocationOnMock.getArguments()[2]);
+ ((AddEntryCallback) invocationOnMock.getArguments()[2]).addFailed(
+ new ManagedLedgerException("Managed ledger failure"),
invocationOnMock.getArguments()[3]);
return null;
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(),
any(AddEntryCallback.class), any());
topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
if (exception == null) {
@@ -1421,11 +1422,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// call addComplete on ledger asyncAddEntry
doAnswer(invocationOnMock -> {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(PositionFactory.create(1, 1),
+ ((AddEntryCallback)
invocationOnMock.getArguments()[2]).addComplete(PositionFactory.create(1, 1),
null,
- invocationOnMock.getArguments()[2]);
+ invocationOnMock.getArguments()[3]);
return null;
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(),
any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
doAnswer(invocationOnMock -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 03115d79af0..42b52d901e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2943,12 +2943,12 @@ public class ServerCnxTest {
// call addComplete on ledger asyncAddEntry
doAnswer((Answer<Object>) invocationOnMock -> {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(
+ ((AddEntryCallback)
invocationOnMock.getArguments()[2]).addComplete(
PositionFactory.create(-1, -1),
null,
- invocationOnMock.getArguments()[2]);
+ invocationOnMock.getArguments()[3]);
return null;
- }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
+ }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(),
any(AddEntryCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock ->
true).when(cursorMock).isDurable();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 4957cc998e3..e7dcbc60213 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -284,7 +285,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 1), null,
publishContext1);
- verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(1)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
Long lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 0);
@@ -294,7 +295,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
persistentTopic.publishMessage(byteBuf2, publishContext2);
persistentTopic.addComplete(PositionFactory.create(0, 2), null,
publishContext2);
- verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(2)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName2);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
@@ -306,7 +307,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
publishContext1 = getPublishContext(producerName1, 1);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 3), null,
publishContext1);
- verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(3)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 1);
@@ -318,7 +319,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
publishContext1 = getPublishContext(producerName1, 5);
persistentTopic.publishMessage(byteBuf1, publishContext1);
persistentTopic.addComplete(PositionFactory.create(0, 4), null,
publishContext1);
- verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
@@ -330,7 +331,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(4)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
@@ -341,7 +342,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
publishContext1 = getPublishContext(producerName1, 6);
// don't complete message
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 6);
@@ -353,7 +354,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(5)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
verify(publishContext1,
times(1)).completed(any(MessageDeduplication.MessageDupUnknownException.class),
eq(-1L), eq(-1L));
// complete seq 6 message eventually
@@ -363,7 +364,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 7);
publishContext1 = getPublishContext(producerName1, 7);
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
persistentTopic.addFailed(new ManagedLedgerException("test"),
publishContext1);
// check highestSequencedPushed is reset
@@ -383,7 +384,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 6);
publishContext1 = getPublishContext(producerName1, 6);
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(6)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
verify(publishContext1, times(1)).completed(eq(null), eq(-1L),
eq(-1L));
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
@@ -393,7 +394,7 @@ public class MessageDuplicationTest extends BrokerTestBase {
byteBuf1 = getMessage(producerName1, 8);
publishContext1 = getPublishContext(producerName1, 8);
persistentTopic.publishMessage(byteBuf1, publishContext1);
- verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class),
any(), any());
+ verify(managedLedger, times(7)).asyncAddEntry(any(ByteBuf.class),
anyInt(), any(), any());
persistentTopic.addComplete(PositionFactory.create(0, 5), null,
publishContext1);
lastSequenceIdPushed =
messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);