This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new d558b1df91e Revert "[fix][ml] Enhance OpFindNewest to support skip non-recoverable data (#24441)"
d558b1df91e is described below

commit d558b1df91ec5494317593a5784914774aec53dc
Author: coderzc <[email protected]>
AuthorDate: Wed Jul 2 10:29:40 2025 +0800

    Revert "[fix][ml] Enhance OpFindNewest to support skip non-recoverable data (#24441)"
    
    This reverts commit 8f348a5f7ed06a8baa9f60baeab69971464f0489.
---
 .../bookkeeper/mledger/impl/OpFindNewest.java      | 117 +--------------
 .../service/PersistentMessageFinderTest.java       | 159 +--------------------
 .../broker/service/SubscriptionSeekTest.java       |  86 -----------
 3 files changed, 5 insertions(+), 357 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index befc40188a3..900af9322c7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.Range;
 import java.util.Optional;
 import java.util.function.Predicate;
 import lombok.extern.slf4j.Slf4j;
@@ -29,7 +27,6 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
-import org.apache.bookkeeper.mledger.PositionFactory;
 
 @Slf4j
 class OpFindNewest implements ReadEntryCallback {
@@ -47,7 +44,6 @@ class OpFindNewest implements ReadEntryCallback {
     PositionImpl searchPosition;
     long min;
     long max;
-    long mid;
     Position lastMatchedPosition = null;
     State state;
 
@@ -62,7 +58,6 @@ class OpFindNewest implements ReadEntryCallback {
 
         this.min = 0;
         this.max = numberOfEntries;
-        this.mid = mid();
 
         this.searchPosition = startPosition;
         this.state = State.checkFirst;
@@ -118,8 +113,7 @@ class OpFindNewest implements ReadEntryCallback {
             } else {
                 // start binary search
                 state = State.searching;
-                this.mid = mid();
-                searchPosition = ledger.getPositionAfterN(startPosition, 
this.mid, PositionBound.startExcluded);
+                searchPosition = ledger.getPositionAfterN(startPosition, 
mid(), PositionBound.startExcluded);
                 find();
             }
             break;
@@ -127,129 +121,26 @@ class OpFindNewest implements ReadEntryCallback {
             if (condition.test(entry)) {
                 // mid - last
                 lastMatchedPosition = position;
-                min = mid;
+                min = mid();
             } else {
                 // start - mid
-                max = mid - 1;
+                max = mid() - 1;
             }
-            this.mid = mid();
 
             if (max <= min) {
                 callback.findEntryComplete(lastMatchedPosition, 
OpFindNewest.this.ctx);
                 return;
             }
-            searchPosition = ledger.getPositionAfterN(startPosition, this.mid, 
PositionBound.startExcluded);
+            searchPosition = ledger.getPositionAfterN(startPosition, mid(), 
PositionBound.startExcluded);
             find();
         }
     }
 
     @Override
     public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-        if (exception instanceof 
ManagedLedgerException.NonRecoverableLedgerException
-            && ledger.getConfig().isAutoSkipNonRecoverableData()) {
-            try {
-                log.info("[{}] Ledger {} is not recoverable, skip 
non-recoverable data, state:{}", ledger.getName(),
-                    searchPosition, state);
-                checkArgument(state == State.checkFirst || state == 
State.checkLast || state == State.searching);
-                if (state == State.checkFirst) {
-                    // If we failed to read the first entry, try next valid 
position
-                    Position nextPosition = 
findNextValidPosition(searchPosition, exception);
-                    if (nextPosition != null && nextPosition.getEntryId() != 
-1) {
-                        long numberOfEntries =
-                            
ledger.getNumberOfEntries(Range.closedOpen(searchPosition, nextPosition));
-                        searchPosition = nextPosition;
-                        min += numberOfEntries;
-                        find();
-                        return;
-                    }
-                } else if (state == State.checkLast) {
-                    Position prevPosition = 
findPreviousValidPosition(searchPosition, exception);
-                    if (prevPosition != null && prevPosition.getEntryId() != 
-1) {
-                        long numberOfEntries =
-                            
ledger.getNumberOfEntries(Range.openClosed(prevPosition, searchPosition));
-                        searchPosition = prevPosition;
-                        max -= numberOfEntries;
-                        find();
-                        return;
-                    }
-                } else if (state == State.searching) {
-                    // In searching state, if we failed to read the mid entry, 
try next valid position
-                    Position nextPosition = 
findNextValidPosition(searchPosition, exception);
-                    if (nextPosition != null && nextPosition.getEntryId() != 
-1) {
-                        searchPosition = nextPosition;
-                        find();
-                        return;
-                    } else {
-                        // If we can't find next valid position, try previous 
valid position
-                        Position prevPosition = 
findPreviousValidPosition(searchPosition, exception);
-                        if (prevPosition != null && prevPosition.getEntryId() 
!= -1) {
-                            searchPosition = prevPosition;
-                            find();
-                            return;
-                        }
-                    }
-                }
-
-                // If don't find any entry, return the last matched position
-                log.warn("[{}] Failed to find next valid entry. Returning last 
matched position: {}", ledger.getName(),
-                    lastMatchedPosition);
-                callback.findEntryComplete(lastMatchedPosition, 
OpFindNewest.this.ctx);
-                return;
-            } catch (Exception e) {
-                callback.findEntryFailed(
-                    new ManagedLedgerException("Failed to skip non-recoverable 
data during search position", e),
-                    Optional.ofNullable(searchPosition), 
OpFindNewest.this.ctx);
-                return;
-            }
-        }
-
         callback.findEntryFailed(exception, 
Optional.ofNullable(searchPosition), OpFindNewest.this.ctx);
     }
 
-    private Position findPreviousValidPosition(Position searchPosition, 
ManagedLedgerException exception) {
-        Position prevPosition;
-        if (exception instanceof 
ManagedLedgerException.LedgerNotExistException) {
-            prevPosition =
-                
ledger.getPreviousPosition(PositionFactory.create(searchPosition.getLedgerId(), 
-1L));
-        } else {
-            prevPosition = ledger.getPreviousPosition(searchPosition);
-        }
-        if (prevPosition.getEntryId() != -1) {
-            var minPosition = ledger.getPositionAfterN(startPosition, min, 
PositionBound.startExcluded);
-            if (minPosition.compareTo(prevPosition) > 0) {
-                // If the previous position is out of the min position, an 
invalid position is returned
-                prevPosition = null;
-            }
-        }
-        return prevPosition;
-    }
-
-    private Position findNextValidPosition(Position searchPosition, Exception 
exception) {
-        Position nextPosition = null;
-        if (exception instanceof 
ManagedLedgerException.LedgerNotExistException) {
-            Long nextLedgerId = 
ledger.getNextValidLedger(searchPosition.getLedgerId());
-            if (nextLedgerId != null) {
-                Boolean nonEmptyLedger = 
ledger.getOptionalLedgerInfo(nextLedgerId)
-                    .map(ledgerInfo -> ledgerInfo.getEntries() > 0)
-                    .orElse(false);
-                if (nonEmptyLedger) {
-                    nextPosition = PositionFactory.create(nextLedgerId, 0);
-                }
-            }
-        } else {
-            nextPosition = ledger.getNextValidPosition(searchPosition);
-        }
-
-        if (nextPosition != null) {
-            var maxPosition = ledger.getPositionAfterN(startPosition, max, 
PositionBound.startExcluded);
-            if (maxPosition.compareTo(nextPosition) < 0) {
-                // If the next position is out of the max position, an invalid 
position is returned
-                nextPosition = null;
-            }
-        }
-        return nextPosition;
-    }
-
     public void find() {
         if (cursor != null ? cursor.hasMoreEntries(searchPosition) : 
ledger.hasMoreEntries(searchPosition)) {
             ledger.asyncReadEntry(searchPosition, this, null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 0e150e69cc6..6518b61d479 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -65,9 +64,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonito
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -82,7 +78,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-@Slf4j
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
     public static byte[] createMessageWrittenToLedger(String msg) {
@@ -146,12 +141,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     }
 
     CompletableFuture<Void> findMessage(final Result result, final 
ManagedCursor c1, final long timestamp) {
-        return findMessage(result, c1, timestamp, 0);
-    }
-
-    CompletableFuture<Void> findMessage(final Result result, final 
ManagedCursor c1, final long timestamp,
-                                        int 
ledgerCloseTimestampMaxClockSkewMillis) {
-        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 
ledgerCloseTimestampMaxClockSkewMillis);
+        PersistentMessageFinder messageFinder = new 
PersistentMessageFinder("topicname", c1, 0);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
         messageFinder.findMessages(timestamp, new 
AsyncCallbacks.FindEntryCallback() {
@@ -450,153 +440,6 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     }
 
-    public void testFindMessageWithTimestampAutoSkipNonRecoverable() throws 
Exception {
-
-        final String ledgerAndCursorName = 
"testFindMessageWithTimestampAutoSkipNonRecoverable";
-        final int entriesPerLedger = 5;
-        final int totalEntries = 50;
-
-        ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setRetentionSizeInMB(10);
-        config.setMaxEntriesPerLedger(entriesPerLedger);
-        config.setRetentionTime(1, TimeUnit.HOURS);
-        config.setAutoSkipNonRecoverableData(true);
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
-
-        long initTimeMillis = System.currentTimeMillis();
-        for (int i = 0; i < totalEntries; i++) {
-            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
initTimeMillis + i));
-        }
-        // {0,1,2,3,4} (0) 3 x
-        // {5,6,7,8,9} (1) 4
-        // {10,11,12,13,14} (2) 5
-        // {15,16,17,18,19} (3) 6
-        // {20,21,22,23,24} (4) 7 x
-        // {25,26,27,28,29} (5) 8 x
-        // {30,31,32,33,34} (6) 9
-        // {35,36,37,38,39} (7) 10
-        // {40,41,42,43,44} (8) 11
-        // {45,46,47,48,49} (9) 12 x
-        Awaitility.await().untilAsserted(() ->
-                assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
-
-        List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
-        LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-        // The `lastLedgerInfo` should be newly opened, and it does not 
contain any entries.
-        // Please refer to: https://github.com/apache/pulsar/pull/22034
-        assertEquals(lastLedgerInfo.getEntries(), 0);
-        assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
-
-        bkc.deleteLedger(ledgers.get(0).getLedgerId());
-        bkc.deleteLedger(ledgers.get(4).getLedgerId());
-        bkc.deleteLedger(ledgers.get(5).getLedgerId());
-        bkc.deleteLedger(ledgers.get(9).getLedgerId());
-
-        MessageId messageId = findMessageIdByPublishTime(initTimeMillis + 17, 
ledger).join();
-        log.info("messageId: {}", messageId);
-        assertEquals(messageId, new 
MessageIdImpl(ledgers.get(3).getLedgerId(), 2, -1));
-
-        messageId = findMessageIdByPublishTime(initTimeMillis + 27, 
ledger).join();
-        log.info("messageId: {}", messageId);
-        assertEquals(messageId, new 
MessageIdImpl(ledgers.get(4).getLedgerId(), 0, -1));
-
-        messageId = findMessageIdByPublishTime(initTimeMillis + 43, 
ledger).join();
-        log.info("messageId: {}", messageId);
-        assertEquals(messageId, new 
MessageIdImpl(ledgers.get(8).getLedgerId(), 3, -1));
-
-        messageId = findMessageIdByPublishTime(initTimeMillis + 48, 
ledger).join();
-        log.info("messageId: {}", messageId);
-        assertEquals(messageId, new 
MessageIdImpl(ledgers.get(9).getLedgerId(), 0, -1));
-
-        ledger.close();
-        factory.shutdown();
-    }
-
-    public void testFindMessageByCursorWithTimestampAutoSkipNonRecoverable() 
throws Exception {
-
-        final String ledgerAndCursorName = 
"testFindMessageByCursorWithTimestampAutoSkipNonRecoverable";
-        final int entriesPerLedger = 5;
-        final int totalEntries = 50;
-
-        ManagedLedgerConfig config = new ManagedLedgerConfig();
-        config.setRetentionSizeInMB(10);
-        config.setMaxEntriesPerLedger(entriesPerLedger);
-        config.setRetentionTime(1, TimeUnit.HOURS);
-        config.setAutoSkipNonRecoverableData(true);
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
-        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
-
-        long initTimeMillis = System.currentTimeMillis();
-        for (int i = 0; i < totalEntries; i++) {
-            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
initTimeMillis + i));
-        }
-        // {0,1,2,3,4} (0) 3 x
-        // {5,6,7,8,9} (1) 4
-        // {10,11,12,13,14} (2) 5
-        // {15,16,17,18,19} (3) 6
-        // {20,21,22,23,24} (4) 7 x
-        // {25,26,27,28,29} (5) 8 x
-        // {30,31,32,33,34} (6) 9
-        // {35,36,37,38,39} (7) 10
-        // {40,41,42,43,44} (8) 11
-        // {45,46,47,48,49} (9) 12 x
-        Awaitility.await().untilAsserted(() ->
-                assertEquals(ledger.getState(), 
ManagedLedgerImpl.State.LedgerOpened));
-
-        List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
-        LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
-        // The `lastLedgerInfo` should be newly opened, and it does not 
contain any entries.
-        // Please refer to: https://github.com/apache/pulsar/pull/22034
-        assertEquals(lastLedgerInfo.getEntries(), 0);
-        assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
-
-        bkc.deleteLedger(ledgers.get(0).getLedgerId());
-        bkc.deleteLedger(ledgers.get(4).getLedgerId());
-        bkc.deleteLedger(ledgers.get(5).getLedgerId());
-        bkc.deleteLedger(ledgers.get(9).getLedgerId());
-        Result result = new Result();
-
-        findMessage(result, cursor, initTimeMillis + 17, -1).join();
-        log.info("position: {}", result.position);
-        assertNull(result.exception);
-        assertEquals(result.position, 
PositionFactory.create(ledgers.get(3).getLedgerId(), 1));
-
-        result = new Result();
-        findMessage(result, cursor, initTimeMillis + 27, -1).join();
-        log.info("position: {}", result.position);
-        assertNull(result.exception);
-        assertEquals(result.position, 
PositionFactory.create(ledgers.get(3).getLedgerId(), 4));
-
-        result = new Result();
-        findMessage(result, cursor, initTimeMillis + 43, -1).join();
-        log.info("position: {}", result.position);
-        assertNull(result.exception);
-        assertEquals(result.position, 
PositionFactory.create(ledgers.get(8).getLedgerId(), 2));
-
-        ledger.close();
-        factory.shutdown();
-    }
-
-    private CompletableFuture<MessageId> findMessageIdByPublishTime(long 
timestamp, ManagedLedger managedLedger) {
-        return managedLedger.asyncFindPosition(entry -> {
-            try {
-                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
-                return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, 
timestamp);
-            } catch (Exception e) {
-                log.error("Error deserializing message for message position 
find", e);
-            } finally {
-                entry.release();
-            }
-            return false;
-        }).thenApply(position -> {
-            if (position == null) {
-                return null;
-            } else {
-                return new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(), -1);
-            }
-        });
-    }
-
     @Test
     public void testIncorrectClientClock() throws Exception {
         final String ledgerAndCursorName = "testIncorrectClientClock";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index c8263534435..f5f16eaf223 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
-import org.apache.commons.lang3.ArraySorter;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -90,7 +88,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         conf.setManagedLedgerMaxEntriesPerLedger(10);
         conf.setDefaultRetentionSizeInMB(100);
         conf.setDefaultRetentionTimeInMinutes(100);
-        conf.setAutoSkipNonRecoverableData(true);
         super.baseSetup();
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
     }
@@ -567,89 +564,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         }
     }
 
-    @Test(timeOut = 30_000)
-    public void testSeekByTimestampWithSkipNonRecoverableData() throws 
Exception {
-        String topicName = 
"persistent://prop/use/ns-abc/testSeekByTimestampWithSkipNonRecoverableData";
-        admin.topics().createNonPartitionedTopic(topicName);
-        admin.topics().createSubscription(topicName, "my-sub", 
MessageId.earliest);
-
-        @Cleanup
-        Producer<String> producer =
-            
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
-        for (int i = 0; i < 55; i++) {
-            producer.send(("message-" + i));
-            Thread.sleep(10);
-        }
-
-        Map<Long, MessageIdImpl> timestampToMessageId = new HashMap<>();
-        List<Long> ledgerIds = new ArrayList<>();
-        @Cleanup
-        Reader<String> reader =
-            
pulsarClient.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.earliest).create();
-        while (reader.hasMessageAvailable()) {
-            Message<String> message = reader.readNext();
-            log.info("message: {} ----- {}", message.getMessageId(), 
message.getPublishTime());
-            timestampToMessageId.put(message.getPublishTime(), (MessageIdImpl) 
message.getMessageId());
-            long ledgerId = ((MessageIdImpl) 
message.getMessageId()).getLedgerId();
-            if (!ledgerIds.contains(ledgerId)) {
-                ledgerIds.add(ledgerId);
-            }
-        }
-
-        Assert.assertEquals(timestampToMessageId.size(), 55);
-
-        LinkedHashSet<Long> deletedLedgerIds = new LinkedHashSet<>();
-        deletedLedgerIds.add(ledgerIds.get(0));
-        deletedLedgerIds.add(ledgerIds.get(ledgerIds.size() - 1));
-        int mid = ledgerIds.size() / 2;
-        deletedLedgerIds.add(ledgerIds.get(mid));
-
-        for (Long deletedLedgerId : deletedLedgerIds) {
-            pulsar.getBookKeeperClient().deleteLedger(deletedLedgerId);
-            log.info("delete ledger: {}", deletedLedgerId);
-        }
-
-        admin.topics().unload(topicName);
-
-        @Cleanup
-        org.apache.pulsar.client.api.Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
-            .receiverQueueSize(0)
-            .topic(topicName).subscriptionName("my-sub")
-            
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
-        long[] timestamps = 
timestampToMessageId.keySet().stream().mapToLong(Long::longValue).toArray();
-        ArraySorter.sort(timestamps);
-
-        timestampToMessageId.values().removeIf(messageId -> 
deletedLedgerIds.contains(messageId.getLedgerId()));
-
-        final int lastNonRecoverableEntryNums = 5;
-
-        for (int i = 0; i < timestamps.length - lastNonRecoverableEntryNums; 
i++) {
-            MessageIdImpl nextValidMessageId = 
timestampToMessageId.get(timestamps[i]);
-            int l = i;
-            while (nextValidMessageId == null) {
-                nextValidMessageId = timestampToMessageId.get(timestamps[l++]);
-            }
-
-            consumer.seek(timestamps[i]);
-            Message<String> receive = consumer.receive();
-
-            MessageIdImpl msgId = (MessageIdImpl) receive.getMessageId();
-            Assert.assertEquals(msgId.getLedgerId(), 
nextValidMessageId.getLedgerId());
-            Assert.assertEquals(msgId.getEntryId(), 
nextValidMessageId.getEntryId());
-        }
-
-        MessageIdImpl lastMessageId = (MessageIdImpl) 
producer.send(("message-last"));
-
-        for (int i = timestamps.length - lastNonRecoverableEntryNums; i < 
timestamps.length; i++) {
-            consumer.seek(timestamps[i]);
-            Message<String> receive = consumer.receive();
-
-            MessageIdImpl msgId = (MessageIdImpl) receive.getMessageId();
-            Assert.assertEquals(msgId.getLedgerId(), 
lastMessageId.getLedgerId());
-            Assert.assertEquals(msgId.getEntryId(), 
lastMessageId.getEntryId());
-        }
-    }
-
     @Test(timeOut = 30_000)
     public void testSeekByTimestampWithLedgerTrim() throws Exception {
         String topicName = 
"persistent://prop/use/ns-abc/testSeekByTimestampWithLedgerTrim";

Reply via email to