This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d05ac  Fix the read performance issue in the offload readAsync 
(#12443)
b4d05ac is described below

commit b4d05ac1bf5cddb613d93806e505ef8788e1acc0
Author: Yong Zhang <[email protected]>
AuthorDate: Fri Oct 22 11:15:55 2021 +0800

    Fix the read performance issue in the offload readAsync (#12443)
    
    ---
    
    *Motivation*
    
    In #12123, I added a seek operation to the readAsync method.
    It makes sure the data stream always seeks to the first entry position
    before reading, so that no EOF exception is introduced.
    But an offload index entry groups a set of entries into a range, so
    the seek operation moves the position to the first entry in that range.
    That introduces a performance issue, because every read operation
    reads from the first entry in the range until it finds the actual first
    requested entry.
    However, simply removing the seek operation would cause an EOF exception
    in the readAsync method. This PR therefore limits when the seek operation
    is performed.
    
    *Modifications*
    
    Add an available() method to the BackedInputStream to find out how many
    bytes can still be read from the stream.
---
 .../impl/BlobStoreBackedInputStreamImpl.java       |  5 +++++
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 26 +++++++++++++---------
 .../jcloud/BlobStoreBackedInputStreamTest.java     | 23 +++++++++++++++++++
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 19 ++++++++++++++++
 4 files changed, 63 insertions(+), 10 deletions(-)

diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 6a204d5..e3fc68a 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -141,4 +141,9 @@ public class BlobStoreBackedInputStreamImpl extends 
BackedInputStream {
     public void close() {
         buffer.release();
     }
+
+    @Override
+    public int available() throws IOException {
+        return (int)(objectLen - cursor) + buffer.readableBytes();
+    }
 }
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 2bf380d..98fdff4 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -105,6 +105,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            boolean seeked = false;
             try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
@@ -115,14 +116,13 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
                 long entriesToRead = (lastEntry - firstEntry) + 1;
                 long nextExpectedId = firstEntry;
 
-                // seek the position to the first entry position, otherwise we 
will get the unexpected entry ID when doing
-                // the first read, that would cause read an unexpected entry 
id which is out of range between firstEntry
-                // and lastEntry
-                // for example, when we get 1-10 entries at first, then the 
next request is get 2-9, the following code
-                // will read the entry id from the stream and that is not the 
correct entry id, so it will seek to the
-                // correct position then read the stream as normal. But the 
entry id may exceed the last entry id, that
-                // will cause we are hardly to know the edge of the request 
range.
-                
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                // checking the data stream has enough data to read to avoid 
throw EOF exception when reading data.
+                // 12 bytes represent the stream have the length and entryID 
to read.
+                if (dataStream.available() < 12) {
+                    log.warn("There hasn't enough data to read, current 
available data has {} bytes,"
+                        + " seek to the first entry {} to avoid EOF 
exception", inputStream.available(), firstEntry);
+                    
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+                }
 
                 while (entriesToRead > 0) {
                     if (state == State.Closed) {
@@ -149,14 +149,20 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
                         log.warn("The read entry {} is not the expected entry 
{} but in the range of {} - {},"
                             + " seeking to the right position", entryId, 
nextExpectedId, nextExpectedId, lastEntry);
                         
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId < nextExpectedId
                         && 
!index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId)))
 {
                         log.warn("Read an unexpected entry id {} which is 
smaller than the next expected entry id {}"
                         + ", seeking to the right position", entries, 
nextExpectedId);
                         
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
                     } else if (entryId > lastEntry) {
+                        // in the normal case, the entry id should increment 
in order. But if there has random access in
+                        // the read method, we should allow to seek to the 
right position and the entry id should
+                        // never over to the last entry again.
+                        if (!seeked) {
+                            
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            seeked = true;
+                            continue;
+                        }
                         log.info("Expected to read {}, but read {}, which is 
greater than last entry {}",
                             nextExpectedId, entryId, lastEntry);
                         throw new BKException.BKUnexpectedConditionException();
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
index ffe8fb2..36541b4 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
@@ -260,4 +260,27 @@ public class BlobStoreBackedInputStreamTest extends 
BlobStoreTestBase {
         toTest.seekForward(after);
         assertStreamsMatch(toTest, toCompare);
     }
+
+    @Test
+    public void testAvailable() throws IOException {
+        String objectKey = "testAvailable";
+        int objectSize = 2048;
+        RandomInputStream toWrite = new RandomInputStream(0, objectSize);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength(objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
+            blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
+        Assert.assertEquals(bis.available(), objectSize);
+        bis.seek(500);
+        Assert.assertEquals(bis.available(), objectSize - 500);
+        bis.seek(1024);
+        Assert.assertEquals(bis.available(), 1024);
+        bis.seek(2048);
+        Assert.assertEquals(bis.available(), 0);
+    }
 }
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 90d8b11..77dfc55 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -477,4 +478,22 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid 
object version"));
         }
     }
+
+    @Test
+    public void testReadEOFException() throws Throwable {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
+        toTest.readAsync(0, toTest.getLastAddConfirmed()).get();
+
+        try {
+            toTest.readAsync(0, 0).get();
+        } catch (Exception e) {
+            fail("Get unexpected exception when reading entries", e);
+        }
+    }
 }

Reply via email to