This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d558b1df91e Revert "[fix][ml] Enhance OpFindNewest to support skip non-recoverable data (#24441)"
d558b1df91e is described below
commit d558b1df91ec5494317593a5784914774aec53dc
Author: coderzc <[email protected]>
AuthorDate: Wed Jul 2 10:29:40 2025 +0800
Revert "[fix][ml] Enhance OpFindNewest to support skip non-recoverable data (#24441)"
This reverts commit 8f348a5f7ed06a8baa9f60baeab69971464f0489.
---
.../bookkeeper/mledger/impl/OpFindNewest.java | 117 +--------------
.../service/PersistentMessageFinderTest.java | 159 +--------------------
.../broker/service/SubscriptionSeekTest.java | 86 -----------
3 files changed, 5 insertions(+), 357 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index befc40188a3..900af9322c7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -18,8 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.Range;
import java.util.Optional;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +27,6 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
-import org.apache.bookkeeper.mledger.PositionFactory;
@Slf4j
class OpFindNewest implements ReadEntryCallback {
@@ -47,7 +44,6 @@ class OpFindNewest implements ReadEntryCallback {
PositionImpl searchPosition;
long min;
long max;
- long mid;
Position lastMatchedPosition = null;
State state;
@@ -62,7 +58,6 @@ class OpFindNewest implements ReadEntryCallback {
this.min = 0;
this.max = numberOfEntries;
- this.mid = mid();
this.searchPosition = startPosition;
this.state = State.checkFirst;
@@ -118,8 +113,7 @@ class OpFindNewest implements ReadEntryCallback {
} else {
// start binary search
state = State.searching;
- this.mid = mid();
- searchPosition = ledger.getPositionAfterN(startPosition,
this.mid, PositionBound.startExcluded);
+ searchPosition = ledger.getPositionAfterN(startPosition,
mid(), PositionBound.startExcluded);
find();
}
break;
@@ -127,129 +121,26 @@ class OpFindNewest implements ReadEntryCallback {
if (condition.test(entry)) {
// mid - last
lastMatchedPosition = position;
- min = mid;
+ min = mid();
} else {
// start - mid
- max = mid - 1;
+ max = mid() - 1;
}
- this.mid = mid();
if (max <= min) {
callback.findEntryComplete(lastMatchedPosition,
OpFindNewest.this.ctx);
return;
}
- searchPosition = ledger.getPositionAfterN(startPosition, this.mid,
PositionBound.startExcluded);
+ searchPosition = ledger.getPositionAfterN(startPosition, mid(),
PositionBound.startExcluded);
find();
}
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- if (exception instanceof
ManagedLedgerException.NonRecoverableLedgerException
- && ledger.getConfig().isAutoSkipNonRecoverableData()) {
- try {
- log.info("[{}] Ledger {} is not recoverable, skip
non-recoverable data, state:{}", ledger.getName(),
- searchPosition, state);
- checkArgument(state == State.checkFirst || state ==
State.checkLast || state == State.searching);
- if (state == State.checkFirst) {
- // If we failed to read the first entry, try next valid
position
- Position nextPosition =
findNextValidPosition(searchPosition, exception);
- if (nextPosition != null && nextPosition.getEntryId() !=
-1) {
- long numberOfEntries =
-
ledger.getNumberOfEntries(Range.closedOpen(searchPosition, nextPosition));
- searchPosition = nextPosition;
- min += numberOfEntries;
- find();
- return;
- }
- } else if (state == State.checkLast) {
- Position prevPosition =
findPreviousValidPosition(searchPosition, exception);
- if (prevPosition != null && prevPosition.getEntryId() !=
-1) {
- long numberOfEntries =
-
ledger.getNumberOfEntries(Range.openClosed(prevPosition, searchPosition));
- searchPosition = prevPosition;
- max -= numberOfEntries;
- find();
- return;
- }
- } else if (state == State.searching) {
- // In searching state, if we failed to read the mid entry,
try next valid position
- Position nextPosition =
findNextValidPosition(searchPosition, exception);
- if (nextPosition != null && nextPosition.getEntryId() !=
-1) {
- searchPosition = nextPosition;
- find();
- return;
- } else {
- // If we can't find next valid position, try previous
valid position
- Position prevPosition =
findPreviousValidPosition(searchPosition, exception);
- if (prevPosition != null && prevPosition.getEntryId()
!= -1) {
- searchPosition = prevPosition;
- find();
- return;
- }
- }
- }
-
- // If don't find any entry, return the last matched position
- log.warn("[{}] Failed to find next valid entry. Returning last
matched position: {}", ledger.getName(),
- lastMatchedPosition);
- callback.findEntryComplete(lastMatchedPosition,
OpFindNewest.this.ctx);
- return;
- } catch (Exception e) {
- callback.findEntryFailed(
- new ManagedLedgerException("Failed to skip non-recoverable
data during search position", e),
- Optional.ofNullable(searchPosition),
OpFindNewest.this.ctx);
- return;
- }
- }
-
callback.findEntryFailed(exception,
Optional.ofNullable(searchPosition), OpFindNewest.this.ctx);
}
- private Position findPreviousValidPosition(Position searchPosition,
ManagedLedgerException exception) {
- Position prevPosition;
- if (exception instanceof
ManagedLedgerException.LedgerNotExistException) {
- prevPosition =
-
ledger.getPreviousPosition(PositionFactory.create(searchPosition.getLedgerId(),
-1L));
- } else {
- prevPosition = ledger.getPreviousPosition(searchPosition);
- }
- if (prevPosition.getEntryId() != -1) {
- var minPosition = ledger.getPositionAfterN(startPosition, min,
PositionBound.startExcluded);
- if (minPosition.compareTo(prevPosition) > 0) {
- // If the previous position is out of the min position, an
invalid position is returned
- prevPosition = null;
- }
- }
- return prevPosition;
- }
-
- private Position findNextValidPosition(Position searchPosition, Exception
exception) {
- Position nextPosition = null;
- if (exception instanceof
ManagedLedgerException.LedgerNotExistException) {
- Long nextLedgerId =
ledger.getNextValidLedger(searchPosition.getLedgerId());
- if (nextLedgerId != null) {
- Boolean nonEmptyLedger =
ledger.getOptionalLedgerInfo(nextLedgerId)
- .map(ledgerInfo -> ledgerInfo.getEntries() > 0)
- .orElse(false);
- if (nonEmptyLedger) {
- nextPosition = PositionFactory.create(nextLedgerId, 0);
- }
- }
- } else {
- nextPosition = ledger.getNextValidPosition(searchPosition);
- }
-
- if (nextPosition != null) {
- var maxPosition = ledger.getPositionAfterN(startPosition, max,
PositionBound.startExcluded);
- if (maxPosition.compareTo(nextPosition) < 0) {
- // If the next position is out of the max position, an invalid
position is returned
- nextPosition = null;
- }
- }
- return nextPosition;
- }
-
public void find() {
if (cursor != null ? cursor.hasMoreEntries(searchPosition) :
ledger.hasMoreEntries(searchPosition)) {
ledger.asyncReadEntry(searchPosition, this, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 0e150e69cc6..6518b61d479 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
-import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -65,9 +64,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonito
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -82,7 +78,6 @@ import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = "broker")
-@Slf4j
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
public static byte[] createMessageWrittenToLedger(String msg) {
@@ -146,12 +141,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
}
CompletableFuture<Void> findMessage(final Result result, final
ManagedCursor c1, final long timestamp) {
- return findMessage(result, c1, timestamp, 0);
- }
-
- CompletableFuture<Void> findMessage(final Result result, final
ManagedCursor c1, final long timestamp,
- int
ledgerCloseTimestampMaxClockSkewMillis) {
- PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1,
ledgerCloseTimestampMaxClockSkewMillis);
+ PersistentMessageFinder messageFinder = new
PersistentMessageFinder("topicname", c1, 0);
final CompletableFuture<Void> future = new CompletableFuture<>();
messageFinder.findMessages(timestamp, new
AsyncCallbacks.FindEntryCallback() {
@@ -450,153 +440,6 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
}
- public void testFindMessageWithTimestampAutoSkipNonRecoverable() throws
Exception {
-
- final String ledgerAndCursorName =
"testFindMessageWithTimestampAutoSkipNonRecoverable";
- final int entriesPerLedger = 5;
- final int totalEntries = 50;
-
- ManagedLedgerConfig config = new ManagedLedgerConfig();
- config.setRetentionSizeInMB(10);
- config.setMaxEntriesPerLedger(entriesPerLedger);
- config.setRetentionTime(1, TimeUnit.HOURS);
- config.setAutoSkipNonRecoverableData(true);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
-
- long initTimeMillis = System.currentTimeMillis();
- for (int i = 0; i < totalEntries; i++) {
- ledger.addEntry(createMessageWrittenToLedger("msg" + i,
initTimeMillis + i));
- }
- // {0,1,2,3,4} (0) 3 x
- // {5,6,7,8,9} (1) 4
- // {10,11,12,13,14} (2) 5
- // {15,16,17,18,19} (3) 6
- // {20,21,22,23,24} (4) 7 x
- // {25,26,27,28,29} (5) 8 x
- // {30,31,32,33,34} (6) 9
- // {35,36,37,38,39} (7) 10
- // {40,41,42,43,44} (8) 11
- // {45,46,47,48,49} (9) 12 x
- Awaitility.await().untilAsserted(() ->
- assertEquals(ledger.getState(),
ManagedLedgerImpl.State.LedgerOpened));
-
- List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
- LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
- // The `lastLedgerInfo` should be newly opened, and it does not
contain any entries.
- // Please refer to: https://github.com/apache/pulsar/pull/22034
- assertEquals(lastLedgerInfo.getEntries(), 0);
- assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
-
- bkc.deleteLedger(ledgers.get(0).getLedgerId());
- bkc.deleteLedger(ledgers.get(4).getLedgerId());
- bkc.deleteLedger(ledgers.get(5).getLedgerId());
- bkc.deleteLedger(ledgers.get(9).getLedgerId());
-
- MessageId messageId = findMessageIdByPublishTime(initTimeMillis + 17,
ledger).join();
- log.info("messageId: {}", messageId);
- assertEquals(messageId, new
MessageIdImpl(ledgers.get(3).getLedgerId(), 2, -1));
-
- messageId = findMessageIdByPublishTime(initTimeMillis + 27,
ledger).join();
- log.info("messageId: {}", messageId);
- assertEquals(messageId, new
MessageIdImpl(ledgers.get(4).getLedgerId(), 0, -1));
-
- messageId = findMessageIdByPublishTime(initTimeMillis + 43,
ledger).join();
- log.info("messageId: {}", messageId);
- assertEquals(messageId, new
MessageIdImpl(ledgers.get(8).getLedgerId(), 3, -1));
-
- messageId = findMessageIdByPublishTime(initTimeMillis + 48,
ledger).join();
- log.info("messageId: {}", messageId);
- assertEquals(messageId, new
MessageIdImpl(ledgers.get(9).getLedgerId(), 0, -1));
-
- ledger.close();
- factory.shutdown();
- }
-
- public void testFindMessageByCursorWithTimestampAutoSkipNonRecoverable()
throws Exception {
-
- final String ledgerAndCursorName =
"testFindMessageByCursorWithTimestampAutoSkipNonRecoverable";
- final int entriesPerLedger = 5;
- final int totalEntries = 50;
-
- ManagedLedgerConfig config = new ManagedLedgerConfig();
- config.setRetentionSizeInMB(10);
- config.setMaxEntriesPerLedger(entriesPerLedger);
- config.setRetentionTime(1, TimeUnit.HOURS);
- config.setAutoSkipNonRecoverableData(true);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
- ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
-
- long initTimeMillis = System.currentTimeMillis();
- for (int i = 0; i < totalEntries; i++) {
- ledger.addEntry(createMessageWrittenToLedger("msg" + i,
initTimeMillis + i));
- }
- // {0,1,2,3,4} (0) 3 x
- // {5,6,7,8,9} (1) 4
- // {10,11,12,13,14} (2) 5
- // {15,16,17,18,19} (3) 6
- // {20,21,22,23,24} (4) 7 x
- // {25,26,27,28,29} (5) 8 x
- // {30,31,32,33,34} (6) 9
- // {35,36,37,38,39} (7) 10
- // {40,41,42,43,44} (8) 11
- // {45,46,47,48,49} (9) 12 x
- Awaitility.await().untilAsserted(() ->
- assertEquals(ledger.getState(),
ManagedLedgerImpl.State.LedgerOpened));
-
- List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
- LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
- // The `lastLedgerInfo` should be newly opened, and it does not
contain any entries.
- // Please refer to: https://github.com/apache/pulsar/pull/22034
- assertEquals(lastLedgerInfo.getEntries(), 0);
- assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
-
- bkc.deleteLedger(ledgers.get(0).getLedgerId());
- bkc.deleteLedger(ledgers.get(4).getLedgerId());
- bkc.deleteLedger(ledgers.get(5).getLedgerId());
- bkc.deleteLedger(ledgers.get(9).getLedgerId());
- Result result = new Result();
-
- findMessage(result, cursor, initTimeMillis + 17, -1).join();
- log.info("position: {}", result.position);
- assertNull(result.exception);
- assertEquals(result.position,
PositionFactory.create(ledgers.get(3).getLedgerId(), 1));
-
- result = new Result();
- findMessage(result, cursor, initTimeMillis + 27, -1).join();
- log.info("position: {}", result.position);
- assertNull(result.exception);
- assertEquals(result.position,
PositionFactory.create(ledgers.get(3).getLedgerId(), 4));
-
- result = new Result();
- findMessage(result, cursor, initTimeMillis + 43, -1).join();
- log.info("position: {}", result.position);
- assertNull(result.exception);
- assertEquals(result.position,
PositionFactory.create(ledgers.get(8).getLedgerId(), 2));
-
- ledger.close();
- factory.shutdown();
- }
-
- private CompletableFuture<MessageId> findMessageIdByPublishTime(long
timestamp, ManagedLedger managedLedger) {
- return managedLedger.asyncFindPosition(entry -> {
- try {
- long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
- return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp,
timestamp);
- } catch (Exception e) {
- log.error("Error deserializing message for message position
find", e);
- } finally {
- entry.release();
- }
- return false;
- }).thenApply(position -> {
- if (position == null) {
- return null;
- } else {
- return new MessageIdImpl(position.getLedgerId(),
position.getEntryId(), -1);
- }
- });
- }
-
@Test
public void testIncorrectClientClock() throws Exception {
final String ledgerAndCursorName = "testIncorrectClientClock";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index c8263534435..f5f16eaf223 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
-import org.apache.commons.lang3.ArraySorter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -90,7 +88,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
conf.setManagedLedgerMaxEntriesPerLedger(10);
conf.setDefaultRetentionSizeInMB(100);
conf.setDefaultRetentionTimeInMinutes(100);
- conf.setAutoSkipNonRecoverableData(true);
super.baseSetup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
@@ -567,89 +564,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
}
}
- @Test(timeOut = 30_000)
- public void testSeekByTimestampWithSkipNonRecoverableData() throws
Exception {
- String topicName =
"persistent://prop/use/ns-abc/testSeekByTimestampWithSkipNonRecoverableData";
- admin.topics().createNonPartitionedTopic(topicName);
- admin.topics().createSubscription(topicName, "my-sub",
MessageId.earliest);
-
- @Cleanup
- Producer<String> producer =
-
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
- for (int i = 0; i < 55; i++) {
- producer.send(("message-" + i));
- Thread.sleep(10);
- }
-
- Map<Long, MessageIdImpl> timestampToMessageId = new HashMap<>();
- List<Long> ledgerIds = new ArrayList<>();
- @Cleanup
- Reader<String> reader =
-
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
- while (reader.hasMessageAvailable()) {
- Message<String> message = reader.readNext();
- log.info("message: {} ----- {}", message.getMessageId(),
message.getPublishTime());
- timestampToMessageId.put(message.getPublishTime(), (MessageIdImpl)
message.getMessageId());
- long ledgerId = ((MessageIdImpl)
message.getMessageId()).getLedgerId();
- if (!ledgerIds.contains(ledgerId)) {
- ledgerIds.add(ledgerId);
- }
- }
-
- Assert.assertEquals(timestampToMessageId.size(), 55);
-
- LinkedHashSet<Long> deletedLedgerIds = new LinkedHashSet<>();
- deletedLedgerIds.add(ledgerIds.get(0));
- deletedLedgerIds.add(ledgerIds.get(ledgerIds.size() - 1));
- int mid = ledgerIds.size() / 2;
- deletedLedgerIds.add(ledgerIds.get(mid));
-
- for (Long deletedLedgerId : deletedLedgerIds) {
- pulsar.getBookKeeperClient().deleteLedger(deletedLedgerId);
- log.info("delete ledger: {}", deletedLedgerId);
- }
-
- admin.topics().unload(topicName);
-
- @Cleanup
- org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
- .receiverQueueSize(0)
- .topic(topicName).subscriptionName("my-sub")
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
- long[] timestamps =
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
- ArraySorter.sort(timestamps);
-
- timestampToMessageId.values().removeIf(messageId ->
deletedLedgerIds.contains(messageId.getLedgerId()));
-
- final int lastNonRecoverableEntryNums = 5;
-
- for (int i = 0; i < timestamps.length - lastNonRecoverableEntryNums;
i++) {
- MessageIdImpl nextValidMessageId =
timestampToMessageId.get(timestamps[i]);
- int l = i;
- while (nextValidMessageId == null) {
- nextValidMessageId = timestampToMessageId.get(timestamps[l++]);
- }
-
- consumer.seek(timestamps[i]);
- Message<String> receive = consumer.receive();
-
- MessageIdImpl msgId = (MessageIdImpl) receive.getMessageId();
- Assert.assertEquals(msgId.getLedgerId(),
nextValidMessageId.getLedgerId());
- Assert.assertEquals(msgId.getEntryId(),
nextValidMessageId.getEntryId());
- }
-
- MessageIdImpl lastMessageId = (MessageIdImpl)
producer.send(("message-last"));
-
- for (int i = timestamps.length - lastNonRecoverableEntryNums; i <
timestamps.length; i++) {
- consumer.seek(timestamps[i]);
- Message<String> receive = consumer.receive();
-
- MessageIdImpl msgId = (MessageIdImpl) receive.getMessageId();
- Assert.assertEquals(msgId.getLedgerId(),
lastMessageId.getLedgerId());
- Assert.assertEquals(msgId.getEntryId(),
lastMessageId.getEntryId());
- }
- }
-
@Test(timeOut = 30_000)
public void testSeekByTimestampWithLedgerTrim() throws Exception {
String topicName =
"persistent://prop/use/ns-abc/testSeekByTimestampWithLedgerTrim";