horizonzy commented on code in PR #15063:
URL: https://github.com/apache/pulsar/pull/15063#discussion_r869496534


##########
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java:
##########
@@ -530,4 +683,120 @@ public void testOnlyNegativeOnEOF() throws Exception {
         }
     }
 
+    @Test
+    public void testOnlyNegativeOnEOFWithBufferedRead() throws IOException {
+        int ledgerId = 1;
+        int entrySize = 10000;
+        int lac = 0;
+
+        Random r = new Random(0);
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, 
() -> (byte)r.nextInt());
+
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 
2;
+        BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+
+        int bytesRead = 0;
+        int ret;
+        int offset = 0;
+        int resetOffsetCount = 0;
+        byte[] buf = new byte[1024];
+        while ((ret = inputStream.read(buf, offset, buf.length - offset)) > 0) 
{
+            bytesRead += ret;
+            int currentOffset = offset;
+            offset = (offset + ret) % buf.length;
+            if (offset < currentOffset) {
+                resetOffsetCount++;
+            }
+        }
+        assertEquals(bytesRead, blockSize);
+        assertNotEquals(resetOffsetCount, 0);
+    }
+
+    // This test is for testing the read(byte[] buf, int off, int len) method 
can work properly
+    // on the offset not 0.
+    @Test
+    public void testReadTillLacWithSmallBuffer() throws Exception {
+        // simulate last data block read.
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 89;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size equals to (header + lac_entry) size.
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + (1 + lac) * 
(entrySize + 4 + 8);
+        BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = (blockSize - 
DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        // read twice to test the offset not 0 case
+        int ret = inputStream.read(headerB, 0, 66);
+        assertEquals(ret, 66);
+        ret = inputStream.read(headerB, 66, headerB.length - 66);
+        assertEquals(headerB.length - 66, ret);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new 
ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is 
MockLedgerEntry.blockPadding
+
+        // 2. read Ledger entries. 96 * 20
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+
+                int read = inputStream.read(lengthBuf, 0, 4);
+                assertEquals(read, 4);
+                read = inputStream.read(entryIdBuf, 0, 8);
+                assertEquals(read, 8);
+
+                Random random = new Random(System.currentTimeMillis());
+                int o = 0;
+                int totalRead = 0;
+                int maxReadTime = 10;
+                while (o != content.length) {
+                    int r;
+                    if (maxReadTime-- == 0) {
+                        r = entrySize - o;
+                    } else {
+                        r = random.nextInt(entrySize - o);
+                    }
+                    read = inputStream.read(content, o, r);
+                    totalRead += read;
+                    o += r;
+                }
+                assertEquals(totalRead, entrySize);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. should have no padding
+        int left = blockSize - DataBlockHeaderImpl.getDataStartOffset() -  
expectedEntryCount * (entrySize + 4 + 8);

Review Comment:
   This only reverses the calculation above, so `left` is zero by construction.
Maybe the test should instead check that `dataBlockFullOffset == blockSize` in
BlockAwareSegmentInputStreamImpl.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to