zhaijack closed pull request #1651: PIP-17: provide DataBlockHeader and BlockAwareSegmentInputStream implementation
URL: https://github.com/apache/incubator-pulsar/pull/1651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
new file mode 100644
index 0000000000..6e2837c337
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+
+/**
+ * The data block header in cold storage for each data block.
+ *
+ * Currently, it is in the format:
+ * [ magic_word -- int ][ block_len -- int ][ block_entry_count -- int ][ first_entry_id -- long ]
+ *
+ * with the size: 4 + 4 + 4 + 8 = 20 Bytes
+ */
+@Unstable
+@LimitedPrivate
+public interface DataBlockHeader {
+    /**
+     * Get Magic Word for data block.
+     * It is a sequence of bytes used to identify the start of a block.
+     */
+    int getBlockMagicWord();
+
+    /**
+     * Get the length of the block in bytes, including the header.
+     */
+    int getBlockLength();
+
+    /**
+     * Get the count of message entries stored in this data block.
+     */
+    int getBlockEntryCount();
+
+    /**
+     * Get the entry id of the first message entry stored in this data block.
+     */
+    long getFirstEntryId();
+
+    /**
+     * Get the size of this DataBlockHeader.
+     */
+    int getHeaderSize();
+
+    /**
+     * Get the content of the data block header as an InputStream,
+     * serialized in the format described above.
+     */
+    InputStream toStream() throws IOException;
+
+    /**
+     * Get the payload start offset in this block.
+     * Space before this offset is reserved for the header and alignment.
+     */
+    static int getDataStartOffset() {
+        return DataBlockHeaderImpl.getDataStartOffset();
+    }
+}
+
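For orientation, here is a minimal reader sketch (not part of this PR) showing how a
consumer might decode a block written in this layout, combined with the per-entry
framing used by BlockAwareSegmentInputStream below. The method name and the
blockStream parameter are hypothetical; it assumes the stream is positioned at the
start of a block and that the header occupies the first 20 bytes:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    static void readBlock(InputStream blockStream) throws IOException {
        DataInputStream dis = new DataInputStream(blockStream);
        int magicWord = dis.readInt();       // identifies the start of a block
        int blockLength = dis.readInt();     // total block length, header included
        int entryCount = dis.readInt();      // message entries stored in this block
        long firstEntryId = dis.readLong();  // entry id of the first stored entry
        // Skip the alignment padding between the 20-byte header and the payload.
        dis.skipBytes(DataBlockHeader.getDataStartOffset() - 20);
        for (int i = 0; i < entryCount; i++) {
            int entrySize = dis.readInt();   // [entry_size -- int]
            long entryId = dis.readLong();   // [entry_id -- long]
            byte[] entryData = new byte[entrySize];
            dis.readFully(entryData);        // [entry_data]
            // hand (entryId, entryData) to the caller here
        }
    }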
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
new file mode 100644
index 0000000000..188e21e4d0
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The BlockAwareSegmentInputStream for each cold storage data block.
+ * It wraps a byte buffer that holds all the content for this data block:
+ *      DataBlockHeader + entries (each in the format [entry_size -- int][entry_id -- long][entry_data])
+ */
+public class BlockAwareSegmentInputStream extends ByteArrayInputStream {
+    private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class);
+
+    private static final byte blockEndPadding = 0xA;
+
+    private final ReadHandle ledger;
+    private final long startEntryId;
+    private final int blockSize;
+
+    private DataBlockHeader dataBlockHeader;
+    // Number of message entries read from the ledger and written to this block.
+    private int blockEntryCount;
+    // Number of payload bytes from the ledger that have been written to this buffer.
+    private int payloadBytesWritten;
+
+    // ByteBuf that wraps buf and is used to write content into the buffer.
+    private final ByteBuf writer;
+
+    public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) {
+        super(new byte[blockSize]);
+        this.ledger = ledger;
+        this.startEntryId = startEntryId;
+        this.blockSize = blockSize;
+
+        this.payloadBytesWritten = 0;
+        this.blockEntryCount = 0;
+        this.writer = Unpooled.wrappedBuffer(this.buf);
+        // set writer index to the payload start offset
+        writer.writerIndex(DataBlockHeaderImpl.getDataStartOffset());
+    }
+
+    @Override
+    public void close() throws IOException {
+        writer.release();
+        super.close();
+    }
+
+    // Entry reading is complete and there is no space for one more entry: pad the
+    // remainder, construct and write the header, and complete the future.
+    private void entryReadComplete(CompletableFuture<Void> future) throws IOException {
+        // padding at end
+        IntStream.range(0, writer.writableBytes()).forEach(i -> writer.writeByte(blockEndPadding));
+
+        // construct and write block header
+        dataBlockHeader = DataBlockHeaderImpl.of(blockSize, blockEntryCount, startEntryId);
+        writer.writerIndex(0);
+        writer.writeBytes(dataBlockHeader.toStream(), dataBlockHeader.getHeaderSize());
+        writer.writerIndex(blockSize);
+        future.complete(null);
+    }
+
+    // Roughly estimate how many entries to read this time.
+    private int readEntryCount() {
+        // Read some more entries than the rough estimate each time.
+        final int readAheadNum = 100;
+        long ledgerLength = ledger.getLength();
+        long lac = ledger.getLastAddConfirmed();
+        int averageEntrySize = (int)(ledgerLength / (lac + 1)) + 4 + 8;
+        int numEntriesToRead = writer.writableBytes() / averageEntrySize;
+        return numEntriesToRead + readAheadNum;
+    }
+
+    // Put the entries read from the ledger into the buffer and update counters.
+    // If the buffer is full, complete the future.
+    private void bufReadEntries(CompletableFuture<Void> future, LedgerEntries entries) throws IOException {
+        Iterator<LedgerEntry> iterator = entries.iterator();
+        int entryLength = 0;
+        while (iterator.hasNext()) {
+            LedgerEntry entry = iterator.next();
+            long entryId = entry.getEntryId();
+            entryLength = (int)entry.getLength();
+
+            if (writer.writableBytes() >= entryLength + 4 + 8) {
+                // has space for this entry, write it into buf
+                writer
+                    .writeInt(entryLength)
+                    .writeLong(entryId)
+                    .writeBytes(entry.getEntryBuffer());
+                // set counters
+                blockEntryCount++;
+                payloadBytesWritten += entryLength;
+                entry.close();
+            } else {
+                // buf has no space left for a whole message entry
+                entry.close();
+                iterator.forEachRemaining(LedgerEntry::close);
+                // padding and write header
+                entryReadComplete(future);
+                return;
+            }
+        }
+
+        // Have not read enough entries yet; read more.
+        checkState(!iterator.hasNext());
+        if (writer.writableBytes() > 4 + 8) {
+            readLedgerEntries(future, startEntryId + blockEntryCount, readEntryCount());
+        } else {
+            entryReadComplete(future);
+        }
+    }
+
+    // Read `number` entries starting from `start`; once the buffer is full,
+    // bufReadEntries will complete the future.
+    private void readLedgerEntries(CompletableFuture<Void> future, long start, int number) {
+        ledger.readAsync(start, Math.min(start + number - 1, ledger.getLastAddConfirmed()))
+            .whenComplete((ledgerEntries, exception) -> {
+            try {
+                if (exception != null) {
+                    log.error("Meet exception in readAsync.", exception);
+                    this.close();
+                    future.completeExceptionally(exception);
+                    return;
+                }
+
+                bufReadEntries(future, ledgerEntries);
+            } catch (Exception e) {
+                log.error("Exception while reading entries.", e);
+                future.completeExceptionally(e);
+            }
+        });
+    }
+
+    // Initialize and fill the buffer with data from the ReadHandle.
+    public CompletableFuture<Void> initialize() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        readLedgerEntries(future, startEntryId, readEntryCount());
+        return future;
+    }
+
+    public ReadHandle getLedger() {
+        return ledger;
+    }
+
+    public long getStartEntryId() {
+        return startEntryId;
+    }
+
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    public DataBlockHeader getDataBlockHeader() {
+        return dataBlockHeader;
+    }
+
+    public int getBlockEntryCount() {
+        return blockEntryCount;
+    }
+
+    public long getLastEntryIdWritten() {
+        return startEntryId + blockEntryCount - 1;
+    }
+
+    public int getPayloadBytesWritten() {
+        return payloadBytesWritten;
+    }
+
+    public byte getBlockEndPadding() {
+        return blockEndPadding;
+    }
+
+}
+
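To make the intended call pattern concrete, a hedged usage sketch (not part of this
PR): fill one block from a ledger and copy the fixed-size result to wherever it will
be offloaded. The offloadOneBlock helper is hypothetical, and Guava's ByteStreams
(already used by the tests below) does the bulk read:

    import com.google.common.io.ByteStreams;
    import org.apache.bookkeeper.client.api.ReadHandle;

    static byte[] offloadOneBlock(ReadHandle readHandle, long startEntryId, int blockSize)
            throws Exception {
        BlockAwareSegmentInputStream in =
            new BlockAwareSegmentInputStream(readHandle, startEntryId, blockSize);
        in.initialize().get();             // async fill: header + entries + padding
        byte[] block = new byte[blockSize];
        ByteStreams.readFully(in, block);  // the stream always holds exactly blockSize bytes
        // a caller would carry this forward as the start of the next block
        long nextStartEntryId = in.getLastEntryIdWritten() + 1;
        in.close();
        return block;
    }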
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
new file mode 100644
index 0000000000..e2063b368c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+
+/**
+ * The data block header in cold storage for each data block.
+ */
+public class DataBlockHeaderImpl implements DataBlockHeader {
+    // Magic Word for data block.
+    // It is a sequence of bytes used to identify the start of a block.
+    private static final int dataBlockMagicWord = 0xDBDBDBDB;
+    // This is bigger than the header size, leaving some room for alignment and future enhancement.
+    // The payload uses this as its start offset.
+    private static final int dataBlockHeaderAlign = 1024;
+    // The size of this header.
+    private static final int dataBlockHeaderSize = 4 /* magic word */
+        + 4 /* block length */
+        + 4 /* block entry count */
+        + 8 /* first entry id */;
+
+    public static DataBlockHeaderImpl of(int blockLength, int blockEntryCount, long firstEntryId) {
+        return new DataBlockHeaderImpl(blockLength, blockEntryCount, firstEntryId);
+    }
+
+    // Construct DataBlockHeader from InputStream
+    public static DataBlockHeaderImpl fromStream(InputStream stream) throws IOException {
+        DataInputStream dis = new DataInputStream(stream);
+        int magic = dis.readInt();
+        checkState(magic == dataBlockMagicWord);
+        return new DataBlockHeaderImpl(dis.readInt(), dis.readInt(), dis.readLong());
+    }
+
+    private final int blockLength;
+    private final int blockEntryCount;
+    private final long firstEntryId;
+
+    @Override
+    public int getBlockMagicWord() {
+        return dataBlockMagicWord;
+    }
+
+    @Override
+    public int getBlockLength() {
+        return this.blockLength;
+    }
+
+    @Override
+    public int getBlockEntryCount() {
+        return this.blockEntryCount;
+    }
+
+    @Override
+    public long getFirstEntryId() {
+        return this.firstEntryId;
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return dataBlockHeaderSize;
+    }
+
+    public static int getDataStartOffset() {
+        return dataBlockHeaderAlign;
+    }
+
+    public DataBlockHeaderImpl(int blockLength, int blockEntryCount, long firstEntryId) {
+        this.blockLength = blockLength;
+        this.blockEntryCount = blockEntryCount;
+        this.firstEntryId = firstEntryId;
+    }
+
+    /**
+     * Get the content of the data block header as InputStream.
+     * Read out in format:
+     *   [ magic_word -- int ][ block_len -- int ][ block_entry_count -- int ][ first_entry_id -- long ]
+     */
+    @Override
+    public InputStream toStream() throws IOException {
+        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(dataBlockHeaderSize, dataBlockHeaderSize);
+        out.writeInt(dataBlockMagicWord)
+            .writeInt(blockLength)
+            .writeInt(blockEntryCount)
+            .writeLong(firstEntryId);
+
+        return new ByteBufInputStream(out, true);
+    }
+}
+
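A short round-trip sketch (not part of this PR) of the toStream/fromStream pairing,
with placeholder values; DataBlockHeaderTest at the end of this diff exercises the
same path in full:

    DataBlockHeaderImpl header = DataBlockHeaderImpl.of(2048, 51, 0L);
    InputStream serialized = header.toStream();  // 20 bytes in the documented order
    DataBlockHeaderImpl rebuilt = DataBlockHeaderImpl.fromStream(serialized);
    // fromStream checks the magic word via checkState before rebuilding
    assert rebuilt.getBlockEntryCount() == 51;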
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
new file mode 100644
index 0000000000..8e7fa6bf91
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import com.google.common.io.ByteStreams;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStream;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+@Slf4j
+public class BlockAwareSegmentInputStreamTest {
+    @Data
+    class MockLedgerEntry implements LedgerEntry {
+        byte blockPadding = 0xB;
+        long ledgerId;
+        long entryId;
+        long length;
+        byte entryBytes[];
+        ByteBuf entryBuffer;
+
+        MockLedgerEntry(long ledgerId, long entryId, long length) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.length = length;
+            this.entryBytes = new byte[(int)length];
+            entryBuffer = Unpooled.wrappedBuffer(entryBytes);
+            entryBuffer.writerIndex(0);
+            IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding));
+        }
+
+        @Override
+        public ByteBuffer getEntryNioBuffer() {
+            return null;
+        }
+
+        @Override
+        public LedgerEntry duplicate() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            entryBuffer.release();
+        }
+    }
+
+    @Data
+    class MockLedgerEntries implements LedgerEntries {
+        int ledgerId;
+        int startEntryId;
+        int count;
+        int entrySize;
+        List<LedgerEntry> entries;
+
+        MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) {
+            this.ledgerId = ledgerId;
+            this.startEntryId = startEntryId;
+            this.count = count;
+            this.entrySize = entrySize;
+            this.entries = Lists.newArrayList(count);
+
+            IntStream.range(startEntryId, startEntryId + count).forEach(i ->
+                entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+        }
+
+        @Override
+        public void close() {
+            entries.clear();
+        }
+
+        @Override
+        public LedgerEntry getEntry(long entryId) {
+            if (entryId < startEntryId || entryId >= startEntryId + count) {
+                return null;
+            }
+
+            return entries.get(((int)entryId - startEntryId));
+        }
+
+        @Override
+        public Iterator<LedgerEntry> iterator() {
+            return entries.iterator();
+        }
+    }
+
+    class MockReadHandle implements ReadHandle {
+        int ledgerId;
+        int entrySize;
+        int lac;
+        MockReadHandle(int ledgerId, int entrySize, int lac) {
+            this.ledgerId = ledgerId;
+            this.entrySize = entrySize;
+            this.lac = lac;
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readAsync(long firstEntry, 
long lastEntry) {
+            CompletableFuture<LedgerEntries> future = new 
CompletableFuture<>();
+            LedgerEntries entries = new MockLedgerEntries(ledgerId,
+                (int)firstEntry,
+                (int)(lastEntry - firstEntry + 1),
+                entrySize);
+
+            Executors.newCachedThreadPool(new DefaultThreadFactory("test"))
+                .submit(() -> future.complete(entries));
+            return future;
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+            return readAsync(firstEntry, lastEntry);
+        }
+
+        @Override
+        public CompletableFuture<Long> readLastAddConfirmedAsync() {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+            return null;
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return lac;
+        }
+
+        @Override
+        public long getLength() {
+            return (lac + 1) * entrySize;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return true;
+        }
+
+        @Override
+        public CompletableFuture<LastConfirmedAndEntry>
+        readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, 
boolean parallel) {
+            return null;
+        }
+
+        @Override
+        public LedgerMetadata getLedgerMetadata() {
+            return null;
+        }
+
+        @Override
+        public long getId() {
+            return ledgerId;
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return null;
+        }
+    }
+
+    @Test
+    public void blockAwareSegmentInputStreamTest() throws Exception {
+        int ledgerId = 1;
+        int entrySize = 8;
+        int lac = 200;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        int blockSize = 2 * 1024; // the first 1024 bytes are reserved for the header, so the block can only hold 1024 / (8 + 4 + 8) = 51 entries
+        BlockAwareSegmentInputStream inputStream = new BlockAwareSegmentInputStream(readHandle, 0, blockSize);
+        CompletableFuture<Void> initFuture = inputStream.initialize();
+        initFuture.get();
+
+        int expectedEntryCount = 1024 / (entrySize + 4 + 8);
+        // verify get methods
+        assertTrue(inputStream.getLedger() == readHandle);
+        assertTrue(inputStream.getStartEntryId() == 0);
+        assertTrue(inputStream.getBlockSize() == blockSize);
+        assertTrue(inputStream.getBlockEntryCount() == expectedEntryCount);
+        assertTrue(inputStream.getPayloadBytesWritten() == entrySize * expectedEntryCount);
+        assertTrue(inputStream.getLastEntryIdWritten() == expectedEntryCount - 1);
+
+        // verify header
+        DataBlockHeader header = inputStream.getDataBlockHeader();
+        assertTrue(header.getBlockMagicWord() == 0xDBDBDBDB);
+        assertTrue(header.getBlockLength() == blockSize);
+        assertTrue(header.getBlockEntryCount() == expectedEntryCount);
+        assertTrue(header.getFirstEntryId() == 0);
+
+        // verify read inputStream
+        // 1. read header
+        byte headerB[] = new byte[DataBlockHeader.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertTrue(headerRead.getBlockMagicWord() == 0xDBDBDBDB);
+        assertTrue(headerRead.getBlockLength() == blockSize);
+        assertTrue(headerRead.getBlockEntryCount() == expectedEntryCount);
+        assertTrue(headerRead.getFirstEntryId() == 0);
+
+        byte[] entryData = new byte[entrySize];
+        IntStream.range(0, entrySize).forEach(i -> {
+            entryData[i] = 0xB;
+        });
+
+        // 2. read Ledger entries
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+                inputStream.read(lengthBuf);
+                inputStream.read(entryIdBuf);
+                inputStream.read(content);
+
+                long entryId = Unpooled.wrappedBuffer(entryIdBuf).getLong(0);
+
+                assertEquals(entrySize, Unpooled.wrappedBuffer(lengthBuf).getInt(0));
+                assertEquals(i, entryId);
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. read padding
+        int left = blockSize - DataBlockHeader.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8);
+        byte padding[] = new byte[left];
+        inputStream.read(padding);
+        ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+        IntStream.range(0, paddingBuf.capacity()).forEach(i ->
+            assertEquals(paddingBuf.readByte(), inputStream.getBlockEndPadding())
+        );
+
+        // 4. reach end.
+        assertEquals(-1, inputStream.read());
+
+        inputStream.close();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
new file mode 100644
index 0000000000..530d73b4e8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertTrue;
+
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DataBlockHeaderTest {
+
+    @Test
+    public void dataBlockHeaderImplTest() throws Exception {
+        int headerLength = 1024 * 1024;
+        int entryCount = 2222;
+        long firstEntryId = 3333L;
+
+        DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(headerLength,
+            entryCount,
+            firstEntryId);
+
+        // verify get methods
+        assertTrue(dataBlockHeader.getBlockMagicWord() == 0xDBDBDBDB);
+        assertTrue(dataBlockHeader.getBlockLength() == headerLength);
+        assertTrue(dataBlockHeader.getBlockEntryCount() == entryCount);
+        assertTrue(dataBlockHeader.getFirstEntryId() == firstEntryId);
+
+        // verify toStream and fromStream
+        InputStream stream = dataBlockHeader.toStream();
+        DataBlockHeaderImpl rebuild = DataBlockHeaderImpl.fromStream(stream);
+
+        assertTrue(rebuild.getBlockMagicWord() == 0xDBDBDBDB);
+        assertTrue(rebuild.getBlockLength() == headerLength);
+        assertTrue(rebuild.getBlockEntryCount() == entryCount);
+        assertTrue(rebuild.getFirstEntryId() == firstEntryId);
+    }
+
+}


 
