This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1404944  PIP-17:  the part of index block for offload. (#1593)
1404944 is described below

commit 14049448e5d4170b7ad90fefb8568eb265f278e7
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Thu May 3 01:03:02 2018 +0800

    PIP-17:  the part of index block for offload. (#1593)
    
    * add offload index block
    
    * change follow ivan's comments
    
    * change to use api.LedgerMetadata
    
    * change following @sijie's comments
    
    * avoid using org.apache.bookkeeper.client.LedgerMetadata serialization
    
    * change following @ivan's comments
---
 .../pulsar/broker/s3offload/OffloadIndexBlock.java |  63 ++++
 .../broker/s3offload/OffloadIndexBlockBuilder.java |  72 +++++
 .../pulsar/broker/s3offload/OffloadIndexEntry.java |  50 +++
 .../impl/OffloadIndexBlockBuilderImpl.java         |  77 +++++
 .../s3offload/impl/OffloadIndexBlockImpl.java      | 337 +++++++++++++++++++++
 .../s3offload/impl/OffloadIndexEntryImpl.java      |  58 ++++
 .../apache/pulsar/s3offload/OffloadIndexTest.java  | 237 +++++++++++++++
 7 files changed, 894 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
new file mode 100644
index 0000000..8f9d3ce
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
/**
 * The index block abstraction used when offloading a ledger to long term storage.
 *
 * <p>An index block maps message entry ids to the storage block (part) that holds
 * the offloaded payload, and to the offset of that block in storage.
 */
@Unstable
public interface OffloadIndexBlock extends Closeable {

    /**
     * Get the content of the index block as an InputStream.
     * The stream is laid out as:
     *   | index_magic_header | index_block_len | segment_metadata_length |
     *   | index_entry_count | segment metadata | index entries |
     *
     * @return the serialized form of this index block
     * @throws IOException if the block cannot be serialized
     */
    InputStream toStream() throws IOException;

    /**
     * Get the OffloadIndexEntry for the payload block that contains the given
     * message entry id.
     *
     * @param messageEntryId
     *                      the entry id of the message
     * @return the index entry whose range covers {@code messageEntryId}
     * @throws IOException if the lookup fails
     */
    OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException;

    /**
     * Get the number of index entries contained in this index block.
     */
    int getEntryCount();

    /**
     * Get the metadata of the offloaded ledger.
     */
    LedgerMetadata getLedgerMetadata();

}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
new file mode 100644
index 0000000..8ec0395
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import 
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl;
+
/**
 * Builder of the index block used when offloading a ledger to long term storage.
 */
@Unstable
@LimitedPrivate
public interface OffloadIndexBlockBuilder {

    /**
     * Set the ledger metadata to embed in the built index block.
     *
     * @param metadata the ledger metadata
     */
    OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);

    /**
     * Add the information of one payload block into the index block:
     * the first entryId in the payload block, the payload block id,
     * and the payload block size.
     * This information is used to construct one index entry in the
     * OffloadIndexBlock.
     *
     * @param firstEntryId the first entryId in the payload block
     * @param partId the payload block id
     * @param blockSize the payload block size
     */
    OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);

    /**
     * Finalize and return the immutable OffloadIndexBlock.
     */
    OffloadIndexBlock build();

    /**
     * Construct an OffloadIndexBlock from an InputStream previously produced
     * by {@link OffloadIndexBlock#toStream()}.
     *
     * @throws IOException if the stream is truncated or corrupt
     */
    OffloadIndexBlock fromStream(InputStream is) throws IOException;

    /**
     * Create an OffloadIndexBlockBuilder.
     */
    static OffloadIndexBlockBuilder create() {
        return new OffloadIndexBlockBuilderImpl();
    }


}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
new file mode 100644
index 0000000..03927d3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import 
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
/**
 * The index entry in an OffloadIndexBlock.
 *
 * <p>It consists of the message entry id, the storage block part id for this
 * message entry, and the offset of that block in long term storage.
 */
@Unstable
@LimitedPrivate
public interface OffloadIndexEntry {

    /**
     * Get the first message entry id that this index entry covers.
     */
    long getEntryId();

    /**
     * Get the block part id in long term storage.
     */
    int getPartId();

    /**
     * Get the offset of this block in long term storage.
     */
    long getOffset();
}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
new file mode 100644
index 0000000..ced3bf3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+
+/**
+ * Interface for builder of index block used for offload a ledger to long term 
storage.
+ */
+public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
+
+    private LedgerMetadata ledgerMetadata;
+    private List<OffloadIndexEntryImpl> entries;
+
+    public OffloadIndexBlockBuilderImpl() {
+        this.entries = Lists.newArrayList();
+    }
+
+    @Override
+    public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) {
+        this.ledgerMetadata = metadata;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, 
int blockSize) {
+        // we should added one by one.
+        long offset;
+        if(firstEntryId == 0) {
+            checkState(entries.size() == 0);
+            offset = 0;
+        } else {
+            checkState(entries.size() > 0);
+            offset = entries.get(entries.size() - 1).getOffset() + blockSize;
+        }
+
+        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, 
offset));
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlock fromStream(InputStream is) throws IOException {
+        return OffloadIndexBlockImpl.get(is);
+    }
+
+    @Override
+    public OffloadIndexBlock build() {
+        checkState(ledgerMetadata != null);
+        checkState(!entries.isEmpty());
+        return OffloadIndexBlockImpl.get(ledgerMetadata, entries);
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
new file mode 100644
index 0000000..31058b4
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.shaded.com.google.protobuf.ByteString;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffloadIndexBlockImpl implements OffloadIndexBlock {
+    private static final Logger log = 
LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
+
+    private static final int INDEX_MAGIC_WORD = 0xDE47DE47;
+
+    private LedgerMetadata segmentMetadata;
+    private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
+
+    private final Handle<OffloadIndexBlockImpl> recyclerHandle;
+
+    private static final Recycler<OffloadIndexBlockImpl> RECYCLER = new 
Recycler<OffloadIndexBlockImpl>() {
+        @Override
+        protected OffloadIndexBlockImpl 
newObject(Recycler.Handle<OffloadIndexBlockImpl> handle) {
+            return new OffloadIndexBlockImpl(handle);
+        }
+    };
+
+    private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> 
recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, 
List<OffloadIndexEntryImpl> entries) {
+        OffloadIndexBlockImpl block = RECYCLER.get();
+        block.indexEntries = Maps.newTreeMap();
+        entries.forEach(entry -> 
block.indexEntries.putIfAbsent(entry.getEntryId(), entry));
+        checkState(entries.size() == block.indexEntries.size());
+        block.segmentMetadata = metadata;
+        return block;
+    }
+
+    public static OffloadIndexBlockImpl get(InputStream stream) throws 
IOException {
+        OffloadIndexBlockImpl block = RECYCLER.get();
+        block.indexEntries = Maps.newTreeMap();
+        block.fromStream(stream);
+        return block;
+    }
+
+    public void recycle() {
+        segmentMetadata = null;
+        indexEntries.clear();
+        indexEntries = null;
+        if (recyclerHandle != null) {
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    @Override
+    public OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws 
IOException {
+        if(messageEntryId > segmentMetadata.getLastEntryId()) {
+            log.warn("Try to get entry: {}, which beyond lastEntryId {}, 
return null",
+                messageEntryId, segmentMetadata.getLastEntryId());
+            throw new IndexOutOfBoundsException("Entry index: " + 
messageEntryId +
+                " beyond lastEntryId: " + segmentMetadata.getLastEntryId());
+        }
+        // find the greatest mapping Id whose entryId <= messageEntryId
+        return this.indexEntries.floorEntry(messageEntryId).getValue();
+    }
+
+    @Override
+    public int getEntryCount() {
+        return this.indexEntries.size();
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return this.segmentMetadata;
+    }
+
+    private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
+        LedgerMetadataFormat.Builder builder = 
LedgerMetadataFormat.newBuilder();
+        builder.setQuorumSize(metadata.getWriteQuorumSize())
+            .setAckQuorumSize(metadata.getAckQuorumSize())
+            .setEnsembleSize(metadata.getEnsembleSize())
+            .setLength(metadata.getLength())
+            .setState(metadata.isClosed() ? LedgerMetadataFormat.State.CLOSED 
: LedgerMetadataFormat.State.OPEN)
+            .setLastEntryId(metadata.getLastEntryId())
+            .setCtime(metadata.getCtime())
+            .setDigestType(BookKeeper.DigestType.toProtoDigestType(
+                
BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));
+
+        for (Map.Entry<String, byte[]> e : 
metadata.getCustomMetadata().entrySet()) {
+            builder.addCustomMetadataBuilder()
+                
.setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
+        }
+
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
metadata.getAllEnsembles().entrySet()) {
+            builder.addSegmentBuilder()
+                .setFirstEntryId(e.getKey())
+                .addAllEnsembleMember(e.getValue().stream().map(a -> 
a.toString()).collect(Collectors.toList()));
+        }
+
+        return builder.build().toByteArray();
+    }
+
+    /**
+     * Get the content of the index block as InputStream.
+     * Read out in format:
+     *   | index_magic_header | index_block_len | index_entry_count |
+     *   |segment_metadata_len | segment metadata | index entries |
+     */
+    @Override
+    public InputStream toStream() throws IOException {
+        int indexBlockLength;
+        int segmentMetadataLength;
+        int indexEntryCount = this.indexEntries.size();
+
+        byte[] ledgerMetadataByte = 
buildLedgerMetadataFormat(this.segmentMetadata);
+        segmentMetadataLength = ledgerMetadataByte.length;
+
+        indexBlockLength = 4 /* magic header */
+            + 4 /* index block length */
+            + 4 /* segment metadata length */
+            + 4 /* index entry count */
+            + segmentMetadataLength
+            + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + 
blockOffset */
+
+        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, 
indexBlockLength);
+
+        out.writeInt(INDEX_MAGIC_WORD)
+            .writeInt(indexBlockLength)
+            .writeInt(segmentMetadataLength)
+            .writeInt(indexEntryCount);
+
+        // write metadata
+        out.writeBytes(ledgerMetadataByte);
+
+        // write entries
+        this.indexEntries.entrySet().forEach(entry ->
+            out.writeLong(entry.getValue().getEntryId())
+                .writeInt(entry.getValue().getPartId())
+                .writeLong(entry.getValue().getOffset()));
+
+        return new ByteBufInputStream(out, true);
+    }
+
+    static private class InternalLedgerMetadata implements LedgerMetadata {
+        private LedgerMetadataFormat ledgerMetadataFormat;
+
+        private int ensembleSize;
+        private int writeQuorumSize;
+        private int ackQuorumSize;
+        private long lastEntryId;
+        private long length;
+        private DataFormats.LedgerMetadataFormat.DigestType digestType;
+        private long ctime;
+        private State state;
+        private Map<String, byte[]> customMetadata = Maps.newHashMap();
+        private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles = new 
TreeMap<Long, ArrayList<BookieSocketAddress>>();
+
+        InternalLedgerMetadata(LedgerMetadataFormat ledgerMetadataFormat) {
+            this.ensembleSize = ledgerMetadataFormat.getEnsembleSize();
+            this.writeQuorumSize = ledgerMetadataFormat.getQuorumSize();
+            this.ackQuorumSize = ledgerMetadataFormat.getAckQuorumSize();
+            this.lastEntryId = ledgerMetadataFormat.getLastEntryId();
+            this.length = ledgerMetadataFormat.getLength();
+            this.digestType = ledgerMetadataFormat.getDigestType();
+            this.ctime = ledgerMetadataFormat.getCtime();
+            this.state = ledgerMetadataFormat.getState();
+
+            if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
+                ledgerMetadataFormat.getCustomMetadataList().forEach(
+                    entry -> this.customMetadata.put(entry.getKey(), 
entry.getValue().toByteArray()));
+            }
+
+            ledgerMetadataFormat.getSegmentList().forEach(segment -> {
+                ArrayList<BookieSocketAddress> addressArrayList = new 
ArrayList<BookieSocketAddress>();
+                segment.getEnsembleMemberList().forEach(address -> {
+                    try {
+                        addressArrayList.add(new BookieSocketAddress(address));
+                    } catch (IOException e) {
+                        log.error("Exception when create BookieSocketAddress. 
", e);
+                    }
+                });
+                this.ensembles.put(segment.getFirstEntryId(), 
addressArrayList);
+            });
+        }
+
+        @Override
+        public int getEnsembleSize() {
+            return this.ensembleSize;
+        }
+
+        @Override
+        public int getWriteQuorumSize() {
+            return this.writeQuorumSize;
+        }
+
+        @Override
+        public int getAckQuorumSize() {
+            return this.ackQuorumSize;
+        }
+
+        @Override
+        public long getLastEntryId() {
+            return this.lastEntryId;
+        }
+
+        @Override
+        public long getLength() {
+            return this.length;
+        }
+
+        @Override
+        public DigestType getDigestType() {
+            switch (this.digestType) {
+                case HMAC:
+                    return DigestType.MAC;
+                case CRC32:
+                    return DigestType.CRC32;
+                case CRC32C:
+                    return DigestType.CRC32C;
+                case DUMMY:
+                    return DigestType.DUMMY;
+                default:
+                    throw new IllegalArgumentException("Unable to convert 
digest type " + digestType);
+            }
+        }
+
+        @Override
+        public long getCtime() {
+            return this.ctime;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return this.state == State.CLOSED;
+        }
+
+        @Override
+        public Map<String, byte[]> getCustomMetadata() {
+            return this.customMetadata;
+        }
+
+        @Override
+        public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+            return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
+        }
+
+        @Override
+        public NavigableMap<Long, ? extends List<BookieSocketAddress>> 
getAllEnsembles() {
+            return this.ensembles;
+        }
+    }
+
+    private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws 
IOException {
+        LedgerMetadataFormat.Builder builder = 
LedgerMetadataFormat.newBuilder();
+        builder.mergeFrom(bytes);
+        return new InternalLedgerMetadata(builder.build());
+    }
+
+    private OffloadIndexBlock fromStream(InputStream stream) throws 
IOException {
+        DataInputStream dis = new DataInputStream(stream);
+        int magic = dis.readInt();
+        if (magic != this.INDEX_MAGIC_WORD) {
+            throw new IOException("Invalid MagicWord. read: " + magic + " 
expected: " + INDEX_MAGIC_WORD);
+        }
+        int indexBlockLength = dis.readInt();
+        int segmentMetadataLength = dis.readInt();
+        int indexEntryCount = dis.readInt();
+
+        byte[] metadataBytes = new byte[segmentMetadataLength];
+
+        if (segmentMetadataLength != dis.read(metadataBytes)) {
+            log.error("Read ledgerMetadata from bytes failed");
+            throw new IOException("Read ledgerMetadata from bytes failed");
+        }
+        this.segmentMetadata = parseLedgerMetadata(metadataBytes);
+
+        for (int i = 0; i < indexEntryCount; i ++) {
+            long entryId = dis.readLong();
+            this.indexEntries.putIfAbsent(entryId, 
OffloadIndexEntryImpl.of(entryId, dis.readInt(), dis.readLong()));
+        }
+
+        return this;
+    }
+
+    public static int getIndexMagicWord() {
+        return INDEX_MAGIC_WORD;
+    }
+
+    @Override
+    public void close() {
+        recycle();
+    }
+
+}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
new file mode 100644
index 0000000..d74ba93
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+
+/**
+ *
+ * The Index Entry in OffloadIndexBlock.
+ *
+ */
+public class OffloadIndexEntryImpl implements OffloadIndexEntry {
+    public static OffloadIndexEntryImpl of(long entryId, int partId, long 
offset) {
+        return new OffloadIndexEntryImpl(entryId, partId, offset);
+    }
+
+    private final long entryId;
+
+    private final int partId;
+
+    private final long offset;
+
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+    @Override
+    public int getPartId() {
+        return partId;
+    }
+    @Override
+    public long getOffset() {
+        return offset;
+    }
+
+    public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
+        this.entryId = entryId;
+        this.partId = partId;
+        this.offset = offset;
+    }
+}
+
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java
new file mode 100644
index 0000000..aeecbee
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexEntryImpl;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class OffloadIndexTest {
+
+    @Test
+    public void offloadIndexEntryImplTest() {
+        // verify OffloadIndexEntryImpl builder
+        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0);
+        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234);
+
+        // verify OffloadIndexEntryImpl get
+        assertEquals(entry1.getEntryId(), 0L);
+        assertEquals(entry1.getPartId(), 2);
+        assertEquals(entry1.getOffset(), 0L);
+
+        assertEquals(entry2.getEntryId(), 100L);
+        assertEquals(entry2.getPartId(), 3);
+        assertEquals(entry2.getOffset(), 1234L);
+    }
+
+
+    // use mock to setLastEntryId
+    class LedgerMetadataMock extends 
org.apache.bookkeeper.client.LedgerMetadata {
+        long lastId = 0;
+        public LedgerMetadataMock(int ensembleSize, int writeQuorumSize, int 
ackQuorumSize, org.apache.bookkeeper.client.BookKeeper.DigestType digestType, 
byte[] password, Map<String, byte[]> customMetadata, boolean 
storeSystemtimeAsLedgerCreationTime) {
+            super(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, 
password, customMetadata, storeSystemtimeAsLedgerCreationTime);
+        }
+
+        @Override
+        public long getLastEntryId(){
+            return  lastId;
+        }
+
+        public void setLastEntryId(long lastId) {
+            this.lastId = lastId;
+        }
+    }
+
+    private LedgerMetadata createLedgerMetadata() throws Exception {
+
+        Map<String, byte[]> metadataCustom = Maps.newHashMap();
+        metadataCustom.put("key1", "value1".getBytes(UTF_8));
+        metadataCustom.put("key7", "value7".getBytes(UTF_8));
+
+        ArrayList<BookieSocketAddress> bookies = Lists.newArrayList();
+        BookieSocketAddress BOOKIE1 = new 
BookieSocketAddress("127.0.0.1:3181");
+        BookieSocketAddress BOOKIE2 = new 
BookieSocketAddress("127.0.0.2:3181");
+        BookieSocketAddress BOOKIE3 = new 
BookieSocketAddress("127.0.0.3:3181");
+        bookies.add(0, BOOKIE1);
+        bookies.add(1, BOOKIE2);
+        bookies.add(2, BOOKIE3);
+
+        LedgerMetadataMock metadata = new LedgerMetadataMock(3, 3, 2,
+            DigestType.CRC32C, "password".getBytes(UTF_8), metadataCustom, 
false);
+
+        metadata.addEnsemble(0, bookies);
+        metadata.setLastEntryId(5000);
+        return metadata;
+    }
+
+    // prepare metadata, then use builder to build a OffloadIndexBlockImpl
+    // verify get methods, readout and fromStream methods.
+    @Test
+    public void offloadIndexBlockImplTest() throws Exception {
+        OffloadIndexBlockBuilder blockBuilder = 
OffloadIndexBlockBuilder.create();
+        LedgerMetadata metadata = createLedgerMetadata();
+        log.debug("created metadata: {}", metadata.toString());
+
+        blockBuilder.withMetadata(metadata);
+
+        blockBuilder.addBlock(0, 2, 0);
+        blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
+        blockBuilder.addBlock(2000, 4, 64 * 1024 * 1024);
+        OffloadIndexBlock indexBlock = blockBuilder.build();
+
+        // verify getEntryCount and getLedgerMetadata
+        assertEquals(indexBlock.getEntryCount(), 3);
+        assertEquals(indexBlock.getLedgerMetadata(), metadata);
+
+        // verify getIndexEntryForEntry
+        OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(0);
+        assertEquals(entry1.getEntryId(), 0);
+        assertEquals(entry1.getPartId(),2);
+        assertEquals(entry1.getOffset(), 0);
+
+        OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(500);
+        assertEquals(entry11, entry1);
+
+        OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(1000);
+        assertEquals(entry2.getEntryId(), 1000);
+        assertEquals(entry2.getPartId(), 3);
+        assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(1300);
+        assertEquals(entry22, entry2);
+
+        OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(2000);
+
+        assertEquals(entry3.getEntryId(), 2000);
+        assertEquals(entry3.getPartId(), 4);
+        assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(3000);
+        assertEquals(entry33, entry3);
+
+        try {
+            OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(6000);
+            fail("Should throw IndexOutOfBoundsException.");
+        } catch (Exception e) {
+            assertTrue(e instanceof IndexOutOfBoundsException);
+            assertEquals(e.getMessage(), "Entry index: 6000 beyond 
lastEntryId: 5000");
+        }
+
+        // verify toStream
+        InputStream out = indexBlock.toStream();
+        byte b[] = new byte[1024];
+        int readoutLen = out.read(b);
+        out.close();
+        ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+        int magic = wrapper.readInt();
+        int indexBlockLength = wrapper.readInt();
+        int segmentMetadataLength = wrapper.readInt();
+        int indexEntryCount = wrapper.readInt();
+
+        // verify counter
+        assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
+        assertEquals(indexBlockLength, readoutLen);
+        assertEquals(indexEntryCount, 3);
+
+        wrapper.readBytes(segmentMetadataLength);
+        log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: 
{}",
+            magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+        // verify entry
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());;
+
+        assertEquals(e1.getEntryId(),entry1.getEntryId());
+        assertEquals(e1.getPartId(), entry1.getPartId());
+        assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e2.getEntryId(), entry2.getEntryId());
+        assertEquals(e2.getPartId(), entry2.getPartId());
+        assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e3.getEntryId(), entry3.getEntryId());
+        assertEquals(e3.getPartId(), entry3.getPartId());
+        assertEquals(e3.getOffset(), entry3.getOffset());
+        wrapper.release();
+
+        // verify build OffloadIndexBlock from InputStream
+        InputStream out2 = indexBlock.toStream();
+        int streamLength = out2.available();
+        out2.mark(0);
+        OffloadIndexBlock indexBlock2 = blockBuilder.fromStream(out2);
+        // 1. verify metadata that got from inputstream success.
+        LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata();
+        log.debug("built metadata: {}", metadata2.toString());
+        assertEquals(metadata2.getAckQuorumSize(), 
metadata.getAckQuorumSize());
+        assertEquals(metadata2.getEnsembleSize(), metadata.getEnsembleSize());
+        assertEquals(metadata2.getDigestType(), metadata.getDigestType());
+        assertEquals(metadata2.getAllEnsembles().entrySet(), 
metadata.getAllEnsembles().entrySet());
+        // 2. verify set all the entries
+        assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+        // 3. verify reach end
+        assertEquals(out2.read(), -1);
+
+
+        out2.reset();
+        byte streamContent[] = new byte[streamLength];
+        // stream with all 0, simulate junk data, should throw exception for 
header magic not match.
+        try(InputStream stream3 = new ByteArrayInputStream(streamContent, 0, 
streamLength)) {
+            OffloadIndexBlock indexBlock3 = blockBuilder.fromStream(stream3);
+            fail("Should throw IOException");
+        } catch (Exception e) {
+            assertTrue(e instanceof IOException);
+            assertTrue(e.getMessage().contains("Invalid MagicWord"));
+        }
+
+        // simulate read header too small, throw EOFException.
+        out2.read(streamContent);
+        try(InputStream stream4 =
+                new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
+            OffloadIndexBlock indexBlock4 = blockBuilder.fromStream(stream4);
+            fail("Should throw EOFException");
+        } catch (Exception e) {
+            assertTrue(e instanceof java.io.EOFException);
+        }
+
+        out2.close();
+        indexBlock.close();
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to