zymap commented on a change in pull request #12123:
URL: https://github.com/apache/pulsar/pull/12123#discussion_r717214760



##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, 
lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
                     promise.completeExceptionally(new 
BKException.BKIncorrectParameterException());
                     return;
                 }
                 long entriesToRead = (lastEntry - firstEntry) + 1;
-                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
-                try {
-                    while (entriesToRead > 0) {
-                        int length = dataStream.readInt();
-                        if (length < 0) { // hit padding or new block
-                            
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        }
-                        long entryId = dataStream.readLong();
-
-                        if (entryId == nextExpectedId) {
-                            ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
-                            entries.add(LedgerEntryImpl.create(ledgerId, 
entryId, length, buf));
-                            int toWrite = length;
-                            while (toWrite > 0) {
-                                toWrite -= buf.writeBytes(dataStream, toWrite);
-                            }
-                            entriesToRead--;
-                            nextExpectedId++;
-                        } else if (entryId > nextExpectedId) {
-                            
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId < nextExpectedId
-                                && 
!index.getIndexEntryForEntry(nextExpectedId).equals(
-                                index.getIndexEntryForEntry(entryId)))  {
-                            
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId > lastEntry) {
-                            log.info("Expected to read {}, but read {}, which 
is greater than last entry {}",
-                                     nextExpectedId, entryId, lastEntry);
-                            throw new 
BKException.BKUnexpectedConditionException();
-                        } else {
-                            long ignored = inputStream.skip(length);
+
+                // seek the position to the first entry position, otherwise we 
will get the unexpected entry ID when doing
+                // the first read, that would cause read an unexpected entry 
id which is out of range between firstEntry
+                // and lastEntry
+                // for example, when we get 1-10 entries at first, then the 
next request is get 2-9, the following code
+                // will read the entry id from the stream and that is not the 
correct entry id, so it will seek to the
+                // correct position then read the stream as normal. But the 
entry id may exceed the last entry id, that
+                // will cause we are hardly to know the edge of the request 
range.
+                
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+
+                while (entriesToRead > 0) {
+                    if (state == State.Closed) {
+                        log.warn("Reading a closed read handler. Ledger ID: 
{}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                    int length = dataStream.readInt();
+                    if (length < 0) { // hit padding or new block
+                        
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    }
+                    long entryId = dataStream.readLong();
+
+                    if (entryId == nextExpectedId) {
+                        ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, 
length, buf));
+                        int toWrite = length;
+                        while (toWrite > 0) {
+                            toWrite -= buf.writeBytes(dataStream, toWrite);
                         }
+                        entriesToRead--;
+                        nextExpectedId++;
+                    } else if (entryId > nextExpectedId && entryId < 
lastEntry) {
+                        
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    } else if (entryId < nextExpectedId
+                        && !index.getIndexEntryForEntry(nextExpectedId).equals(
+                        index.getIndexEntryForEntry(entryId))) {
+                        
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    } else if (entryId > lastEntry) {
+                        log.info("Expected to read {}, but read {}, which is 
greater than last entry {}",
+                            nextExpectedId, entryId, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    } else {
+                        long ignored = inputStream.skip(length);

Review comment:
       There is a SpotBugs check that requires us to handle the returned value, 
so we can't remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to