zhaijack closed pull request #1651: PIP-17: provide DataBlockHeader and BlockAwareSegmentInputStream implementation URL: https://github.com/apache/incubator-pulsar/pull/1651
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java new file mode 100644 index 0000000000..6e2837c337 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; + +/** + * The data block header in code storage for each data block + * + * Currently, It is in format: + * [ magic_word -- int ][ block_len -- int ][ block_entry_count -- int ][ first_entry_id -- long] + * + * with the size: 4 + 4 + 4 + 8 = 20 Bytes + */ +@Unstable +@LimitedPrivate +public interface DataBlockHeader { + /** + * Get Magic Word for data block. + * It is a sequence of bytes used to identify the start of a block. + */ + int getBlockMagicWord(); + + /** + * Get the length of the block in bytes, including the header. + */ + int getBlockLength(); + + /** + * Get the message entry count that stored in this data block. + */ + int getBlockEntryCount(); + + /** + * Get the message entry Id for the first message that stored in this data block. + */ + long getFirstEntryId(); + + /** + * Get the size of this DataBlockHeader. + */ + int getHeaderSize(); + + /** + * Get the content of the data block header as InputStream. + * Read out in current format. + */ + InputStream toStream() throws IOException; + + /** + * Get the payload start offset in this block. + * Space before this offset is for header and alignment. 
+ */ + static int getDataStartOffset() { + return DataBlockHeaderImpl.getDataStartOffset(); + } +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java new file mode 100644 index 0000000000..188e21e4d0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStream.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * The BlockAwareSegmentInputStream for each cold storage data block. + * It contains a byte buffer, which contains all the content for this data block. + * DataBlockHeader + entries(each with format[[entry_size -- int][entry_id -- long][entry_data]]) + * + */ +public class BlockAwareSegmentInputStream extends ByteArrayInputStream { + private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStream.class); + + private static final byte blockEndPadding = 0xA; + + private final ReadHandle ledger; + private final long startEntryId; + private final int blockSize; + + private DataBlockHeader dataBlockHeader; + // Number of Message entries that read from ledger and written to this block. + private int blockEntryCount; + // Number of Payload Bytes from ledger that has been written to this buffer. + private int payloadBytesWritten; + + // ByteBuf that wrapped the buffer and use to write content into the buffer. 
+ private final ByteBuf writer; + + public BlockAwareSegmentInputStream(ReadHandle ledger, long startEntryId, int blockSize) { + super(new byte[blockSize]); + this.ledger = ledger; + this.startEntryId = startEntryId; + this.blockSize = blockSize; + + this.payloadBytesWritten = 0; + this.blockEntryCount = 0; + this.writer = Unpooled.wrappedBuffer(this.buf); + // set writer index to the payload start offset + writer.writerIndex(DataBlockHeaderImpl.getDataStartOffset()); + } + + @Override + public void close() throws IOException { + writer.release(); + super.close(); + } + + // Read entry complete, no space for one more entry, do padding and construct header, complete future + private void entryReadComplete(CompletableFuture<Void> future) throws IOException { + // padding at end + IntStream.range(0, writer.writableBytes()).forEach(i -> writer.writeByte(blockEndPadding)); + + // construct and write block header + dataBlockHeader = DataBlockHeaderImpl.of(blockSize, blockEntryCount, startEntryId); + writer.writerIndex(0); + writer.writeBytes(dataBlockHeader.toStream(), dataBlockHeader.getHeaderSize()); + writer.writerIndex(blockSize); + future.complete(null); + } + + // roughly get how may entries we would read this time. + private int readEntryCount() { + // read some more entries than roughly computed each time. + final int readAheadNum = 100; + long ledgerLength = ledger.getLength(); + long lac = ledger.getLastAddConfirmed(); + int averageEntrySize = (int)(ledgerLength / (lac + 1)) + 4 + 8; + int numEntriesToRead = writer.writableBytes() / averageEntrySize; + return numEntriesToRead + readAheadNum; + } + + // put the entries that read from ledger into buffer, and update counters. If buffer is full, complete the future. 
+ private void bufReadEntries(CompletableFuture<Void> future, LedgerEntries entries) throws IOException { + Iterator<LedgerEntry> iterator = entries.iterator(); + int entryLength = 0; + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + long entryId = entry.getEntryId(); + entryLength = (int)entry.getLength(); + + if (writer.writableBytes() >= entryLength + 4 + 8) { + // has space for this entry, write it into buf + writer + .writeInt(entryLength) + .writeLong(entryId) + .writeBytes(entry.getEntryBuffer()); + // set counters + blockEntryCount ++; + payloadBytesWritten += entryLength; + entry.close(); + } else { + // buf has no space left for a whole message entry + entry.close(); + if (iterator.hasNext()) { + iterator.forEachRemaining(LedgerEntry::close); + } + // padding and write header + entryReadComplete(future); + return; + } + } + + // not have read enough entries, read more. + checkState(!iterator.hasNext()); + if (writer.writableBytes() > 4 + 8) { + readLedgerEntries(future, startEntryId + blockEntryCount, readEntryCount()); + } else { + entryReadComplete(future); + } + } + + // read `number` entries start from `start`, if buffer is full, bufReadEntries will complete the future. 
+ private void readLedgerEntries(CompletableFuture<Void> future, long start, int number) { + ledger.readAsync(start, Math.min(start + number - 1, ledger.getLastAddConfirmed())) + .whenComplete((ledgerEntries, exception) -> { + try { + if (exception != null) { + log.error("Meet exception in readAsync.", exception); + this.close(); + future.completeExceptionally(exception); + return; + } + + bufReadEntries(future, ledgerEntries); + } catch (Exception ioe) { + log.error("Meet exception while read entries.", ioe); + future.completeExceptionally(ioe); + } + }); + } + + // initialize and fill data from ReadHandle + public CompletableFuture<Void> initialize() { + CompletableFuture<Void> future = new CompletableFuture<>(); + readLedgerEntries(future, startEntryId, readEntryCount()); + return future; + } + + public ReadHandle getLedger() { + return ledger; + } + + public long getStartEntryId() { + return startEntryId; + } + + public int getBlockSize() { + return blockSize; + } + + public DataBlockHeader getDataBlockHeader() { + return dataBlockHeader; + } + + public int getBlockEntryCount() { + return blockEntryCount; + } + + public long getLastEntryIdWritten() { + return startEntryId + blockEntryCount - 1; + } + + public int getPayloadBytesWritten() { + return payloadBytesWritten; + } + + public byte getBlockEndPadding() { + return blockEndPadding; + } + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java new file mode 100644 index 0000000000..e2063b368c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.PooledByteBufAllocator; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; + +/** + * + * The data block header in code storage for each data block. + * + */ +public class DataBlockHeaderImpl implements DataBlockHeader { + // Magic Word for data block. + // It is a sequence of bytes used to identify the start of a block. + private static final int dataBlockMagicWord = 0xDBDBDBDB; + // This is bigger than header size. Leaving some place for alignment and future enhancement. + // Payload use this as the start offset. + private static final int dataBlockHeaderAlign = 1024; + // The size of this header. 
+ private static final int dataBlockHeaderSize = 4 /* magic word */ + + 4 /* index block length */ + + 4 /* block entry count */ + + 8 /* first entry id */; + + public static DataBlockHeaderImpl of(int blockLength, int blockEntryCount, long firstEntryId) { + return new DataBlockHeaderImpl(blockLength, blockEntryCount, firstEntryId); + } + + // Construct DataBlockHeader from InputStream + public static DataBlockHeaderImpl fromStream(InputStream stream) throws IOException { + DataInputStream dis = new DataInputStream(stream); + int magic = dis.readInt(); + checkState(magic == dataBlockMagicWord); + return new DataBlockHeaderImpl(dis.readInt(), dis.readInt(), dis.readLong()); + } + + private final int blockLength; + private final int blockEntryCount; + private final long firstEntryId; + + @Override + public int getBlockMagicWord() { + return dataBlockMagicWord; + } + + @Override + public int getBlockLength() { + return this.blockLength; + } + + @Override + public int getBlockEntryCount() { + return this.blockEntryCount; + } + + @Override + public long getFirstEntryId() { + return this.firstEntryId; + } + + @Override + public int getHeaderSize() { + return dataBlockHeaderSize; + } + + static public int getDataStartOffset() { + return dataBlockHeaderAlign; + } + + public DataBlockHeaderImpl(int blockLength, int blockEntryCount, long firstEntryId) { + this.blockLength = blockLength; + this.blockEntryCount = blockEntryCount; + this.firstEntryId = firstEntryId; + } + + /** + * Get the content of the data block header as InputStream. 
+ * Read out in format: + * [ magic_word -- int ][ block_len -- int ][ block_entry_count -- int ][ first_entry_id -- long] + */ + public InputStream toStream() throws IOException { + int headerSize = 4 /* magic word */ + + 4 /* index block length */ + + 4 /* block entry count */ + + 8 /* first entry id */; + + ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(headerSize, headerSize); + out.writeInt(dataBlockMagicWord) + .writeInt(blockLength) + .writeInt(blockEntryCount) + .writeLong(firstEntryId); + + return new ByteBufInputStream(out, true); + } +} + diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java new file mode 100644 index 0000000000..8e7fa6bf91 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/BlockAwareSegmentInputStreamTest.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +import com.google.common.io.ByteStreams; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStream; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +@Slf4j +public class BlockAwareSegmentInputStreamTest { + @Data + class MockLedgerEntry implements LedgerEntry { + byte blockPadding = 0xB; + long ledgerId; + long entryId; + long length; + byte entryBytes[]; + ByteBuf entryBuffer; + + MockLedgerEntry(long ledgerId, long entryId, long length) { + this.ledgerId = ledgerId; + this.entryId = entryId; + this.length = length; + this.entryBytes = new byte[(int)length]; + entryBuffer = Unpooled.wrappedBuffer(entryBytes); + entryBuffer.writerIndex(0); + IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding)); + } + + @Override + public ByteBuffer getEntryNioBuffer() { + return null; + } + + @Override + public LedgerEntry duplicate() { + 
return null; + } + + @Override + public void close() { + entryBuffer.release(); + } + } + + @Data + class MockLedgerEntries implements LedgerEntries { + int ledgerId; + int startEntryId; + int count; + int entrySize; + List<LedgerEntry> entries; + + MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) { + this.ledgerId = ledgerId; + this.startEntryId = startEntryId; + this.count = count; + this.entrySize = entrySize; + this.entries = Lists.newArrayList(count); + + IntStream.range(startEntryId, startEntryId + count).forEach(i -> + entries.add(new MockLedgerEntry(ledgerId, i, entrySize))); + } + + @Override + public void close() { + entries.clear(); + } + + @Override + public LedgerEntry getEntry(long entryId) { + if (entryId < startEntryId || entryId >= startEntryId + count) { + return null; + } + + return entries.get(((int)entryId - startEntryId)); + } + + @Override + public Iterator<LedgerEntry> iterator() { + return entries.iterator(); + } + } + + class MockReadHandle implements ReadHandle { + int ledgerId; + int entrySize; + int lac; + MockReadHandle(int ledgerId, int entrySize, int lac) { + this.ledgerId = ledgerId; + this.entrySize = entrySize; + this.lac = lac; + } + + @Override + public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { + CompletableFuture<LedgerEntries> future = new CompletableFuture<>(); + LedgerEntries entries = new MockLedgerEntries(ledgerId, + (int)firstEntry, + (int)(lastEntry - firstEntry + 1), + entrySize); + + Executors.newCachedThreadPool(new DefaultThreadFactory("test")) + .submit(() -> future.complete(entries)); + return future; + } + + @Override + public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { + return readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture<Long> readLastAddConfirmedAsync() { + return null; + } + + @Override + public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { + return null; + } 
+ + @Override + public long getLastAddConfirmed() { + return lac; + } + + @Override + public long getLength() { + return (lac + 1) * entrySize; + } + + @Override + public boolean isClosed() { + return true; + } + + @Override + public CompletableFuture<LastConfirmedAndEntry> + readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel) { + return null; + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return null; + } + + @Override + public long getId() { + return ledgerId; + } + + @Override + public CompletableFuture<Void> closeAsync() { + return null; + } + } + + @Test + public void blockAwareSegmentInputStreamTest() throws Exception { + int ledgerId = 1; + int entrySize = 8; + int lac = 200; + ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac); + + int blockSize = 2 * 1024; // first 1024 for blockHeader, so it should only able to have entry: 1024/(8+4+8) = 51 + BlockAwareSegmentInputStream inputStream = new BlockAwareSegmentInputStream(readHandle, 0, blockSize); + CompletableFuture<Void> initFuture = inputStream.initialize(); + initFuture.get(); + + int expectedEntryCount = 1024 / (entrySize + 4 + 8); + // verify get methods + assertTrue(inputStream.getLedger() == readHandle); + assertTrue(inputStream.getStartEntryId() == 0); + assertTrue(inputStream.getBlockSize() == blockSize); + assertTrue(inputStream.getBlockEntryCount() == expectedEntryCount); + assertTrue(inputStream.getPayloadBytesWritten() == entrySize * expectedEntryCount); + assertTrue(inputStream.getLastEntryIdWritten() == expectedEntryCount - 1); + + // verify header + DataBlockHeader header = inputStream.getDataBlockHeader(); + assertTrue(header.getBlockMagicWord() == 0xDBDBDBDB); + assertTrue(header.getBlockLength() == blockSize); + assertTrue(header.getBlockEntryCount() == expectedEntryCount); + assertTrue(header.getFirstEntryId() == 0); + + // verify read inputStream + // 1. 
read header + byte headerB[] = new byte[DataBlockHeader.getDataStartOffset()]; + ByteStreams.readFully(inputStream, headerB); + DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB)); + assertTrue(headerRead.getBlockMagicWord() == 0xDBDBDBDB); + assertTrue(headerRead.getBlockLength() == blockSize); + assertTrue(headerRead.getBlockEntryCount() == expectedEntryCount); + assertTrue(headerRead.getFirstEntryId() == 0); + + byte[] entryData = new byte[entrySize]; + IntStream.range(0, entrySize).forEach(i -> { + entryData[i] = 0xB; + }); + + // 2. read Ledger entries + IntStream.range(0, expectedEntryCount).forEach(i -> { + try { + byte lengthBuf[] = new byte[4]; + byte entryIdBuf[] = new byte[8]; + byte content[] = new byte[entrySize]; + inputStream.read(lengthBuf); + inputStream.read(entryIdBuf); + inputStream.read(content); + + long entryId = Unpooled.wrappedBuffer(entryIdBuf).getLong(0); + + assertEquals(entrySize, Unpooled.wrappedBuffer(lengthBuf).getInt(0)); + assertEquals(i, entryId); + assertArrayEquals(entryData, content); + } catch (Exception e) { + fail("meet exception", e); + } + }); + + // 3. read padding + int left = blockSize - DataBlockHeader.getDataStartOffset() - expectedEntryCount * (entrySize + 4 + 8); + byte padding[] = new byte[left]; + inputStream.read(padding); + ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding); + IntStream.range(0, paddingBuf.capacity()).forEach(i -> + assertEquals(paddingBuf.readByte(), inputStream.getBlockEndPadding()) + ); + + // 4. reach end. 
+ assertEquals(-1, inputStream.read()); + + inputStream.close(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java new file mode 100644 index 0000000000..530d73b4e8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.s3offload; + +import static org.testng.Assert.assertTrue; + +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.testng.annotations.Test; + +@Slf4j +public class DataBlockHeaderTest { + + @Test + public void dataBlockHeaderImplTest() throws Exception { + int headerLength = 1024 * 1024; + int entryCount = 2222; + long firstEntryId = 3333L; + + DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(headerLength, + entryCount, + firstEntryId); + + // verify get methods + assertTrue(dataBlockHeader.getBlockMagicWord() == 0xDBDBDBDB); + assertTrue(dataBlockHeader.getBlockLength() == headerLength); + assertTrue(dataBlockHeader.getBlockEntryCount() == entryCount); + assertTrue(dataBlockHeader.getFirstEntryId() == firstEntryId); + + // verify toStream and fromStream + InputStream stream = dataBlockHeader.toStream(); + DataBlockHeaderImpl rebuild = DataBlockHeaderImpl.fromStream(stream); + + assertTrue(rebuild.getBlockMagicWord() == 0xDBDBDBDB); + assertTrue(rebuild.getBlockLength() == headerLength); + assertTrue(rebuild.getBlockEntryCount() == entryCount); + assertTrue(rebuild.getFirstEntryId() == firstEntryId); + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
