This is an automated email from the ASF dual-hosted git repository.

binlijin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new d21be92fd0c HBASE-27314 Make index block be customized and configured 
(#4718)
d21be92fd0c is described below

commit d21be92fd0c6d9d8db9f770c96a109d80e168272
Author: binlijin <[email protected]>
AuthorDate: Sat Sep 3 15:52:43 2022 +0800

    HBASE-27314 Make index block be customized and configured (#4718)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hbase/client/ColumnFamilyDescriptor.java       |   4 +
 .../client/ColumnFamilyDescriptorBuilder.java      |  32 ++
 .../client/TestColumnFamilyDescriptorBuilder.java  |   5 +-
 .../hbase/io/encoding/IndexBlockEncoding.java      | 121 ++++++
 .../apache/hadoop/hbase/io/hfile/HFileContext.java |  14 +-
 .../hadoop/hbase/io/hfile/HFileContextBuilder.java |  10 +-
 .../hadoop/hbase/io/hfile/BlockIndexChunk.java     |  54 +++
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java     | 181 +++++---
 .../hbase/io/hfile/HFileIndexBlockEncoder.java     |  74 ++++
 .../hbase/io/hfile/HFileIndexBlockEncoderImpl.java |  85 ++++
 .../apache/hadoop/hbase/io/hfile/HFileInfo.java    |  18 +-
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  13 +-
 .../hbase/io/hfile/NoOpIndexBlockEncoder.java      | 481 +++++++++++++++++++++
 .../storefiletracker/StoreFileTrackerBase.java     |   3 +-
 .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java |  11 +-
 .../hadoop/hbase/io/hfile/TestHFileWriterV3.java   |   2 +-
 .../hfile/TestHFileWriterV3WithDataEncoders.java   |   2 +-
 17 files changed, 1029 insertions(+), 81 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
index d73e67ee0fb..6a092a221fd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -107,6 +108,9 @@ public interface ColumnFamilyDescriptor {
   /** Returns the data block encoding algorithm used in block cache and 
optionally on disk */
   DataBlockEncoding getDataBlockEncoding();
 
+  /** Return the index block encoding algorithm used in block cache and 
optionally on disk */
+  IndexBlockEncoding getIndexBlockEncoding();
+
   /** Returns Return the raw crypto key attribute for the family, or null if 
not set */
   byte[] getEncryptionKey();
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
index cf4de2edfec..3c11bef53c7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PrettyPrinter;
@@ -84,6 +85,10 @@ public class ColumnFamilyDescriptorBuilder {
   public static final String DATA_BLOCK_ENCODING = "DATA_BLOCK_ENCODING";
   private static final Bytes DATA_BLOCK_ENCODING_BYTES =
     new Bytes(Bytes.toBytes(DATA_BLOCK_ENCODING));
+  @InterfaceAudience.Private
+  public static final String INDEX_BLOCK_ENCODING = "INDEX_BLOCK_ENCODING";
+  private static final Bytes INDEX_BLOCK_ENCODING_BYTES =
+    new Bytes(Bytes.toBytes(INDEX_BLOCK_ENCODING));
   /**
    * Key for the BLOCKCACHE attribute. A more exact name would be 
CACHE_DATA_ON_READ because this
    * flag sets whether or not we cache DATA blocks. We always cache INDEX and 
BLOOM blocks; caching
@@ -199,6 +204,11 @@ public class ColumnFamilyDescriptorBuilder {
    */
   public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = 
DataBlockEncoding.NONE;
 
+  /**
+   * Default index block encoding algorithm.
+   */
+  public static final IndexBlockEncoding DEFAULT_INDEX_BLOCK_ENCODING = 
IndexBlockEncoding.NONE;
+
   /**
    * Default number of versions of a record to keep.
    */
@@ -301,6 +311,7 @@ public class ColumnFamilyDescriptorBuilder {
     DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
     DEFAULT_VALUES.put(KEEP_DELETED_CELLS, 
String.valueOf(DEFAULT_KEEP_DELETED));
     DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, 
String.valueOf(DEFAULT_DATA_BLOCK_ENCODING));
+    DEFAULT_VALUES.put(INDEX_BLOCK_ENCODING, 
String.valueOf(DEFAULT_INDEX_BLOCK_ENCODING));
     // Do NOT add this key/value by default. NEW_VERSION_BEHAVIOR is NOT 
defined in hbase1 so
     // it is not possible to make an hbase1 HCD the same as an hbase2 HCD and 
so the replication
     // compare of schemas will fail. It is OK not adding the below to the 
initial map because of
@@ -501,6 +512,11 @@ public class ColumnFamilyDescriptorBuilder {
     return this;
   }
 
+  public ColumnFamilyDescriptorBuilder 
setIndexBlockEncoding(IndexBlockEncoding value) {
+    desc.setIndexBlockEncoding(value);
+    return this;
+  }
+
   public ColumnFamilyDescriptorBuilder setEncryptionKey(final byte[] value) {
     desc.setEncryptionKey(value);
     return this;
@@ -827,6 +843,22 @@ public class ColumnFamilyDescriptorBuilder {
         type == null ? DataBlockEncoding.NONE.name() : type.name());
     }
 
+    @Override
+    public IndexBlockEncoding getIndexBlockEncoding() {
+      return getStringOrDefault(INDEX_BLOCK_ENCODING_BYTES,
+        n -> IndexBlockEncoding.valueOf(n.toUpperCase()), 
IndexBlockEncoding.NONE);
+    }
+
+    /**
+     * Set index block encoding algorithm used in block cache.
+     * @param type What kind of index block encoding will be used.
+     * @return this (for chained invocation)
+     */
+    public ModifyableColumnFamilyDescriptor 
setIndexBlockEncoding(IndexBlockEncoding type) {
+      return setValue(INDEX_BLOCK_ENCODING_BYTES,
+        type == null ? IndexBlockEncoding.NONE.name() : type.name());
+    }
+
     /**
      * Set whether the tags should be compressed along with DataBlockEncoding. 
When no
      * DataBlockEncoding is been used, this is having no effect. n * @return 
this (for chained
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestColumnFamilyDescriptorBuilder.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestColumnFamilyDescriptorBuilder.java
index a0869c4bdfe..b7323879308 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestColumnFamilyDescriptorBuilder.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestColumnFamilyDescriptorBuilder.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -219,7 +220,7 @@ public class TestColumnFamilyDescriptorBuilder {
   @Test
   public void testDefaultBuilder() {
     final Map<String, String> defaultValueMap = 
ColumnFamilyDescriptorBuilder.getDefaultValues();
-    assertEquals(defaultValueMap.size(), 11);
+    assertEquals(defaultValueMap.size(), 12);
     
assertEquals(defaultValueMap.get(ColumnFamilyDescriptorBuilder.BLOOMFILTER),
       BloomType.ROW.toString());
     
assertEquals(defaultValueMap.get(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE),
 "0");
@@ -239,6 +240,8 @@ public class TestColumnFamilyDescriptorBuilder {
       KeepDeletedCells.FALSE.toString());
     
assertEquals(defaultValueMap.get(ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING),
       DataBlockEncoding.NONE.toString());
+    
assertEquals(defaultValueMap.get(ColumnFamilyDescriptorBuilder.INDEX_BLOCK_ENCODING),
+      IndexBlockEncoding.NONE.toString());
   }
 
   @Test
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/IndexBlockEncoding.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/IndexBlockEncoding.java
new file mode 100644
index 00000000000..ed97147ac9b
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/IndexBlockEncoding.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Provide access to all index block encoding algorithms. All of the 
algorithms are required to have
+ * unique id which should <b>NEVER</b> be changed. If you want to add a new 
algorithm/version,
+ * assign it a new id. Announce the new id in the HBase mailing list to 
prevent collisions.
+ */
[email protected]
+public enum IndexBlockEncoding {
+
+  /** Disable index block encoding. */
+  NONE(0, null),
+  // id 1 is reserved for the PREFIX_TREE algorithm to be added later
+  PREFIX_TREE(1, null);
+
+  private final short id;
+  private final byte[] idInBytes;
+  private final String encoderCls;
+
+  public static final int ID_SIZE = Bytes.SIZEOF_SHORT;
+
+  /** Maps data block encoding ids to enum instances. */
+  private static IndexBlockEncoding[] idArray = new 
IndexBlockEncoding[Byte.MAX_VALUE + 1];
+
+  static {
+    for (IndexBlockEncoding algo : values()) {
+      if (idArray[algo.id] != null) {
+        throw new RuntimeException(
+          String.format("Two data block encoder algorithms '%s' and '%s' have 
" + "the same id %d",
+            idArray[algo.id].toString(), algo.toString(), (int) algo.id));
+      }
+      idArray[algo.id] = algo;
+    }
+  }
+
+  private IndexBlockEncoding(int id, String encoderClsName) {
+    if (id < 0 || id > Byte.MAX_VALUE) {
+      throw new AssertionError("Data block encoding algorithm id is out of 
range: " + id);
+    }
+    this.id = (short) id;
+    this.idInBytes = Bytes.toBytes(this.id);
+    if (idInBytes.length != ID_SIZE) {
+      // White this may seem redundant, if we accidentally serialize
+      // the id as e.g. an int instead of a short, all encoders will break.
+      throw new RuntimeException("Unexpected length of encoder ID byte " + 
"representation: "
+        + Bytes.toStringBinary(idInBytes));
+    }
+    this.encoderCls = encoderClsName;
+  }
+
+  /** Returns name converted to bytes. */
+  public byte[] getNameInBytes() {
+    return Bytes.toBytes(toString());
+  }
+
+  /** Returns The id of a data block encoder. */
+  public short getId() {
+    return id;
+  }
+
+  /**
+   * Writes id in bytes.
+   * @param stream where the id should be written.
+   */
+  public void writeIdInBytes(OutputStream stream) throws IOException {
+    stream.write(idInBytes);
+  }
+
+  /**
+   * Writes id bytes to the given array starting from offset.
+   * @param dest   output array
+   * @param offset starting offset of the output array n
+   */
+  public void writeIdInBytes(byte[] dest, int offset) throws IOException {
+    System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE);
+  }
+
+  /**
+   * Find and return the name of data block encoder for the given id.
+   * @param encoderId id of data block encoder
+   * @return name, same as used in options in column family
+   */
+  public static String getNameFromId(short encoderId) {
+    return getEncodingById(encoderId).toString();
+  }
+
+  public static IndexBlockEncoding getEncodingById(short indexBlockEncodingId) 
{
+    IndexBlockEncoding algorithm = null;
+    if (indexBlockEncodingId >= 0 && indexBlockEncodingId <= Byte.MAX_VALUE) {
+      algorithm = idArray[indexBlockEncodingId];
+    }
+    if (algorithm == null) {
+      throw new IllegalArgumentException(String
+        .format("There is no index block encoder for given id '%d'", (int) 
indexBlockEncodingId));
+    }
+    return algorithm;
+  }
+
+}
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index d152ec1900b..b371cf83867 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -58,6 +59,7 @@ public class HFileContext implements HeapSize, Cloneable {
   /** Number of uncompressed bytes we allow per block. */
   private int blockSize = HConstants.DEFAULT_BLOCKSIZE;
   private DataBlockEncoding encoding = DataBlockEncoding.NONE;
+  private IndexBlockEncoding indexBlockEncoding = IndexBlockEncoding.NONE;
   /** Encryption algorithm and key used */
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime;
@@ -89,13 +91,14 @@ public class HFileContext implements HeapSize, Cloneable {
     this.columnFamily = context.columnFamily;
     this.tableName = context.tableName;
     this.cellComparator = context.cellComparator;
+    this.indexBlockEncoding = context.indexBlockEncoding;
   }
 
   HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean 
includesTags,
     Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType 
checksumType,
     int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
     Encryption.Context cryptoContext, long fileCreateTime, String hfileName, 
byte[] columnFamily,
-    byte[] tableName, CellComparator cellComparator) {
+    byte[] tableName, CellComparator cellComparator, IndexBlockEncoding 
indexBlockEncoding) {
     this.usesHBaseChecksum = useHBaseChecksum;
     this.includesMvcc = includesMvcc;
     this.includesTags = includesTags;
@@ -107,6 +110,9 @@ public class HFileContext implements HeapSize, Cloneable {
     if (encoding != null) {
       this.encoding = encoding;
     }
+    if (indexBlockEncoding != null) {
+      this.indexBlockEncoding = indexBlockEncoding;
+    }
     this.cryptoContext = cryptoContext;
     this.fileCreateTime = fileCreateTime;
     this.hfileName = hfileName;
@@ -186,6 +192,10 @@ public class HFileContext implements HeapSize, Cloneable {
     return encoding;
   }
 
+  public IndexBlockEncoding getIndexBlockEncoding() {
+    return indexBlockEncoding;
+  }
+
   public Encryption.Context getEncryptionContext() {
     return cryptoContext;
   }
@@ -253,6 +263,8 @@ public class HFileContext implements HeapSize, Cloneable {
     sb.append(blockSize);
     sb.append(", encoding=");
     sb.append(encoding);
+    sb.append(", indexBlockEncoding=");
+    sb.append(indexBlockEncoding);
     sb.append(", includesMvcc=");
     sb.append(includesMvcc);
     sb.append(", includesTags=");
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
index 97aea8ac0eb..f2131951000 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -50,6 +51,8 @@ public class HFileContextBuilder {
   /** Number of uncompressed bytes we allow per block. */
   private int blockSize = HConstants.DEFAULT_BLOCKSIZE;
   private DataBlockEncoding encoding = DataBlockEncoding.NONE;
+  /** the index block encoding type **/
+  private IndexBlockEncoding indexBlockEncoding = IndexBlockEncoding.NONE;
   /** Crypto context */
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
   private long fileCreateTime = 0;
@@ -128,6 +131,11 @@ public class HFileContextBuilder {
     return this;
   }
 
+  public HFileContextBuilder withIndexBlockEncoding(IndexBlockEncoding 
indexBlockEncoding) {
+    this.indexBlockEncoding = indexBlockEncoding;
+    return this;
+  }
+
   public HFileContextBuilder withEncryptionContext(Encryption.Context 
cryptoContext) {
     this.cryptoContext = cryptoContext;
     return this;
@@ -161,6 +169,6 @@ public class HFileContextBuilder {
   public HFileContext build() {
     return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, 
compression,
       compressTags, checkSumType, bytesPerChecksum, blockSize, encoding, 
cryptoContext,
-      fileCreateTime, hfileName, columnFamily, tableName, cellComparator);
+      fileCreateTime, hfileName, columnFamily, tableName, cellComparator, 
indexBlockEncoding);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIndexChunk.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIndexChunk.java
new file mode 100644
index 00000000000..221b60bca92
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIndexChunk.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public interface BlockIndexChunk {
+
+  List<byte[]> getBlockKeys();
+
+  List<Integer> getSecondaryIndexOffsetMarks();
+
+  int getEntryBySubEntry(long k);
+
+  void add(byte[] firstKey, long blockOffset, int onDiskDataSize);
+
+  void add(byte[] firstKey, long blockOffset, int onDiskDataSize, long 
curTotalNumSubEntries);
+
+  int getRootSize();
+
+  int getCurTotalNonRootEntrySize();
+
+  int getNonRootSize();
+
+  int getNumEntries();
+
+  byte[] getBlockKey(int i);
+
+  long getBlockOffset(int i);
+
+  int getOnDiskDataSize(int i);
+
+  byte[] getMidKeyMetadata() throws IOException;
+
+  void clear();
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 0e886d9873c..6e72890be12 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -100,7 +100,7 @@ public class HFileBlockIndex {
    * the middle leaf-level index block offset (long), its on-disk size without 
header included
    * (int), and the mid-key entry's zero-based index in that leaf index block.
    */
-  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG + 2 * 
Bytes.SIZEOF_INT;
+  protected static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG + 2 * 
Bytes.SIZEOF_INT;
 
   /**
    * An implementation of the BlockIndexReader that deals with block keys 
which are plain byte[]
@@ -225,7 +225,7 @@ public class HFileBlockIndex {
     /** Pre-computed mid-key */
     private AtomicReference<Cell> midKey = new AtomicReference<>();
     /** Needed doing lookup on blocks. */
-    private CellComparator comparator;
+    protected CellComparator comparator;
 
     public CellBasedKeyBlockIndexReader(final CellComparator c, final int 
treeLevel) {
       // Can be null for METAINDEX block
@@ -483,6 +483,82 @@ public class HFileBlockIndex {
     }
   }
 
+  static class CellBasedKeyBlockIndexReaderV2 extends 
CellBasedKeyBlockIndexReader {
+
+    private HFileIndexBlockEncoder indexBlockEncoder;
+
+    private HFileIndexBlockEncoder.EncodedSeeker seeker;
+
+    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int 
treeLevel) {
+      this(c, treeLevel, null);
+    }
+
+    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int 
treeLevel,
+      HFileIndexBlockEncoder indexBlockEncoder) {
+      super(c, treeLevel);
+      // Can be null for METAINDEX block
+      this.indexBlockEncoder =
+        indexBlockEncoder != null ? indexBlockEncoder : 
NoOpIndexBlockEncoder.INSTANCE;
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return seeker.isEmpty();
+    }
+
+    @Override
+    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock 
currentBlock,
+      boolean cacheBlocks, boolean pread, boolean isCompaction,
+      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader 
cachingBlockReader)
+      throws IOException {
+      return seeker.loadDataBlockWithScanInfo(key, currentBlock, cacheBlocks, 
pread, isCompaction,
+        expectedDataBlockEncoding, cachingBlockReader);
+    }
+
+    @Override
+    public Cell midkey(CachingBlockReader cachingBlockReader) throws 
IOException {
+      return seeker.midkey(cachingBlockReader);
+    }
+
+    /**
+     * n * from 0 to {@link #getRootBlockCount() - 1}
+     */
+    public Cell getRootBlockKey(int i) {
+      return seeker.getRootBlockKey(i);
+    }
+
+    @Override
+    public int getRootBlockCount() {
+      return seeker.getRootBlockCount();
+    }
+
+    @Override
+    public int rootBlockContainingKey(Cell key) {
+      return seeker.rootBlockContainingKey(key);
+    }
+
+    @Override
+    protected long calculateHeapSizeForBlockKeys(long heapSize) {
+      heapSize = super.calculateHeapSizeForBlockKeys(heapSize);
+      if (seeker != null) {
+        heapSize += ClassSize.REFERENCE;
+        heapSize += ClassSize.align(seeker.heapSize());
+      }
+      return heapSize;
+    }
+
+    @Override
+    public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) 
throws IOException {
+      seeker = indexBlockEncoder.createSeeker();
+      seeker.initRootIndex(blk, numEntries, comparator, searchTreeLevel);
+    }
+
+    @Override
+    public String toString() {
+      return seeker.toString();
+    }
+  }
+
   /**
    * The reader will always hold the root level index in the memory. Index 
blocks at all other
    * levels will be cached in the LRU cache in practice, although this API 
does not enforce that.
@@ -863,13 +939,13 @@ public class HFileBlockIndex {
      * block index. After all levels of the index were written by
      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final 
root-level index.
      */
-    private BlockIndexChunk rootChunk = new BlockIndexChunk();
+    private BlockIndexChunk rootChunk = new BlockIndexChunkImpl();
 
     /**
      * Current leaf-level chunk. New entries referencing data blocks get added 
to this chunk until
      * it grows large enough to be written to disk.
      */
-    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
+    private BlockIndexChunk curInlineChunk = new BlockIndexChunkImpl();
 
     /**
      * The number of block index levels. This is one if there is only root 
level (even empty), two
@@ -910,9 +986,12 @@ public class HFileBlockIndex {
     /** Name to use for computing cache keys */
     private String nameForCaching;
 
+    /** Type of encoding used for index blocks in HFile */
+    private HFileIndexBlockEncoder indexBlockEncoder;
+
     /** Creates a single-level block index writer */
     public BlockIndexWriter() {
-      this(null, null, null);
+      this(null, null, null, null);
       singleLevelOnly = true;
     }
 
@@ -922,7 +1001,7 @@ public class HFileBlockIndex {
      * @param cacheConf   used to determine when and how a block should be 
cached-on-write.
      */
     public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig 
cacheConf,
-      String nameForCaching) {
+      String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
       if ((cacheConf == null) != (nameForCaching == null)) {
         throw new IllegalArgumentException(
           "Block cache and file name for " + "caching must be both specified 
or both null");
@@ -933,6 +1012,8 @@ public class HFileBlockIndex {
       this.nameForCaching = nameForCaching;
       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
       this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
+      this.indexBlockEncoder =
+        indexBlockEncoder != null ? indexBlockEncoder : 
NoOpIndexBlockEncoder.INSTANCE;
     }
 
     public void setMaxChunkSize(int maxChunkSize) {
@@ -989,7 +1070,7 @@ public class HFileBlockIndex {
 
       {
         DataOutput blockStream = 
blockWriter.startWriting(BlockType.ROOT_INDEX);
-        rootChunk.writeRoot(blockStream);
+        indexBlockEncoder.encode(rootChunk, true, blockStream);
         if (midKeyMetadata != null) blockStream.write(midKeyMetadata);
         blockWriter.writeHeaderAndData(out);
         if (cacheConf != null) {
@@ -1030,13 +1111,13 @@ public class HFileBlockIndex {
         throw new IOException("Root-level entries already added in " + 
"single-level mode");
 
       rootChunk = curInlineChunk;
-      curInlineChunk = new BlockIndexChunk();
+      curInlineChunk = new BlockIndexChunkImpl();
 
       if (LOG.isTraceEnabled()) {
         LOG.trace("Wrote a single-level " + description + " index with " + 
rootChunk.getNumEntries()
           + " entries, " + rootChunk.getRootSize() + " bytes");
       }
-      rootChunk.writeRoot(out);
+      indexBlockEncoder.encode(rootChunk, true, out);
     }
 
     /**
@@ -1050,10 +1131,10 @@ public class HFileBlockIndex {
     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
       BlockIndexChunk currentLevel) throws IOException {
       // Entries referencing intermediate-level blocks we are about to create.
-      BlockIndexChunk parent = new BlockIndexChunk();
+      BlockIndexChunk parent = new BlockIndexChunkImpl();
 
       // The current intermediate-level block index chunk.
-      BlockIndexChunk curChunk = new BlockIndexChunk();
+      BlockIndexChunk curChunk = new BlockIndexChunkImpl();
 
       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
         curChunk.add(currentLevel.getBlockKey(i), 
currentLevel.getBlockOffset(i),
@@ -1078,7 +1159,7 @@ public class HFileBlockIndex {
       BlockIndexChunk curChunk) throws IOException {
       long beginOffset = out.getPos();
       DataOutputStream dos = 
blockWriter.startWriting(BlockType.INTERMEDIATE_INDEX);
-      curChunk.writeNonRoot(dos);
+      indexBlockEncoder.encode(curChunk, false, dos);
       byte[] curFirstKey = curChunk.getBlockKey(0);
       blockWriter.writeHeaderAndData(out);
 
@@ -1172,7 +1253,7 @@ public class HFileBlockIndex {
 
       // Write the inline block index to the output stream in the non-root
       // index block format.
-      curInlineChunk.writeNonRoot(out);
+      indexBlockEncoder.encode(curInlineChunk, false, out);
 
       // Save the first key of the inline block so that we can add it to the
       // parent-level index.
@@ -1266,7 +1347,7 @@ public class HFileBlockIndex {
    * A single chunk of the block index in the process of writing. The data in 
this chunk can become
    * a leaf-level, intermediate-level, or root index block.
    */
-  static class BlockIndexChunk {
+  static class BlockIndexChunkImpl implements BlockIndexChunk {
 
     /** First keys of the key range corresponding to each index entry. */
     private final List<byte[]> blockKeys = new ArrayList<>();
@@ -1313,7 +1394,9 @@ public class HFileBlockIndex {
      *                              leaf-level chunks, including the one 
corresponding to the
      *                              second-level entry being added.
      */
-    void add(byte[] firstKey, long blockOffset, int onDiskDataSize, long 
curTotalNumSubEntries) {
+    @Override
+    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
+      long curTotalNumSubEntries) {
       // Record the offset for the secondary index
       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD + 
firstKey.length;
@@ -1341,10 +1424,12 @@ public class HFileBlockIndex {
      * account. Used for single-level indexes.
      * @see #add(byte[], long, int, long)
      */
+    @Override
     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
       add(firstKey, blockOffset, onDiskDataSize, -1);
     }
 
+    @Override
     public void clear() {
       blockKeys.clear();
       blockOffsets.clear();
@@ -1366,6 +1451,7 @@ public class HFileBlockIndex {
      * @param k sub-entry index, from 0 to the total number sub-entries - 1
      * @return the 0-based index of the entry corresponding to the given 
sub-entry
      */
+    @Override
     public int getEntryBySubEntry(long k) {
       // We define mid-key as the key corresponding to k'th sub-entry
       // (0-based).
@@ -1387,6 +1473,7 @@ public class HFileBlockIndex {
      * @return a few serialized fields for finding the mid-key
      * @throws IOException if could not create metadata for computing mid-key
      */
+    @Override
     public byte[] getMidKeyMetadata() throws IOException {
       ByteArrayOutputStream baos = new 
ByteArrayOutputStream(MID_KEY_METADATA_SIZE);
       DataOutputStream baosDos = new DataOutputStream(baos);
@@ -1422,62 +1509,32 @@ public class HFileBlockIndex {
       return baos.toByteArray();
     }
 
-    /**
-     * Writes the block index chunk in the non-root index block format. This 
format contains the
-     * number of entries, an index of integer offsets for quick binary search 
on variable-length
-     * records, and tuples of block offset, on-disk block size, and the first 
key for each entry. nn
-     */
-    void writeNonRoot(DataOutput out) throws IOException {
-      // The number of entries in the block.
-      out.writeInt(blockKeys.size());
-
-      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
-        throw new IOException("Corrupted block index chunk writer: " + 
blockKeys.size()
-          + " entries but " + secondaryIndexOffsetMarks.size() + " secondary 
index items");
-      }
-
-      // For each entry, write a "secondary index" of relative offsets to the
-      // entries from the end of the secondary index. This works, because at
-      // read time we read the number of entries and know where the secondary
-      // index ends.
-      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
-        out.writeInt(currentSecondaryIndex);
-
-      // We include one other element in the secondary index to calculate the
-      // size of each entry more easily by subtracting secondary index 
elements.
-      out.writeInt(curTotalNonRootEntrySize);
-
-      for (int i = 0; i < blockKeys.size(); ++i) {
-        out.writeLong(blockOffsets.get(i));
-        out.writeInt(onDiskDataSizes.get(i));
-        out.write(blockKeys.get(i));
-      }
-    }
-
     /** Returns the size of this chunk if stored in the non-root index block 
format */
-    int getNonRootSize() {
+    @Override
+    public int getNonRootSize() {
       return Bytes.SIZEOF_INT // Number of entries
         + Bytes.SIZEOF_INT * (blockKeys.size() + 1) // Secondary index
         + curTotalNonRootEntrySize; // All entries
     }
 
-    /**
-     * Writes this chunk into the given output stream in the root block index 
format. This format is
-     * similar to the {@link HFile} version 1 block index format, except that 
we store on-disk size
-     * of the block instead of its uncompressed size.
-     * @param out the data output stream to write the block index to. 
Typically a stream writing
-     *            into an {@link HFile} block. n
-     */
-    void writeRoot(DataOutput out) throws IOException {
-      for (int i = 0; i < blockKeys.size(); ++i) {
-        out.writeLong(blockOffsets.get(i));
-        out.writeInt(onDiskDataSizes.get(i));
-        Bytes.writeByteArray(out, blockKeys.get(i));
-      }
+    @Override
+    public int getCurTotalNonRootEntrySize() {
+      return curTotalNonRootEntrySize;
+    }
+
+    @Override
+    public List<byte[]> getBlockKeys() {
+      return blockKeys;
+    }
+
+    @Override
+    public List<Integer> getSecondaryIndexOffsetMarks() {
+      return secondaryIndexOffsetMarks;
     }
 
     /** Returns the size of this chunk if stored in the root index block 
format */
-    int getRootSize() {
+    @Override
+    public int getRootSize() {
       return curTotalRootSize;
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoder.java
new file mode 100644
index 00000000000..a84204cadf1
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Controls what kind of index block encoding is used. If index block encoding 
is not set or the
+ * given block is not a index block (encoded or not), methods should just 
return the unmodified
+ * block.
+ */
[email protected]
+public interface HFileIndexBlockEncoder {
+  /** Type of encoding used for index blocks in HFile. Stored in file info. */
+  byte[] INDEX_BLOCK_ENCODING = Bytes.toBytes("INDEX_BLOCK_ENCODING");
+
+  /**
+   * Save metadata in HFile which will be written to disk
+   * @param writer writer for a given HFile
+   * @exception IOException on disk problems
+   */
+  void saveMetadata(HFile.Writer writer) throws IOException;
+
+  void encode(BlockIndexChunk blockIndexChunk, boolean rootIndexBlock, 
DataOutput out)
+    throws IOException;
+
+  /** Returns the index block encoding */
+  IndexBlockEncoding getIndexBlockEncoding();
+
+  EncodedSeeker createSeeker();
+
+  interface EncodedSeeker extends HeapSize {
+    void initRootIndex(HFileBlock blk, int numEntries, CellComparator 
comparator, int treeLevel)
+      throws IOException;
+
+    boolean isEmpty();
+
+    Cell getRootBlockKey(int i);
+
+    int getRootBlockCount();
+
+    Cell midkey(HFile.CachingBlockReader cachingBlockReader) throws 
IOException;
+
+    int rootBlockContainingKey(Cell key);
+
+    BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock 
currentBlock,
+      boolean cacheBlocks, boolean pread, boolean isCompaction,
+      DataBlockEncoding expectedDataBlockEncoding, HFile.CachingBlockReader 
cachingBlockReader)
+      throws IOException;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoderImpl.java
new file mode 100644
index 00000000000..f3bdb10b95c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileIndexBlockEncoderImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Do different kinds of index block encoding according to column family 
options.
+ */
[email protected]
+public class HFileIndexBlockEncoderImpl implements HFileIndexBlockEncoder {
+  private final IndexBlockEncoding indexBlockEncoding;
+
+  /**
+   * Do index block encoding with specified options.
+   * @param encoding What kind of data block encoding will be used.
+   */
+  public HFileIndexBlockEncoderImpl(IndexBlockEncoding encoding) {
+    this.indexBlockEncoding = encoding != null ? encoding : 
IndexBlockEncoding.NONE;
+  }
+
+  public static HFileIndexBlockEncoder createFromFileInfo(HFileInfo fileInfo) 
throws IOException {
+    IndexBlockEncoding encoding = IndexBlockEncoding.NONE;
+    byte[] dataBlockEncodingType = fileInfo.get(INDEX_BLOCK_ENCODING);
+    if (dataBlockEncodingType != null) {
+      String dataBlockEncodingStr = Bytes.toString(dataBlockEncodingType);
+      try {
+        encoding = IndexBlockEncoding.valueOf(dataBlockEncodingStr);
+      } catch (IllegalArgumentException ex) {
+        throw new IOException(
+          "Invalid data block encoding type in file info: " + 
dataBlockEncodingStr, ex);
+      }
+    }
+
+    if (encoding == IndexBlockEncoding.NONE) {
+      return NoOpIndexBlockEncoder.INSTANCE;
+    }
+    return new HFileIndexBlockEncoderImpl(encoding);
+  }
+
+  @Override
+  public void saveMetadata(HFile.Writer writer) throws IOException {
+    writer.appendFileInfo(INDEX_BLOCK_ENCODING, 
indexBlockEncoding.getNameInBytes());
+  }
+
+  @Override
+  public IndexBlockEncoding getIndexBlockEncoding() {
+    return indexBlockEncoding;
+  }
+
+  @Override
+  public void encode(BlockIndexChunk blockIndexChunk, boolean rootIndexBlock, 
DataOutput out)
+    throws IOException {
+    // TODO
+  }
+
+  @Override
+  public EncodedSeeker createSeeker() {
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(indexBlockEncoding=" + 
indexBlockEncoding + ")";
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
index 104d9a679d8..032fa054f86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
@@ -364,17 +364,21 @@ public class HFileInfo implements SortedMap<byte[], 
byte[]> {
         context.getFileSize() - trailer.getTrailerSize());
       // Data index. We also read statistics about the block index written 
after
       // the root level.
-      this.dataIndexReader = new HFileBlockIndex.CellBasedKeyBlockIndexReader(
-        trailer.createComparator(), trailer.getNumDataIndexLevels());
-      dataIndexReader.readMultiLevelIndexRoot(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), 
trailer.getDataIndexCount());
+      HFileBlock dataBlockRootIndex = 
blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+      HFileBlock metaBlockIndex = 
blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+      loadMetaInfo(blockIter, hfileContext);
+
+      HFileIndexBlockEncoder indexBlockEncoder =
+        HFileIndexBlockEncoderImpl.createFromFileInfo(this);
+      this.dataIndexReader = new 
HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(
+        trailer.createComparator(), trailer.getNumDataIndexLevels(), 
indexBlockEncoder);
+      dataIndexReader.readMultiLevelIndexRoot(dataBlockRootIndex, 
trailer.getDataIndexCount());
       reader.setDataBlockIndexReader(dataIndexReader);
       // Meta index.
       this.metaIndexReader = new 
HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
-      
metaIndexReader.readRootIndex(blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getMetaIndexCount());
+      metaIndexReader.readRootIndex(metaBlockIndex, 
trailer.getMetaIndexCount());
       reader.setMetaBlockIndexReader(metaIndexReader);
-      loadMetaInfo(blockIter, hfileContext);
+
       
reader.setDataBlockEncoder(HFileDataBlockEncoderImpl.createFromFileInfo(this));
       // Load-On-Open info
       HFileBlock b;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index d58be5fd1ce..6b7cf3caaa3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -124,6 +125,8 @@ public class HFileWriterImpl implements HFile.Writer {
    */
   protected final HFileDataBlockEncoder blockEncoder;
 
+  protected final HFileIndexBlockEncoder indexBlockEncoder;
+
   protected final HFileContext hFileContext;
 
   private int maxTagsLength = 0;
@@ -172,6 +175,12 @@ public class HFileWriterImpl implements HFile.Writer {
     } else {
       this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
     }
+    IndexBlockEncoding indexBlockEncoding = 
hFileContext.getIndexBlockEncoding();
+    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
+      this.indexBlockEncoder = new 
HFileIndexBlockEncoderImpl(indexBlockEncoding);
+    } else {
+      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
+    }
     closeOutputStream = path != null;
     this.cacheConf = cacheConf;
     float encodeBlockSizeRatio = 
conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
@@ -299,7 +308,7 @@ public class HFileWriterImpl implements HFile.Writer {
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
-      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : 
null);
+      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : 
null, indexBlockEncoder);
     
dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
     
dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
     inlineBlockWriters.add(dataBlockIndexWriter);
@@ -590,6 +599,8 @@ public class HFileWriterImpl implements HFile.Writer {
     }
     // Save data block encoder metadata in the file info.
     blockEncoder.saveMetadata(this);
+    // Save index block encoder metadata in the file info.
+    indexBlockEncoder.saveMetadata(this);
     // Write out the end of the data blocks, then write meta data blocks.
     // followed by fileinfo, data block index and meta block index.
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
new file mode 100644
index 00000000000..20cb21c1b14
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static 
org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MID_KEY_METADATA_SIZE;
+import static 
org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.ObjectIntPair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Does not perform any kind of encoding/decoding.
+ */
[email protected]
+public class NoOpIndexBlockEncoder implements HFileIndexBlockEncoder {
+
+  public static final NoOpIndexBlockEncoder INSTANCE = new 
NoOpIndexBlockEncoder();
+
+  /** Cannot be instantiated. Use {@link #INSTANCE} instead. */
+  private NoOpIndexBlockEncoder() {
+  }
+
+  @Override
+  public void saveMetadata(HFile.Writer writer) {
+  }
+
+  @Override
+  public void encode(BlockIndexChunk blockIndexChunk, boolean rootIndexBlock, 
DataOutput out)
+    throws IOException {
+    if (rootIndexBlock) {
+      writeRoot(blockIndexChunk, out);
+    } else {
+      writeNonRoot(blockIndexChunk, out);
+    }
+  }
+
+  /**
+   * Writes the block index chunk in the non-root index block format. This 
format contains the
+   * number of entries, an index of integer offsets for quick binary search on 
variable-length
+   * records, and tuples of block offset, on-disk block size, and the first 
key for each entry. nn
+   */
+  private void writeNonRoot(BlockIndexChunk blockIndexChunk, DataOutput out) 
throws IOException {
+    // The number of entries in the block.
+    out.writeInt(blockIndexChunk.getNumEntries());
+
+    if (
+      blockIndexChunk.getSecondaryIndexOffsetMarks().size() != 
blockIndexChunk.getBlockKeys().size()
+    ) {
+      throw new IOException("Corrupted block index chunk writer: "
+        + blockIndexChunk.getBlockKeys().size() + " entries but "
+        + blockIndexChunk.getSecondaryIndexOffsetMarks().size() + " secondary 
index items");
+    }
+
+    // For each entry, write a "secondary index" of relative offsets to the
+    // entries from the end of the secondary index. This works, because at
+    // read time we read the number of entries and know where the secondary
+    // index ends.
+    for (int currentSecondaryIndex : 
blockIndexChunk.getSecondaryIndexOffsetMarks())
+      out.writeInt(currentSecondaryIndex);
+
+    // We include one other element in the secondary index to calculate the
+    // size of each entry more easily by subtracting secondary index elements.
+    out.writeInt(blockIndexChunk.getCurTotalNonRootEntrySize());
+
+    for (int i = 0; i < blockIndexChunk.getNumEntries(); ++i) {
+      out.writeLong(blockIndexChunk.getBlockOffset(i));
+      out.writeInt(blockIndexChunk.getOnDiskDataSize(i));
+      out.write(blockIndexChunk.getBlockKey(i));
+    }
+  }
+
+  /**
+   * Writes this chunk into the given output stream in the root block index 
format. This format is
+   * similar to the {@link HFile} version 1 block index format, except that we 
store on-disk size
+   * of the block instead of its uncompressed size.
+   * @param out the data output stream to write the block index to. Typically 
a stream writing
+   *            into an {@link HFile} block. n
+   */
+  private void writeRoot(BlockIndexChunk blockIndexChunk, DataOutput out) 
throws IOException {
+    for (int i = 0; i < blockIndexChunk.getNumEntries(); ++i) {
+      out.writeLong(blockIndexChunk.getBlockOffset(i));
+      out.writeInt(blockIndexChunk.getOnDiskDataSize(i));
+      Bytes.writeByteArray(out, blockIndexChunk.getBlockKey(i));
+    }
+  }
+
+  @Override
+  public IndexBlockEncoding getIndexBlockEncoding() {
+    return IndexBlockEncoding.NONE;
+  }
+
+  @Override
+  public EncodedSeeker createSeeker() {
+    return new NoOpEncodedSeeker();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+  protected static class NoOpEncodedSeeker implements EncodedSeeker {
+
+    protected long[] blockOffsets;
+    protected int[] blockDataSizes;
+    protected int rootCount = 0;
+
+    // Mid-key metadata.
+    protected long midLeafBlockOffset = -1;
+    protected int midLeafBlockOnDiskSize = -1;
+    protected int midKeyEntry = -1;
+
+    private Cell[] blockKeys;
+    private CellComparator comparator;
+    protected int searchTreeLevel;
+
+    /** Pre-computed mid-key */
+    private AtomicReference<Cell> midKey = new AtomicReference<>();
+
+    @Override
+    public long heapSize() {
+      long heapSize = ClassSize.align(ClassSize.OBJECT);
+
+      // Mid-key metadata.
+      heapSize += MID_KEY_METADATA_SIZE;
+
+      if (blockOffsets != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length * 
Bytes.SIZEOF_LONG);
+      }
+
+      if (blockDataSizes != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length * 
Bytes.SIZEOF_INT);
+      }
+
+      if (blockKeys != null) {
+        heapSize += ClassSize.REFERENCE;
+        // Adding array + references overhead
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * 
ClassSize.REFERENCE);
+
+        // Adding blockKeys
+        for (Cell key : blockKeys) {
+          heapSize += ClassSize.align(key.heapSize());
+        }
+      }
+      // Add comparator and the midkey atomicreference
+      heapSize += 2 * ClassSize.REFERENCE;
+      // Add rootCount and searchTreeLevel
+      heapSize += 2 * Bytes.SIZEOF_INT;
+
+      return ClassSize.align(heapSize);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return blockKeys.length == 0;
+    }
+
+    @Override
+    public Cell getRootBlockKey(int i) {
+      return blockKeys[i];
+    }
+
+    @Override
+    public int getRootBlockCount() {
+      return rootCount;
+    }
+
+    @Override
+    public void initRootIndex(HFileBlock blk, int numEntries, CellComparator 
comparator,
+      int treeLevel) throws IOException {
+      this.comparator = comparator;
+      this.searchTreeLevel = treeLevel;
+      init(blk, numEntries);
+    }
+
+    private void init(HFileBlock blk, int numEntries) throws IOException {
+      DataInputStream in = readRootIndex(blk, numEntries);
+      // after reading the root index the checksum bytes have to
+      // be subtracted to know if the mid key exists.
+      int checkSumBytes = blk.totalChecksumBytes();
+      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
+        // No mid-key metadata available.
+        return;
+      }
+      midLeafBlockOffset = in.readLong();
+      midLeafBlockOnDiskSize = in.readInt();
+      midKeyEntry = in.readInt();
+    }
+
+    private DataInputStream readRootIndex(HFileBlock blk, final int 
numEntries) throws IOException {
+      DataInputStream in = blk.getByteStream();
+      readRootIndex(in, numEntries);
+      return in;
+    }
+
+    private void readRootIndex(DataInput in, final int numEntries) throws 
IOException {
+      blockOffsets = new long[numEntries];
+      initialize(numEntries);
+      blockDataSizes = new int[numEntries];
+
+      // If index size is zero, no index was written.
+      if (numEntries > 0) {
+        for (int i = 0; i < numEntries; ++i) {
+          long offset = in.readLong();
+          int dataSize = in.readInt();
+          byte[] key = Bytes.readByteArray(in);
+          add(key, offset, dataSize);
+        }
+      }
+    }
+
+    private void initialize(int numEntries) {
+      blockKeys = new Cell[numEntries];
+    }
+
+    private void add(final byte[] key, final long offset, final int dataSize) {
+      blockOffsets[rootCount] = offset;
+      // Create the blockKeys as Cells once when the reader is opened
+      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
+      blockDataSizes[rootCount] = dataSize;
+      rootCount++;
+    }
+
+    @Override
+    public Cell midkey(HFile.CachingBlockReader cachingBlockReader) throws 
IOException {
+      if (rootCount == 0) throw new IOException("HFile empty");
+
+      Cell targetMidKey = this.midKey.get();
+      if (targetMidKey != null) {
+        return targetMidKey;
+      }
+
+      if (midLeafBlockOffset >= 0) {
+        if (cachingBlockReader == null) {
+          throw new IOException(
+            "Have to read the middle leaf block but " + "no block reader 
available");
+        }
+
+        // Caching, using pread, assuming this is not a compaction.
+        HFileBlock midLeafBlock = 
cachingBlockReader.readBlock(midLeafBlockOffset,
+          midLeafBlockOnDiskSize, true, true, false, true, 
BlockType.LEAF_INDEX, null);
+        try {
+          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
+          int numDataBlocks = b.getIntAfterPosition(0);
+          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * 
(midKeyEntry + 1));
+          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 
2)) - keyRelOffset
+            - SECONDARY_INDEX_ENTRY_OVERHEAD;
+          int keyOffset =
+            Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset + 
SECONDARY_INDEX_ENTRY_OVERHEAD;
+          byte[] bytes = b.toBytes(keyOffset, keyLen);
+          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
+        } finally {
+          midLeafBlock.release();
+        }
+      } else {
+        // The middle of the root-level index.
+        targetMidKey = blockKeys[rootCount / 2];
+      }
+
+      this.midKey.set(targetMidKey);
+      return targetMidKey;
+    }
+
+    @Override
+    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock 
currentBlock,
+      boolean cacheBlocks, boolean pread, boolean isCompaction,
+      DataBlockEncoding expectedDataBlockEncoding, HFile.CachingBlockReader 
cachingBlockReader)
+      throws IOException {
+      int rootLevelIndex = rootBlockContainingKey(key);
+      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
+        return null;
+      }
+
+      // the next indexed key
+      Cell nextIndexedKey = null;
+
+      // Read the next-level (intermediate or leaf) index block.
+      long currentOffset = blockOffsets[rootLevelIndex];
+      int currentOnDiskSize = blockDataSizes[rootLevelIndex];
+
+      if (rootLevelIndex < blockKeys.length - 1) {
+        nextIndexedKey = blockKeys[rootLevelIndex + 1];
+      } else {
+        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
+      }
+
+      int lookupLevel = 1; // How many levels deep we are in our lookup.
+      int index = -1;
+
+      HFileBlock block = null;
+      KeyValue.KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
+      while (true) {
+        try {
+          // Must initialize it with null here, because if don't and once an 
exception happen in
+          // readBlock, then we'll release the previous assigned block twice 
in the finally block.
+          // (See HBASE-22422)
+          block = null;
+          if (currentBlock != null && currentBlock.getOffset() == 
currentOffset) {
+            // Avoid reading the same block again, even with caching turned 
off.
+            // This is crucial for compaction-type workload which might have
+            // caching turned off. This is like a one-block cache inside the
+            // scanner.
+            block = currentBlock;
+          } else {
+            // Call HFile's caching block reader API. We always cache index
+            // blocks, otherwise we might get terrible performance.
+            boolean shouldCache = cacheBlocks || (lookupLevel < 
searchTreeLevel);
+            BlockType expectedBlockType;
+            if (lookupLevel < searchTreeLevel - 1) {
+              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
+            } else if (lookupLevel == searchTreeLevel - 1) {
+              expectedBlockType = BlockType.LEAF_INDEX;
+            } else {
+              // this also accounts for ENCODED_DATA
+              expectedBlockType = BlockType.DATA;
+            }
+            block = cachingBlockReader.readBlock(currentOffset, 
currentOnDiskSize, shouldCache,
+              pread, isCompaction, true, expectedBlockType, 
expectedDataBlockEncoding);
+          }
+
+          if (block == null) {
+            throw new IOException("Failed to read block at offset " + 
currentOffset
+              + ", onDiskSize=" + currentOnDiskSize);
+          }
+
+          // Found a data block, break the loop and check our level in the 
tree.
+          if (block.getBlockType().isData()) {
+            break;
+          }
+
+          // Not a data block. This must be a leaf-level or intermediate-level
+          // index block. We don't allow going deeper than searchTreeLevel.
+          if (++lookupLevel > searchTreeLevel) {
+            throw new IOException("Search Tree Level overflow: lookupLevel=" + 
lookupLevel
+              + ", searchTreeLevel=" + searchTreeLevel);
+          }
+
+          // Locate the entry corresponding to the given key in the non-root
+          // (leaf or intermediate-level) index block.
+          ByteBuff buffer = block.getBufferWithoutHeader();
+          index = 
HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, key, 
comparator);
+          if (index == -1) {
+            // This has to be changed
+            // For now change this to key value
+            throw new IOException("The key " + 
CellUtil.getCellKeyAsString(key) + " is before the"
+              + " first key of the non-root index block " + block);
+          }
+
+          currentOffset = buffer.getLong();
+          currentOnDiskSize = buffer.getInt();
+
+          // Only update next indexed key if there is a next indexed key in 
the current level
+          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
+          if (nonRootIndexedKey != null) {
+            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, 
nonRootIndexedKey.length);
+            nextIndexedKey = tmpNextIndexKV;
+          }
+        } finally {
+          if (block != null && !block.getBlockType().isData()) {
+            // Release the block immediately if it is not the data block
+            block.release();
+          }
+        }
+      }
+
+      if (lookupLevel != searchTreeLevel) {
+        assert block.getBlockType().isData();
+        // Though we have retrieved a data block we have found an issue
+        // in the retrieved data block. Hence returned the block so that
+        // the ref count can be decremented
+        if (block != null) {
+          block.release();
+        }
+        throw new IOException("Reached a data block at level " + lookupLevel
+          + " but the number of levels is " + searchTreeLevel);
+      }
+
+      // set the next indexed key for the current block.
+      return new BlockWithScanInfo(block, nextIndexedKey);
+    }
+
+    @Override
+    public int rootBlockContainingKey(Cell key) {
+      // Here the comparator should not be null as this happens for the 
root-level block
+      int pos = Bytes.binarySearch(blockKeys, key, comparator);
+      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
+      // binarySearch's javadoc.
+
+      if (pos >= 0) {
+        // This means this is an exact match with an element of blockKeys.
+        assert pos < blockKeys.length;
+        return pos;
+      }
+
+      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < 
blockKeys[i],
+      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such 
that
+      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
+      // key < blockKeys[0], meaning the file does not contain the given key.
+
+      int i = -pos - 1;
+      assert 0 <= i && i <= blockKeys.length;
+      return i - 1;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("size=" + rootCount).append("\n");
+      for (int i = 0; i < rootCount; i++) {
+        sb.append("key=").append((blockKeys[i])).append("\n  
offset=").append(blockOffsets[i])
+          .append(", dataSize=" + blockDataSizes[i]).append("\n");
+      }
+      return sb.toString();
+    }
+
+    /**
+     * The indexed key at the ith position in the nonRootIndex. The position 
starts at 0. n * @param
+     * i the ith position
+     * @return The indexed key at the ith position in the nonRootIndex.
+     */
+    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
+      int numEntries = nonRootIndex.getInt(0);
+      if (i < 0 || i >= numEntries) {
+        return null;
+      }
+
+      // Entries start after the number of entries and the secondary index.
+      // The secondary index takes numEntries + 1 ints.
+      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+      // Targetkey's offset relative to the end of secondary index
+      int targetKeyRelOffset = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 1));
+
+      // The offset of the target key in the blockIndex buffer
+      int targetKeyOffset = entriesOffset // Skip secondary index
+        + targetKeyRelOffset // Skip all entries until mid
+        + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size
+
+      // We subtract the two consecutive secondary index elements, which
+      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
+      // then need to subtract the overhead of offset and onDiskSize.
+      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) - 
targetKeyRelOffset
+        - SECONDARY_INDEX_ENTRY_OVERHEAD;
+
+      // TODO check whether we can make BB backed Cell here? So can avoid 
bytes copy.
+      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
+    }
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 1c910b9670e..bdf3b92db65 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -116,7 +116,8 @@ abstract class StoreFileTrackerBase implements 
StoreFileTracker {
       .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
       
.withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
       
.withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
-      
.withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator()).build();
+      
.withTableName(ctx.getTableName().getName()).withCellComparator(ctx.getComparator())
+      .withIndexBlockEncoding(family.getIndexBlockEncoding()).build();
     return hFileContext;
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index ec4e38242da..f93f1e39a2b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
@@ -257,7 +256,8 @@ public class TestHFileBlockIndex {
       .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
     HFileBlock.Writer hbw = new 
HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
     FSDataOutputStream outputStream = fs.create(path);
-    HFileBlockIndex.BlockIndexWriter biw = new 
HFileBlockIndex.BlockIndexWriter(hbw, null, null);
+    HFileBlockIndex.BlockIndexWriter biw =
+      new HFileBlockIndex.BlockIndexWriter(hbw, null, null, null);
     for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
       
hbw.startWriting(BlockType.DATA).write(Bytes.toBytes(String.valueOf(RNG.nextInt(1000))));
       long blockOffset = outputStream.getPos();
@@ -435,7 +435,8 @@ public class TestHFileBlockIndex {
 
   @Test
   public void testBlockIndexChunk() throws IOException {
-    BlockIndexChunk c = new BlockIndexChunk();
+    BlockIndexChunk c = new HFileBlockIndex.BlockIndexChunkImpl();
+    HFileIndexBlockEncoder indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     int N = 1000;
     int[] numSubEntriesAt = new int[N];
@@ -443,12 +444,12 @@ public class TestHFileBlockIndex {
     for (int i = 0; i < N; ++i) {
       baos.reset();
       DataOutputStream dos = new DataOutputStream(baos);
-      c.writeNonRoot(dos);
+      indexBlockEncoder.encode(c, false, dos);
       assertEquals(c.getNonRootSize(), dos.size());
 
       baos.reset();
       dos = new DataOutputStream(baos);
-      c.writeRoot(dos);
+      indexBlockEncoder.encode(c, true, dos);
       assertEquals(c.getRootSize(), dos.size());
 
       byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index 0824c2a3a2a..6227740f66d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -177,7 +177,7 @@ public class TestHFileWriterV3 {
     // Comparator class name is stored in the trailer in version 3.
     CellComparator comparator = trailer.createComparator();
     HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
-      new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator, 
trailer.getNumDataIndexLevels());
+      new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator, 
trailer.getNumDataIndexLevels());
     HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
       new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3WithDataEncoders.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3WithDataEncoders.java
index 3d60b97eab7..9945b57ae2f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3WithDataEncoders.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3WithDataEncoders.java
@@ -160,7 +160,7 @@ public class TestHFileWriterV3WithDataEncoders {
     // Comparator class name is stored in the trailer in version 3.
     CellComparator comparator = trailer.createComparator();
     HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
-      new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator, 
trailer.getNumDataIndexLevels());
+      new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator, 
trailer.getNumDataIndexLevels());
     HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
       new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
 

Reply via email to