This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1404944 PIP-17: the part of index block for offload. (#1593) 1404944 is described below commit 14049448e5d4170b7ad90fefb8568eb265f278e7 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Thu May 3 01:03:02 2018 +0800 PIP-17: the part of index block for offload. (#1593) * add offload index block * change follow ivan's comments * change to use api.LedgerMetadata * change following @sijie's comments * avoid using org.apache.bookkeeper.client.LedgerMetadata serialization * change following @ivan's comments --- .../pulsar/broker/s3offload/OffloadIndexBlock.java | 63 ++++ .../broker/s3offload/OffloadIndexBlockBuilder.java | 72 +++++ .../pulsar/broker/s3offload/OffloadIndexEntry.java | 50 +++ .../impl/OffloadIndexBlockBuilderImpl.java | 77 +++++ .../s3offload/impl/OffloadIndexBlockImpl.java | 337 +++++++++++++++++++++ .../s3offload/impl/OffloadIndexEntryImpl.java | 58 ++++ .../apache/pulsar/s3offload/OffloadIndexTest.java | 237 +++++++++++++++ 7 files changed, 894 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java new file mode 100644 index 0000000..8f9d3ce --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * + * The Index block abstraction used for offload a ledger to long term storage. + * + */ +@Unstable +public interface OffloadIndexBlock extends Closeable { + + /** + * Get the content of the index block as InputStream. + * Read out in format: + * | index_magic_header | index_block_len | index_entry_count | + * |segment_metadata_length | segment metadata | index entries | + */ + InputStream toStream() throws IOException; + + /** + * Get the related OffloadIndexEntry that contains the given messageEntryId. + * + * @param messageEntryId + * the entry id of message + * @return the offload index entry + */ + OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException; + + /** + * Get the entry count that contained in this index Block. + */ + int getEntryCount(); + + /** + * Get LedgerMetadata. + */ + LedgerMetadata getLedgerMetadata(); + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java new file mode 100644 index 0000000..8ec0395 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; +import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl; + +/** + * Interface for builder of index block used for offload a ledger to long term storage. + */ +@Unstable +@LimitedPrivate +public interface OffloadIndexBlockBuilder { + + /** + * Build index block with the passed in ledger metadata. + * + * @param metadata the ledger metadata + */ + OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata); + + /** + * Add one payload block related information into index block. + * It contains the first entryId in payload block, the payload block Id, + * and payload block size. + * This information will be used to consist one index entry in OffloadIndexBlock. + * + * @param firstEntryId the first entryId in payload block + * @param partId the payload block Id + * @param blockSize the payload block size + */ + OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize); + + /** + * Finalize the immutable OffloadIndexBlock + */ + OffloadIndexBlock build(); + + /** + * Construct OffloadIndex from an InputStream + */ + OffloadIndexBlock fromStream(InputStream is) throws IOException; + + /** + * create an OffloadIndexBlockBuilder + */ + static OffloadIndexBlockBuilder create() { + return new OffloadIndexBlockBuilderImpl(); + } + + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java new file mode 100644 index 0000000..03927d3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload; + +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * + * The Index Entry in OffloadIndexBlock. + * It consists of the message entry id, the code storage block part id for this message entry, + * and the offset in code storage block for this message id. + * + */ +@Unstable +@LimitedPrivate +public interface OffloadIndexEntry { + + /** + * Get the entryId that this entry contains. + */ + long getEntryId(); + + /** + * Get the block part id of code storage. + */ + int getPartId(); + + /** + * Get the offset of this message entry in code storage. + */ + long getOffset(); +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java new file mode 100644 index 0000000..ced3bf3 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; + +/** + * Interface for builder of index block used for offload a ledger to long term storage. + */ +public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder { + + private LedgerMetadata ledgerMetadata; + private List<OffloadIndexEntryImpl> entries; + + public OffloadIndexBlockBuilderImpl() { + this.entries = Lists.newArrayList(); + } + + @Override + public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) { + this.ledgerMetadata = metadata; + return this; + } + + @Override + public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) { + // we should added one by one. + long offset; + if(firstEntryId == 0) { + checkState(entries.size() == 0); + offset = 0; + } else { + checkState(entries.size() > 0); + offset = entries.get(entries.size() - 1).getOffset() + blockSize; + } + + this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset)); + return this; + } + + @Override + public OffloadIndexBlock fromStream(InputStream is) throws IOException { + return OffloadIndexBlockImpl.get(is); + } + + @Override + public OffloadIndexBlock build() { + checkState(ledgerMetadata != null); + checkState(!entries.isEmpty()); + return OffloadIndexBlockImpl.get(ledgerMetadata, entries); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java new file mode 100644 index 0000000..31058b4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.DataFormats; +import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; +import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State; +import org.apache.bookkeeper.shaded.com.google.protobuf.ByteString; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; +import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OffloadIndexBlockImpl implements OffloadIndexBlock { + private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class); + + private static final int INDEX_MAGIC_WORD = 0xDE47DE47; + + private LedgerMetadata segmentMetadata; + private TreeMap<Long, OffloadIndexEntryImpl> indexEntries; + + private final Handle<OffloadIndexBlockImpl> recyclerHandle; + + private static final Recycler<OffloadIndexBlockImpl> RECYCLER = new Recycler<OffloadIndexBlockImpl>() { + @Override + protected OffloadIndexBlockImpl newObject(Recycler.Handle<OffloadIndexBlockImpl> handle) { + return new OffloadIndexBlockImpl(handle); + } + }; + + private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) { + OffloadIndexBlockImpl block = RECYCLER.get(); + block.indexEntries = Maps.newTreeMap(); + entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry)); + checkState(entries.size() == block.indexEntries.size()); + block.segmentMetadata = metadata; + return block; + } + + public static OffloadIndexBlockImpl get(InputStream stream) throws IOException { + OffloadIndexBlockImpl block = RECYCLER.get(); + block.indexEntries = Maps.newTreeMap(); + block.fromStream(stream); + return block; + } + + public void recycle() { + segmentMetadata = null; + indexEntries.clear(); + indexEntries = null; + if (recyclerHandle != null) { + recyclerHandle.recycle(this); + } + } + + @Override + public OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException { + if(messageEntryId > segmentMetadata.getLastEntryId()) { + log.warn("Try to get entry: {}, which beyond lastEntryId {}, return null", + messageEntryId, segmentMetadata.getLastEntryId()); + throw new IndexOutOfBoundsException("Entry index: " + messageEntryId + + " beyond lastEntryId: " + segmentMetadata.getLastEntryId()); + } + // find the greatest mapping Id whose entryId <= messageEntryId + return this.indexEntries.floorEntry(messageEntryId).getValue(); + } + + @Override + public int getEntryCount() { + return this.indexEntries.size(); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return this.segmentMetadata; + } + + private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { + LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); + builder.setQuorumSize(metadata.getWriteQuorumSize()) + .setAckQuorumSize(metadata.getAckQuorumSize()) + .setEnsembleSize(metadata.getEnsembleSize()) + .setLength(metadata.getLength()) + .setState(metadata.isClosed() ? LedgerMetadataFormat.State.CLOSED : LedgerMetadataFormat.State.OPEN) + .setLastEntryId(metadata.getLastEntryId()) + .setCtime(metadata.getCtime()) + .setDigestType(BookKeeper.DigestType.toProtoDigestType( + BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType()))); + + for (Map.Entry<String, byte[]> e : metadata.getCustomMetadata().entrySet()) { + builder.addCustomMetadataBuilder() + .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue())); + } + + for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) { + builder.addSegmentBuilder() + .setFirstEntryId(e.getKey()) + .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList())); + } + + return builder.build().toByteArray(); + } + + /** + * Get the content of the index block as InputStream. + * Read out in format: + * | index_magic_header | index_block_len | index_entry_count | + * |segment_metadata_len | segment metadata | index entries | + */ + @Override + public InputStream toStream() throws IOException { + int indexBlockLength; + int segmentMetadataLength; + int indexEntryCount = this.indexEntries.size(); + + byte[] ledgerMetadataByte = buildLedgerMetadataFormat(this.segmentMetadata); + segmentMetadataLength = ledgerMetadataByte.length; + + indexBlockLength = 4 /* magic header */ + + 4 /* index block length */ + + 4 /* segment metadata length */ + + 4 /* index entry count */ + + segmentMetadataLength + + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */ + + ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength); + + out.writeInt(INDEX_MAGIC_WORD) + .writeInt(indexBlockLength) + .writeInt(segmentMetadataLength) + .writeInt(indexEntryCount); + + // write metadata + out.writeBytes(ledgerMetadataByte); + + // write entries + this.indexEntries.entrySet().forEach(entry -> + out.writeLong(entry.getValue().getEntryId()) + .writeInt(entry.getValue().getPartId()) + .writeLong(entry.getValue().getOffset())); + + return new ByteBufInputStream(out, true); + } + + static private class InternalLedgerMetadata implements LedgerMetadata { + private LedgerMetadataFormat ledgerMetadataFormat; + + private int ensembleSize; + private int writeQuorumSize; + private int ackQuorumSize; + private long lastEntryId; + private long length; + private DataFormats.LedgerMetadataFormat.DigestType digestType; + private long ctime; + private State state; + private Map<String, byte[]> customMetadata = Maps.newHashMap(); + private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles = new TreeMap<Long, ArrayList<BookieSocketAddress>>(); + + InternalLedgerMetadata(LedgerMetadataFormat ledgerMetadataFormat) { + this.ensembleSize = ledgerMetadataFormat.getEnsembleSize(); + this.writeQuorumSize = ledgerMetadataFormat.getQuorumSize(); + this.ackQuorumSize = ledgerMetadataFormat.getAckQuorumSize(); + this.lastEntryId = ledgerMetadataFormat.getLastEntryId(); + this.length = ledgerMetadataFormat.getLength(); + this.digestType = ledgerMetadataFormat.getDigestType(); + this.ctime = ledgerMetadataFormat.getCtime(); + this.state = ledgerMetadataFormat.getState(); + + if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { + ledgerMetadataFormat.getCustomMetadataList().forEach( + entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); + } + + ledgerMetadataFormat.getSegmentList().forEach(segment -> { + ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<BookieSocketAddress>(); + segment.getEnsembleMemberList().forEach(address -> { + try { + addressArrayList.add(new BookieSocketAddress(address)); + } catch (IOException e) { + log.error("Exception when create BookieSocketAddress. ", e); + } + }); + this.ensembles.put(segment.getFirstEntryId(), addressArrayList); + }); + } + + @Override + public int getEnsembleSize() { + return this.ensembleSize; + } + + @Override + public int getWriteQuorumSize() { + return this.writeQuorumSize; + } + + @Override + public int getAckQuorumSize() { + return this.ackQuorumSize; + } + + @Override + public long getLastEntryId() { + return this.lastEntryId; + } + + @Override + public long getLength() { + return this.length; + } + + @Override + public DigestType getDigestType() { + switch (this.digestType) { + case HMAC: + return DigestType.MAC; + case CRC32: + return DigestType.CRC32; + case CRC32C: + return DigestType.CRC32C; + case DUMMY: + return DigestType.DUMMY; + default: + throw new IllegalArgumentException("Unable to convert digest type " + digestType); + } + } + + @Override + public long getCtime() { + return this.ctime; + } + + @Override + public boolean isClosed() { + return this.state == State.CLOSED; + } + + @Override + public Map<String, byte[]> getCustomMetadata() { + return this.customMetadata; + } + + @Override + public List<BookieSocketAddress> getEnsembleAt(long entryId) { + return ensembles.get(ensembles.headMap(entryId + 1).lastKey()); + } + + @Override + public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() { + return this.ensembles; + } + } + + private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException { + LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); + builder.mergeFrom(bytes); + return new InternalLedgerMetadata(builder.build()); + } + + private OffloadIndexBlock fromStream(InputStream stream) throws IOException { + DataInputStream dis = new DataInputStream(stream); + int magic = dis.readInt(); + if (magic != this.INDEX_MAGIC_WORD) { + throw new IOException("Invalid MagicWord. read: " + magic + " expected: " + INDEX_MAGIC_WORD); + } + int indexBlockLength = dis.readInt(); + int segmentMetadataLength = dis.readInt(); + int indexEntryCount = dis.readInt(); + + byte[] metadataBytes = new byte[segmentMetadataLength]; + + if (segmentMetadataLength != dis.read(metadataBytes)) { + log.error("Read ledgerMetadata from bytes failed"); + throw new IOException("Read ledgerMetadata from bytes failed"); + } + this.segmentMetadata = parseLedgerMetadata(metadataBytes); + + for (int i = 0; i < indexEntryCount; i ++) { + long entryId = dis.readLong(); + this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(), dis.readLong())); + } + + return this; + } + + public static int getIndexMagicWord() { + return INDEX_MAGIC_WORD; + } + + @Override + public void close() { + recycle(); + } + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java new file mode 100644 index 0000000..d74ba93 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; + +/** + * + * The Index Entry in OffloadIndexBlock. + * + */ +public class OffloadIndexEntryImpl implements OffloadIndexEntry { + public static OffloadIndexEntryImpl of(long entryId, int partId, long offset) { + return new OffloadIndexEntryImpl(entryId, partId, offset); + } + + private final long entryId; + + private final int partId; + + private final long offset; + + @Override + public long getEntryId() { + return entryId; + } + @Override + public int getPartId() { + return partId; + } + @Override + public long getOffset() { + return offset; + } + + public OffloadIndexEntryImpl(long entryId, int partId, long offset) { + this.entryId = entryId; + this.partId = partId; + this.offset = offset; + } +} + diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java new file mode 100644 index 0000000..aeecbee --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.s3offload; + +import static com.google.common.base.Charsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; +import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl; +import org.apache.pulsar.broker.s3offload.impl.OffloadIndexEntryImpl; +import org.testng.annotations.Test; + +@Slf4j +public class OffloadIndexTest { + + @Test + public void offloadIndexEntryImplTest() { + // verify OffloadIndexEntryImpl builder + OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0); + OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234); + + // verify OffloadIndexEntryImpl get + assertEquals(entry1.getEntryId(), 0L); + assertEquals(entry1.getPartId(), 2); + assertEquals(entry1.getOffset(), 0L); + + assertEquals(entry2.getEntryId(), 100L); + assertEquals(entry2.getPartId(), 3); + assertEquals(entry2.getOffset(), 1234L); + } + + + // use mock to setLastEntryId + class LedgerMetadataMock extends org.apache.bookkeeper.client.LedgerMetadata { + long lastId = 0; + public LedgerMetadataMock(int ensembleSize, int writeQuorumSize, int ackQuorumSize, org.apache.bookkeeper.client.BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata, boolean storeSystemtimeAsLedgerCreationTime) { + super(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, customMetadata, storeSystemtimeAsLedgerCreationTime); + } + + @Override + public long getLastEntryId(){ + return lastId; + } + + public void setLastEntryId(long lastId) { + this.lastId = lastId; + } + } + + private LedgerMetadata createLedgerMetadata() throws Exception { + + Map<String, byte[]> metadataCustom = Maps.newHashMap(); + metadataCustom.put("key1", "value1".getBytes(UTF_8)); + metadataCustom.put("key7", "value7".getBytes(UTF_8)); + + ArrayList<BookieSocketAddress> bookies = Lists.newArrayList(); + BookieSocketAddress BOOKIE1 = new BookieSocketAddress("127.0.0.1:3181"); + BookieSocketAddress BOOKIE2 = new BookieSocketAddress("127.0.0.2:3181"); + BookieSocketAddress BOOKIE3 = new BookieSocketAddress("127.0.0.3:3181"); + bookies.add(0, BOOKIE1); + bookies.add(1, BOOKIE2); + bookies.add(2, BOOKIE3); + + LedgerMetadataMock metadata = new LedgerMetadataMock(3, 3, 2, + DigestType.CRC32C, "password".getBytes(UTF_8), metadataCustom, false); + + metadata.addEnsemble(0, bookies); + metadata.setLastEntryId(5000); + return metadata; + } + + // prepare metadata, then use builder to build a OffloadIndexBlockImpl + // verify get methods, readout and fromStream methods. + @Test + public void offloadIndexBlockImplTest() throws Exception { + OffloadIndexBlockBuilder blockBuilder = OffloadIndexBlockBuilder.create(); + LedgerMetadata metadata = createLedgerMetadata(); + log.debug("created metadata: {}", metadata.toString()); + + blockBuilder.withMetadata(metadata); + + blockBuilder.addBlock(0, 2, 0); + blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024); + blockBuilder.addBlock(2000, 4, 64 * 1024 * 1024); + OffloadIndexBlock indexBlock = blockBuilder.build(); + + // verify getEntryCount and getLedgerMetadata + assertEquals(indexBlock.getEntryCount(), 3); + assertEquals(indexBlock.getLedgerMetadata(), metadata); + + // verify getIndexEntryForEntry + OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(0); + assertEquals(entry1.getEntryId(), 0); + assertEquals(entry1.getPartId(),2); + assertEquals(entry1.getOffset(), 0); + + OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(500); + assertEquals(entry11, entry1); + + OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(1000); + assertEquals(entry2.getEntryId(), 1000); + assertEquals(entry2.getPartId(), 3); + assertEquals(entry2.getOffset(), 64 * 1024 * 1024); + + OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(1300); + assertEquals(entry22, entry2); + + OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(2000); + + assertEquals(entry3.getEntryId(), 2000); + assertEquals(entry3.getPartId(), 4); + assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024); + + OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(3000); + assertEquals(entry33, entry3); + + try { + OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(6000); + fail("Should throw IndexOutOfBoundsException."); + } catch (Exception e) { + assertTrue(e instanceof IndexOutOfBoundsException); + assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000"); + } + + // verify toStream + InputStream out = indexBlock.toStream(); + byte b[] = new byte[1024]; + int readoutLen = out.read(b); + out.close(); + ByteBuf wrapper = Unpooled.wrappedBuffer(b); + int magic = wrapper.readInt(); + int indexBlockLength = wrapper.readInt(); + int segmentMetadataLength = wrapper.readInt(); + int indexEntryCount = wrapper.readInt(); + + // verify counter + assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord()); + assertEquals(indexBlockLength, readoutLen); + assertEquals(indexEntryCount, 3); + + wrapper.readBytes(segmentMetadataLength); + log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}", + magic, indexBlockLength, segmentMetadataLength, indexEntryCount); + + // verify entry + OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong()); + OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong()); + OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());; + + assertEquals(e1.getEntryId(),entry1.getEntryId()); + assertEquals(e1.getPartId(), entry1.getPartId()); + assertEquals(e1.getOffset(), entry1.getOffset()); + assertEquals(e2.getEntryId(), entry2.getEntryId()); + assertEquals(e2.getPartId(), entry2.getPartId()); + assertEquals(e2.getOffset(), entry2.getOffset()); + assertEquals(e3.getEntryId(), entry3.getEntryId()); + assertEquals(e3.getPartId(), entry3.getPartId()); + assertEquals(e3.getOffset(), entry3.getOffset()); + wrapper.release(); + + // verify build OffloadIndexBlock from InputStream + InputStream out2 = indexBlock.toStream(); + int streamLength = out2.available(); + out2.mark(0); + OffloadIndexBlock indexBlock2 = blockBuilder.fromStream(out2); + // 1. verify metadata that got from inputstream success. + LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata(); + log.debug("built metadata: {}", metadata2.toString()); + assertEquals(metadata2.getAckQuorumSize(), metadata.getAckQuorumSize()); + assertEquals(metadata2.getEnsembleSize(), metadata.getEnsembleSize()); + assertEquals(metadata2.getDigestType(), metadata.getDigestType()); + assertEquals(metadata2.getAllEnsembles().entrySet(), metadata.getAllEnsembles().entrySet()); + // 2. verify set all the entries + assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount()); + // 3. verify reach end + assertEquals(out2.read(), -1); + + + out2.reset(); + byte streamContent[] = new byte[streamLength]; + // stream with all 0, simulate junk data, should throw exception for header magic not match. + try(InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) { + OffloadIndexBlock indexBlock3 = blockBuilder.fromStream(stream3); + fail("Should throw IOException"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + assertTrue(e.getMessage().contains("Invalid MagicWord")); + } + + // simulate read header too small, throw EOFException. + out2.read(streamContent); + try(InputStream stream4 = + new ByteArrayInputStream(streamContent, 0, streamLength - 1)) { + OffloadIndexBlock indexBlock4 = blockBuilder.fromStream(stream4); + fail("Should throw EOFException"); + } catch (Exception e) { + assertTrue(e instanceof java.io.EOFException); + } + + out2.close(); + indexBlock.close(); + } + +} -- To stop receiving notification emails like this one, please contact si...@apache.org.