ivankelly commented on a change in pull request #1678: PIP-17: provide 
BlockAwareSegmentInputStream implementation and test
URL: https://github.com/apache/incubator-pulsar/pull/1678#discussion_r185566416
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
 ##########
 @@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+    @Data
+    class MockLedgerEntry implements LedgerEntry {
+        public byte blockPadding = 0xB;
+        long ledgerId;
+        long entryId;
+        long length;
+        byte entryBytes[];
+        ByteBuf entryBuffer;
+
+        MockLedgerEntry(long ledgerId, long entryId, long length) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.length = length;
+            this.entryBytes = new byte[(int)length];
+            entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+            entryBuffer.writerIndex(0);
+            IntStream.range(0, (int)length).forEach(i -> 
entryBuffer.writeByte(blockPadding));
+        }
+
+        @Override
+        public ByteBuffer getEntryNioBuffer() {
+            return null;
+        }
+
+        @Override
+        public LedgerEntry duplicate() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            entryBuffer.release();
+        }
+    }
+
+    @Data
+    class MockLedgerEntries implements LedgerEntries {
+        int ledgerId;
+        int startEntryId;
+        int count;
+        int entrySize;
+        List<LedgerEntry> entries;
+
+        MockLedgerEntries(int ledgerId, int startEntryId, int count, int 
entrySize) {
+            this.ledgerId = ledgerId;
+            this.startEntryId = startEntryId;
+            this.count = count;
+            this.entrySize = entrySize;
+            this.entries = Lists.newArrayList(count);
+
+            IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+                entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+        }
+
+        @Override
+        public void close() {
+            entries.clear();
+        }
+
+        @Override
+        public LedgerEntry getEntry(long entryId) {
+            if (entryId < startEntryId || entryId >= startEntryId + count) {
+                return null;
+            }
+
+            return entries.get(((int)entryId - startEntryId));
+        }
+
+        @Override
+        public Iterator<LedgerEntry> iterator() {
+            return entries.iterator();
+        }
+    }
+
+    class MockReadHandle implements ReadHandle {
+        int ledgerId;
+        int entrySize;
+        int lac;
+        MockReadHandle(int ledgerId, int entrySize, int lac) {
+            this.ledgerId = ledgerId;
+            this.entrySize = entrySize;
+            this.lac = lac;
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readAsync(long firstEntry, 
long lastEntry) {
+            CompletableFuture<LedgerEntries> future = new 
CompletableFuture<>();
+            LedgerEntries entries = new MockLedgerEntries(ledgerId,
+                (int)firstEntry,
+                (int)(lastEntry - firstEntry + 1),
+                entrySize);
+
+            Executors.newCachedThreadPool(new DefaultThreadFactory("test"))
+                .submit(() -> future.complete(entries));
+            return future;
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+            return readAsync(firstEntry, lastEntry);
+        }
+
+        @Override
+        public CompletableFuture<Long> readLastAddConfirmedAsync() {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+            return null;
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return lac;
+        }
+
+        @Override
+        public long getLength() {
+            return (lac + 1) * entrySize;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<LastConfirmedAndEntry>
+        readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, 
boolean parallel) {
+            return null;
+        }
+
+        @Override
+        public LedgerMetadata getLedgerMetadata() {
+            return null;
+        }
+
+        @Override
+        public long getId() {
+            return ledgerId;
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return null;
+        }
+    }
+
+    @Test
+    public void blockAwareSegmentInputStreamTestHaveEndPadding() throws 
Exception {
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 160;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size bigger than to (header + entry) size.
+        int blockSize = 3148 + 5;
+        BlockAwareSegmentInputStreamImpl inputStream = new 
BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = (blockSize - 
DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new 
ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        byte[] entryData = new byte[entrySize];
+        IntStream.range(0, entrySize).forEach(i -> {
 
 Review comment:
   Arrays.fill(entryData, 0xB);

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to