BewareMyPower commented on code in PR #25148:
URL: https://github.com/apache/pulsar/pull/25148#discussion_r2696593233
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java:
##########
@@ -457,6 +457,56 @@ public void run() {
}
}
+ @Test
+ public void testConcurrentResetCursorByTimestamp() throws Exception {
Review Comment:
This test can pass even without the changes in this PR.
> fence reset-cursor-by-timestamp to avoid concurrent resets
Is the change to the fencing mechanism necessary? Even if it is, it would be
better to open a separate PR for it.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java:
##########
@@ -428,7 +428,7 @@ public void testConcurrentResetCursor() throws Exception {
messageIds.add(msgId);
}
- List<PulsarAdminException> exceptions = new ArrayList<>();
+ List<PulsarAdminException> exceptions = new java.util.concurrent.CopyOnWriteArrayList<>();
Review Comment:
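This type is also written with its full package path
(`java.util.concurrent.CopyOnWriteArrayList`). Please import it instead.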
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java:
##########
@@ -183,6 +185,59 @@ public void testPartialCachingWithMultipleEntriesInCacheWhilePartialReadFails()
verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(99L), any(), any(), any());
}
+ @Test
+ public void testReadFromStorageRetriesWhenHandleClosed() {
+ RangeEntryCacheManagerImpl mockEntryCacheManager = mock(RangeEntryCacheManagerImpl.class);
+ ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class);
+ when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean);
+ ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+ ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class);
+ when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+ when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+ when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+ when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+ RangeCacheRemovalQueue mockRangeCacheRemovalQueue = mock(RangeCacheRemovalQueue.class);
+ when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true);
+ InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class);
+ when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+ doAnswer(invocation -> {
+ long permits = invocation.getArgument(0);
+ InflightReadsLimiter.Handle handle = new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+ true);
+ return Optional.of(handle);
+ }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+ RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false,
+ mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, mock(PendingReadsManager.class));
+
+ ReadHandle readHandle = mock(ReadHandle.class);
+ when(readHandle.getId()).thenReturn(1L);
+ when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+ LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, Unpooled.wrappedBuffer(new byte[] {1}));
+ org.apache.bookkeeper.client.api.LedgerEntries ledgerEntries =
+ mock(org.apache.bookkeeper.client.api.LedgerEntries.class);
+ List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+ when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+ java.util.concurrent.atomic.AtomicInteger readAttempts = new java.util.concurrent.atomic.AtomicInteger();
Review Comment:
I see that some types are written with their full package path. Could you
import them instead?
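For illustration only, here is a minimal standalone sketch of the import-based style, using the same `java.util.concurrent` types that appear fully qualified in the diffs above. The class name `ImportStyleSketch` and the method `runTasks` are hypothetical and not part of this PR; the sketch deliberately avoids Pulsar and BookKeeper classes so it can run on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not code from the PR.
public class ImportStyleSketch {

    // Runs `tasks` concurrent tasks. Each task bumps the counter and records
    // one simulated exception in a thread-safe list.
    // Returns {number of attempts, number of recorded exceptions}.
    static int[] runTasks(int tasks) {
        // Imported types keep the declarations short and readable.
        List<Exception> exceptions = new CopyOnWriteArrayList<>();
        AtomicInteger readAttempts = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                readAttempts.incrementAndGet();
                exceptions.add(new IllegalStateException("simulated failure"));
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {readAttempts.get(), exceptions.size()};
    }

    public static void main(String[] args) {
        int[] result = runTasks(8);
        System.out.println("attempts=" + result[0] + ", exceptions=" + result[1]);
    }
}
```

The same pattern would apply to `org.apache.bookkeeper.client.api.LedgerEntries` in the test above, assuming the import does not clash with another class of the same simple name in that file.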