This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b31a0ed4 [java] KUDU-2671 support creating table with custom hash schema per range
8b31a0ed4 is described below

commit 8b31a0ed48a8ee830c68dc74143c924f4b1f2116
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Dec 2 21:07:44 2021 -0800

    [java] KUDU-2671 support creating table with custom hash schema per range
    
    This patch introduces changes in the Kudu Java client API to make it
    possible to create a Kudu table with custom hash bucket schemas per
    range partition. Corresponding test coverage is present as well.
    
    This patch complements 586b79132 on the Kudu Java client side
    (586b79132 introduced the corresponding changes in the Kudu C++ client).
    
    The corresponding Spark bindings haven't been updated yet -- that is
    to be done in a separate changelist.
    
    Change-Id: I5ccf77ea2c39808520e76351d62571d449d10894
    Reviewed-on: http://gerrit.cloudera.org:8080/18562
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Zoltan Chovan <[email protected]>
    Reviewed-by: Attila Bukor <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |   4 +-
 .../org/apache/kudu/client/CreateTableOptions.java |  99 ++-
 .../java/org/apache/kudu/client/KeyEncoder.java    |  21 +-
 .../java/org/apache/kudu/client/Operation.java     |  24 +-
 .../org/apache/kudu/client/PartitionPruner.java    | 312 +++++--
 .../org/apache/kudu/client/PartitionSchema.java    | 163 +++-
 .../org/apache/kudu/client/ProtobufHelper.java     |  53 +-
 .../org/apache/kudu/client/RangePartition.java     |  66 ++
 .../client/RangePartitionWithCustomHashSchema.java |  87 ++
 .../org/apache/kudu/client/TestKeyEncoding.java    |  59 +-
 .../org/apache/kudu/client/TestKuduClient.java     | 103 +++
 .../java/org/apache/kudu/client/TestKuduTable.java | 902 +++++++++++++++++++++
 .../java/org/apache/kudu/client/TestOperation.java |  26 +-
 .../apache/kudu/client/TestPartitionPruner.java    | 219 +++++
 14 files changed, 1969 insertions(+), 169 deletions(-)
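
For readers who want to see the new Java client API in action before reading the
diff, below is a minimal, hedged usage sketch. It mirrors the TestKuduClient
coverage added by this patch; the master address, table name, and column names
are illustrative assumptions, and the cluster's master must run with the
--enable_per_range_hash_schemas flag, as in that test.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.CreateTableOptions;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.client.RangePartitionBound;
    import org.apache.kudu.client.RangePartitionWithCustomHashSchema;

    public class PerRangeHashSchemaExample {
      public static void main(String[] args) throws Exception {
        // Assumption: a Kudu cluster is reachable at this address.
        KuduClient client =
            new KuduClient.KuduClientBuilder("localhost:7051").build();
        try {
          List<ColumnSchema> cols = new ArrayList<>();
          cols.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT64)
              .key(true).build());
          cols.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
              .nullable(true).build());
          Schema schema = new Schema(cols);

          CreateTableOptions options = new CreateTableOptions()
              .setRangePartitionColumns(Arrays.asList("c0"))
              // Table-wide hash schema: 2 buckets on "c0".
              .addHashPartitions(Arrays.asList("c0"), 2);

          // Range [-100, 100) uses the table-wide hash schema.
          PartialRow lower = schema.newPartialRow();
          lower.addLong("c0", -100);
          PartialRow upper = schema.newPartialRow();
          upper.addLong("c0", 100);
          options.addRangePartition(lower, upper);

          // Range [100, 200) uses its own hash schema: 5 buckets on "c0".
          PartialRow customLower = schema.newPartialRow();
          customLower.addLong("c0", 100);
          PartialRow customUpper = schema.newPartialRow();
          customUpper.addLong("c0", 200);
          RangePartitionWithCustomHashSchema customRange =
              new RangePartitionWithCustomHashSchema(
                  customLower, customUpper,
                  RangePartitionBound.INCLUSIVE_BOUND,
                  RangePartitionBound.EXCLUSIVE_BOUND);
          customRange.addHashPartitions(Arrays.asList("c0"), 5, 0);
          options.addRangePartition(customRange);

          client.createTable("per_range_hash_example", schema, options);
        } finally {
          client.close();
        }
      }
    }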

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 3a0780c14..1b1a690ca 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -633,8 +633,8 @@ public class AsyncKuduClient implements AutoCloseable {
     if (builder == null) {
       throw new IllegalArgumentException("CreateTableOptions may not be null");
     }
-    if (!builder.getBuilder().getPartitionSchema().hasRangeSchema() &&
-        builder.getBuilder().getPartitionSchema().getHashSchemaCount() == 0) {
+    final Common.PartitionSchemaPB ps = 
builder.getBuilder().getPartitionSchema();
+    if (!ps.hasRangeSchema() && ps.getHashSchemaCount() == 0) {
       throw new IllegalArgumentException("Table partitioning must be specified 
using " +
                                          "setRangePartitionColumns or 
addHashPartitions");
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
index 6ac83dd62..e1d35360c 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableOptions.java
@@ -38,8 +38,11 @@ public class CreateTableOptions {
 
   private final List<PartialRow> splitRows = Lists.newArrayList();
   private final List<RangePartition> rangePartitions = Lists.newArrayList();
+  private final List<RangePartitionWithCustomHashSchema> customRangePartitions 
=
+      Lists.newArrayList(); // range partitions with custom hash schemas
   private Master.CreateTableRequestPB.Builder pb = 
Master.CreateTableRequestPB.newBuilder();
   private boolean wait = true;
+  private boolean isPbGenerationDone = false;
 
   /**
    * Add a set of hash partitions to the table.
@@ -164,6 +167,23 @@ public class CreateTableOptions {
     return this;
   }
 
+  /**
+   * Add a range partition with a custom hash schema.
+   *
+   * @param rangePartition range partition with custom hash schema
+   * @return this CreateTableOptions object modified accordingly
+   */
+  public CreateTableOptions 
addRangePartition(RangePartitionWithCustomHashSchema rangePartition) {
+    if (!splitRows.isEmpty()) {
+      throw new IllegalArgumentException(
+          "no range partitions with custom hash schema are allowed when using 
" +
+              "split rows to define range partitioning for a table");
+    }
+    customRangePartitions.add(rangePartition);
+    
pb.getPartitionSchemaBuilder().addCustomHashSchemaRanges(rangePartition.toPB());
+    return this;
+  }
+
   /**
    * Add a range partition split. The split row must fall in a range partition,
    * and causes the range partition to split into two contiguous range 
partitions.
@@ -174,6 +194,11 @@ public class CreateTableOptions {
    * @return this instance
    */
   public CreateTableOptions addSplitRow(PartialRow row) {
+    if (!customRangePartitions.isEmpty()) {
+      throw new IllegalArgumentException(
+          "no split rows are allowed to define range partitioning for a table 
" +
+              "when range partitions with custom hash schema are present");
+    }
     splitRows.add(new PartialRow(row));
     return this;
   }
@@ -271,55 +296,51 @@ public class CreateTableOptions {
   }
 
   Master.CreateTableRequestPB.Builder getBuilder() {
-    if (!splitRows.isEmpty() || !rangePartitions.isEmpty()) {
-      pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder()
-                                              
.encodeRangePartitions(rangePartitions, splitRows));
+    if (isPbGenerationDone) {
+      return pb;
+    }
+
+    if (!splitRows.isEmpty() && !customRangePartitions.isEmpty()) {
+      throw new IllegalArgumentException(
+          "no split rows are allowed to define range partitioning for a table 
" +
+              "when range partitions with custom hash schema are present");
     }
+    if (customRangePartitions.isEmpty()) {
+      if (!splitRows.isEmpty() || !rangePartitions.isEmpty()) {
+        pb.setSplitRowsRangeBounds(new Operation.OperationsEncoder()
+            .encodeRangePartitions(rangePartitions, splitRows));
+      }
+    } else {
+      // When at least one range has a custom hash schema, ranges that use
+      // the table-wide hash schema must also be represented as elements of
+      // PartitionSchemaPB::custom_hash_schema_ranges to satisfy the
+      // convention used by the backend. Do so here for all the ranges with
+      // the table-wide hash schema.
+      for (RangePartition p : rangePartitions) {
+        org.apache.kudu.Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder 
b =
+            pb.getPartitionSchemaBuilder().addCustomHashSchemaRangesBuilder();
+        // Set the hash schema for the range.
+        for (org.apache.kudu.Common.PartitionSchemaPB.HashBucketSchemaPB 
hashSchema :
+            pb.getPartitionSchemaBuilder().getHashSchemaList()) {
+          b.addHashSchema(hashSchema);
+        }
+        b.setRangeBounds(
+            new Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+                p.lowerBound, p.upperBound, p.lowerBoundType, 
p.upperBoundType));
+      }
+    }
+    isPbGenerationDone = true;
     return pb;
   }
 
   List<Integer> getRequiredFeatureFlags() {
-    if (rangePartitions.isEmpty()) {
+    if (rangePartitions.isEmpty() && customRangePartitions.isEmpty()) {
       return ImmutableList.of();
-    } else {
-      return 
ImmutableList.of(Master.MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE);
     }
+    return 
ImmutableList.of(Master.MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE);
   }
 
   boolean shouldWait() {
     return wait;
   }
-
-  static final class RangePartition {
-    private final PartialRow lowerBound;
-    private final PartialRow upperBound;
-    private final RangePartitionBound lowerBoundType;
-    private final RangePartitionBound upperBoundType;
-
-    public RangePartition(PartialRow lowerBound,
-                          PartialRow upperBound,
-                          RangePartitionBound lowerBoundType,
-                          RangePartitionBound upperBoundType) {
-      this.lowerBound = lowerBound;
-      this.upperBound = upperBound;
-      this.lowerBoundType = lowerBoundType;
-      this.upperBoundType = upperBoundType;
-    }
-
-    public PartialRow getLowerBound() {
-      return lowerBound;
-    }
-
-    public PartialRow getUpperBound() {
-      return upperBound;
-    }
-
-    public RangePartitionBound getLowerBoundType() {
-      return lowerBoundType;
-    }
-
-    public RangePartitionBound getUpperBoundType() {
-      return upperBoundType;
-    }
-  }
 }
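
A note on the CreateTableOptions changes above: split rows and range partitions
with custom hash schemas are mutually exclusive, and mixing them fails fast with
IllegalArgumentException. A short, hedged sketch (reusing the imports from the
example near the top of this message; the column name and values are
illustrative):

    List<ColumnSchema> cols = new ArrayList<>();
    cols.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT64)
        .key(true).build());
    Schema schema = new Schema(cols);

    CreateTableOptions options = new CreateTableOptions()
        .setRangePartitionColumns(Arrays.asList("c0"));

    PartialRow split = schema.newPartialRow();
    split.addLong("c0", 0);
    options.addSplitRow(split);

    PartialRow lower = schema.newPartialRow();
    lower.addLong("c0", 100);
    PartialRow upper = schema.newPartialRow();
    upper.addLong("c0", 200);
    RangePartitionWithCustomHashSchema range =
        new RangePartitionWithCustomHashSchema(
            lower, upper,
            RangePartitionBound.INCLUSIVE_BOUND,
            RangePartitionBound.EXCLUSIVE_BOUND);

    // Throws IllegalArgumentException: a range with a custom hash schema
    // cannot be combined with split rows (and vice versa).
    options.addRangePartition(range);
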
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index 0e3500e22..638df2bf3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -92,14 +92,18 @@ class KeyEncoder {
    * @return an encoded partition key
    */
   public static byte[] encodePartitionKey(PartialRow row, PartitionSchema 
partitionSchema) {
+    ByteVec rangeBuf = ByteVec.create();
+    encodeColumns(row, partitionSchema.getRangeSchema().getColumnIds(), 
rangeBuf);
+
+    // Get the hash bucket schema for the range.
+    final List<HashBucketSchema> hashSchemas =
+        partitionSchema.getHashSchemaForRange(rangeBuf.toArray());
     ByteVec buf = ByteVec.create();
-    if (!partitionSchema.getHashBucketSchemas().isEmpty()) {
-      for (final HashBucketSchema hashSchema : 
partitionSchema.getHashBucketSchemas()) {
-        encodeHashBucket(getHashBucket(row, hashSchema), buf);
-      }
+    for (final HashBucketSchema hashSchema : hashSchemas) {
+      encodeHashBucket(getHashBucket(row, hashSchema), buf);
     }
 
-    encodeColumns(row, partitionSchema.getRangeSchema().getColumnIds(), buf);
+    buf.append(rangeBuf);
     return buf.toArray();
   }
 
@@ -252,9 +256,9 @@ class KeyEncoder {
     ByteBuffer buf = ByteBuffer.wrap(key);
     buf.order(ByteOrder.BIG_ENDIAN);
 
+    final List<HashBucketSchema> hashSchemas = 
partitionSchema.getHashSchemaForRange(key);
     List<Integer> buckets = new ArrayList<>();
-
-    for (int i = 0; i < partitionSchema.getHashBucketSchemas().size(); i++) {
+    for (int i = 0; i < hashSchemas.size(); i++) {
       if (buf.hasRemaining()) {
         buckets.add(buf.getInt());
       } else {
@@ -448,7 +452,8 @@ class KeyEncoder {
                                                byte[] lowerBound,
                                                byte[] upperBound) {
     if (partitionSchema.getRangeSchema().getColumnIds().isEmpty() &&
-        partitionSchema.getHashBucketSchemas().isEmpty()) {
+        partitionSchema.getHashBucketSchemas().isEmpty() &&
+        partitionSchema.getRangesWithHashSchemas().isEmpty()) {
       assert lowerBound.length == 0 && upperBound.length == 0;
       return "<no-partitioning>";
     }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 0b2b933fa..fdd636a31 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -36,7 +36,6 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.RowOperations.RowOperationsPB;
 import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
 import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
 import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.client.Statistics.Statistic;
@@ -435,7 +434,7 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
     }
 
     public RowOperationsPB encodeRangePartitions(
-        List<CreateTableOptions.RangePartition> rangePartitions,
+        List<RangePartition> rangePartitions,
         List<PartialRow> splitRows) {
 
       if (splitRows.isEmpty() && rangePartitions.isEmpty()) {
@@ -450,15 +449,15 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
         encodeRow(row, ChangeType.SPLIT_ROWS);
       }
 
-      for (CreateTableOptions.RangePartition partition : rangePartitions) {
+      for (RangePartition partition : rangePartitions) {
         encodeRow(partition.getLowerBound(),
-                  partition.getLowerBoundType() == 
RangePartitionBound.INCLUSIVE_BOUND ?
-                      ChangeType.RANGE_LOWER_BOUND :
-                      ChangeType.EXCLUSIVE_RANGE_LOWER_BOUND);
+            partition.getLowerBoundType() == 
RangePartitionBound.INCLUSIVE_BOUND ?
+                ChangeType.RANGE_LOWER_BOUND :
+                ChangeType.EXCLUSIVE_RANGE_LOWER_BOUND);
         encodeRow(partition.getUpperBound(),
-                  partition.getUpperBoundType() == 
RangePartitionBound.EXCLUSIVE_BOUND ?
-                      ChangeType.RANGE_UPPER_BOUND :
-                      ChangeType.INCLUSIVE_RANGE_UPPER_BOUND);
+            partition.getUpperBoundType() == 
RangePartitionBound.EXCLUSIVE_BOUND ?
+                ChangeType.RANGE_UPPER_BOUND :
+                ChangeType.INCLUSIVE_RANGE_UPPER_BOUND);
       }
 
       return toPB();
@@ -505,7 +504,7 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
      * @param schema the table schema to use for decoding
      * @return a list of RangePartition objects with corresponding bounds
      */
-    public List<CreateTableOptions.RangePartition> decodeRangePartitions(
+    public List<RangePartition> decodeRangePartitions(
         RowOperationsPB pb, Schema schema) {
       if (pb == null) {
         return null;
@@ -531,7 +530,7 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
             "unexpected odd number of range partition bounds");
       }
 
-      List<CreateTableOptions.RangePartition> result = new ArrayList<>();
+      List<RangePartition> result = new ArrayList<>();
       for (int i = 0; i < decodedBounds.size(); i += 2) {
         Pair<PartialRow, RowOperationsPB.Type> lower = decodedBounds.get(i);
         Pair<PartialRow, RowOperationsPB.Type> upper = decodedBounds.get(i + 
1);
@@ -556,8 +555,7 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
               "%s: unexpected bound type for the upper bound", 
upper.getSecond().toString()));
         }
 
-        result.add(new CreateTableOptions.RangePartition(
-            lower.getFirst(), upper.getFirst(), lowerType, upperType));
+        result.add(new RangePartition(lower.getFirst(), upper.getFirst(), 
lowerType, upperType));
       }
       return result;
     }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
index 8785cc454..5b74d6e9e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionPruner.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.concurrent.NotThreadSafe;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.kudu.ColumnSchema;
@@ -43,7 +45,7 @@ public class PartitionPruner {
 
   /**
    * Constructs a new partition pruner.
-   * @param rangePartitions the valid partition key ranges, in reverse sorted 
order
+   * @param rangePartitions the valid partition key ranges, sorted in 
ascending order
    */
   private PartitionPruner(Deque<Pair<byte[], byte[]>> rangePartitions) {
     this.rangePartitions = rangePartitions;
@@ -70,11 +72,11 @@ public class PartitionPruner {
    */
   public static PartitionPruner create(AbstractKuduScannerBuilder<?, ?> 
scanner) {
     Schema schema = scanner.table.getSchema();
-    PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
+    final PartitionSchema partitionSchema = scanner.table.getPartitionSchema();
     PartitionSchema.RangeSchema rangeSchema = partitionSchema.getRangeSchema();
     Map<String, KuduPredicate> predicates = scanner.predicates;
 
-    // Check if the scan can be short circuited entirely by checking the 
primary
+    // Check if the scan can be short-circuited entirely by checking the 
primary
     // key bounds and predicates. This also allows us to assume some 
invariants of the
     // scan, such as no None predicates and that the lower bound PK < upper
     // bound PK.
@@ -160,6 +162,7 @@ public class PartitionPruner {
     // key bounds, if they are tighter.
     byte[] rangeLowerBound = pushPredsIntoLowerBoundRangeKey(schema, 
rangeSchema, predicates);
     byte[] rangeUpperBound = pushPredsIntoUpperBoundRangeKey(schema, 
rangeSchema, predicates);
+
     if (partitionSchema.isSimpleRangePartitioning()) {
       if (Bytes.memcmp(rangeLowerBound, scanner.lowerBoundPrimaryKey) < 0) {
         rangeLowerBound = scanner.lowerBoundPrimaryKey;
@@ -170,100 +173,112 @@ public class PartitionPruner {
         rangeUpperBound = scanner.upperBoundPrimaryKey;
       }
     }
+    // Since the table can contain ranges with range-specific hash schemas,
+    // it's necessary to split the original range into sub-ranges where each
+    // sub-range comes with the appropriate hash schema.
+    List<PartitionSchema.EncodedRangeBoundsWithHashSchema> preliminaryRanges =
+        splitIntoHashSpecificRanges(rangeLowerBound, rangeUpperBound, 
partitionSchema);
+
+    List<Pair<byte[], byte[]>> partitionKeyRangeBytes = new ArrayList<>();
+
+    for (PartitionSchema.EncodedRangeBoundsWithHashSchema preliminaryRange : 
preliminaryRanges) {
+      // Step 2: Create the hash bucket portion of the partition key.
+      final List<PartitionSchema.HashBucketSchema> hashBucketSchemas =
+          preliminaryRange.hashSchemas;
+      // List of pruned hash buckets per hash component.
+      List<BitSet> hashComponents = new ArrayList<>(hashBucketSchemas.size());
+      for (PartitionSchema.HashBucketSchema hashSchema : hashBucketSchemas) {
+        hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
+      }
 
-    // Step 2: Create the hash bucket portion of the partition key.
-
-    // List of pruned hash buckets per hash component.
-    List<BitSet> hashComponents = new 
ArrayList<>(partitionSchema.getHashBucketSchemas().size());
-    for (PartitionSchema.HashBucketSchema hashSchema : 
partitionSchema.getHashBucketSchemas()) {
-      hashComponents.add(pruneHashComponent(schema, hashSchema, predicates));
-    }
-
-    // The index of the final constrained component in the partition key.
-    int constrainedIndex = 0;
-    if (rangeLowerBound.length > 0 || rangeUpperBound.length > 0) {
-      // The range component is constrained if either of the range bounds are
-      // specified (non-empty).
-      constrainedIndex = partitionSchema.getHashBucketSchemas().size();
-    } else {
-      // Search the hash bucket constraints from right to left, looking for the
-      // first constrained component.
-      for (int i = hashComponents.size(); i > 0; i--) {
-        int numBuckets = partitionSchema.getHashBucketSchemas().get(i - 
1).getNumBuckets();
-        BitSet hashBuckets = hashComponents.get(i - 1);
-        if (hashBuckets.nextClearBit(0) < numBuckets) {
-          constrainedIndex = i;
-          break;
+      // The index of the final constrained component in the partition key.
+      int constrainedIndex = 0;
+      if (preliminaryRange.lower.length > 0 || preliminaryRange.upper.length > 
0) {
+        // The range component is constrained if either of the range bounds are
+        // specified (non-empty).
+        constrainedIndex = hashBucketSchemas.size();
+      } else {
+        // Search the hash bucket constraints from right to left, looking for 
the
+        // first constrained component.
+        for (int i = hashComponents.size(); i > 0; i--) {
+          int numBuckets = hashBucketSchemas.get(i - 1).getNumBuckets();
+          BitSet hashBuckets = hashComponents.get(i - 1);
+          if (hashBuckets.nextClearBit(0) < numBuckets) {
+            constrainedIndex = i;
+            break;
+          }
         }
       }
-    }
 
-    // Build up a set of partition key ranges out of the hash components.
-    //
-    // Each hash component simply appends its bucket number to the
-    // partition key ranges (possibly incrementing the upper bound by one 
bucket
-    // number if this is the final constraint, see note 2 in the example 
above).
-    List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
-    partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
-
-    for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
-      // This is the final partition key component if this is the final 
constrained
-      // bucket, and the range upper bound is empty. In this case we need to
-      // increment the bucket on the upper bound to convert from inclusive to
-      // exclusive.
-      boolean isLast = hashIdx + 1 == constrainedIndex && 
rangeUpperBound.length == 0;
-      BitSet hashBuckets = hashComponents.get(hashIdx);
-
-      List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
-          new ArrayList<>(partitionKeyRanges.size() * 
hashBuckets.cardinality());
-      for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
-        for (int bucket = hashBuckets.nextSetBit(0);
-             bucket != -1;
-             bucket = hashBuckets.nextSetBit(bucket + 1)) {
-          int bucketUpper = isLast ? bucket + 1 : bucket;
-          ByteVec lower = partitionKeyRange.getFirst().clone();
-          ByteVec upper = partitionKeyRange.getFirst().clone();
-          KeyEncoder.encodeHashBucket(bucket, lower);
-          KeyEncoder.encodeHashBucket(bucketUpper, upper);
-          newPartitionKeyRanges.add(new Pair<>(lower, upper));
+      // Build up a set of partition key ranges out of the hash components.
+      //
+      // Each hash component simply appends its bucket number to the
+      // partition key ranges (possibly incrementing the upper bound by one 
bucket
+      // number if this is the final constraint, see note 2 in the example 
above).
+      List<Pair<ByteVec, ByteVec>> partitionKeyRanges = new ArrayList<>();
+      partitionKeyRanges.add(new Pair<>(ByteVec.create(), ByteVec.create()));
+
+      for (int hashIdx = 0; hashIdx < constrainedIndex; hashIdx++) {
+        // This is the final partition key component if this is the final 
constrained
+        // bucket, and the range upper bound is empty. In this case we need to
+        // increment the bucket on the upper bound to convert from inclusive to
+        // exclusive.
+        boolean isLast = hashIdx + 1 == constrainedIndex && 
preliminaryRange.upper.length == 0;
+        BitSet hashBuckets = hashComponents.get(hashIdx);
+
+        List<Pair<ByteVec, ByteVec>> newPartitionKeyRanges =
+            new ArrayList<>(partitionKeyRanges.size() * 
hashBuckets.cardinality());
+        for (Pair<ByteVec, ByteVec> partitionKeyRange : partitionKeyRanges) {
+          for (int bucket = hashBuckets.nextSetBit(0);
+               bucket != -1;
+               bucket = hashBuckets.nextSetBit(bucket + 1)) {
+            int bucketUpper = isLast ? bucket + 1 : bucket;
+            ByteVec lower = partitionKeyRange.getFirst().clone();
+            ByteVec upper = partitionKeyRange.getFirst().clone();
+            KeyEncoder.encodeHashBucket(bucket, lower);
+            KeyEncoder.encodeHashBucket(bucketUpper, upper);
+            newPartitionKeyRanges.add(new Pair<>(lower, upper));
+          }
         }
+        partitionKeyRanges = newPartitionKeyRanges;
       }
-      partitionKeyRanges = newPartitionKeyRanges;
-    }
 
-    // Step 3: append the (possibly empty) range bounds to the partition key 
ranges.
-    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
-      range.getFirst().append(rangeLowerBound);
-      range.getSecond().append(rangeUpperBound);
-    }
+      // Step 3: append the (possibly empty) range bounds to the partition key 
ranges.
+      for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+        range.getFirst().append(preliminaryRange.lower);
+        range.getSecond().append(preliminaryRange.upper);
+      }
 
-    // Step 4: Filter ranges that fall outside the scan's upper and lower 
bound partition keys.
-    Deque<Pair<byte[], byte[]>> partitionKeyRangeBytes =
-        new ArrayDeque<>(partitionKeyRanges.size());
-    for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
-      byte[] lower = range.getFirst().toArray();
-      byte[] upper = range.getSecond().toArray();
+      // Step 4: Filter ranges that fall outside the scan's upper and lower 
bound partition keys.
+      for (Pair<ByteVec, ByteVec> range : partitionKeyRanges) {
+        byte[] lower = range.getFirst().toArray();
+        byte[] upper = range.getSecond().toArray();
 
-      // Sanity check that the lower bound is less than the upper bound.
-      assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;
+        // Sanity check that the lower bound is less than the upper bound.
+        assert upper.length == 0 || Bytes.memcmp(lower, upper) < 0;
 
-      // Find the intersection of the ranges.
-      if (scanner.lowerBoundPartitionKey.length > 0 &&
-          (lower.length == 0 || Bytes.memcmp(lower, 
scanner.lowerBoundPartitionKey) < 0)) {
-        lower = scanner.lowerBoundPartitionKey;
-      }
-      if (scanner.upperBoundPartitionKey.length > 0 &&
-          (upper.length == 0 || Bytes.memcmp(upper, 
scanner.upperBoundPartitionKey) > 0)) {
-        upper = scanner.upperBoundPartitionKey;
-      }
+        // Find the intersection of the ranges.
+        if (scanner.lowerBoundPartitionKey.length > 0 &&
+            (lower.length == 0 || Bytes.memcmp(lower, 
scanner.lowerBoundPartitionKey) < 0)) {
+          lower = scanner.lowerBoundPartitionKey;
+        }
+        if (scanner.upperBoundPartitionKey.length > 0 &&
+            (upper.length == 0 || Bytes.memcmp(upper, 
scanner.upperBoundPartitionKey) > 0)) {
+          upper = scanner.upperBoundPartitionKey;
+        }
 
-      // If the intersection is valid, then add it as a range partition.
-      if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
-        partitionKeyRangeBytes.add(new Pair<>(lower, upper));
+        // If the intersection is valid, then add it as a range partition.
+        if (upper.length == 0 || Bytes.memcmp(lower, upper) < 0) {
+          partitionKeyRangeBytes.add(new Pair<>(lower, upper));
+        }
       }
     }
 
-    return new PartitionPruner(partitionKeyRangeBytes);
+    // The PartitionPruner's constructor expects the collection to be sorted
+    // in ascending order.
+    Collections.sort(partitionKeyRangeBytes,
+        (lhs, rhs) -> Bytes.memcmp(lhs.getFirst(), rhs.getFirst()));
+    return new PartitionPruner(new ArrayDeque<>(partitionKeyRangeBytes));
   }
 
   /** @return {@code true} if there are more range partitions to scan. */
@@ -489,6 +504,137 @@ public class PartitionPruner {
     return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
   }
 
+  static List<PartitionSchema.EncodedRangeBoundsWithHashSchema> 
splitIntoHashSpecificRanges(
+      byte[] scanLowerBound, byte[] scanUpperBound, PartitionSchema ps) {
+    final List<PartitionSchema.EncodedRangeBoundsWithHashSchema> ranges =
+        ps.getEncodedRangesWithHashSchemas();
+    final List<PartitionSchema.HashBucketSchema> tableWideHashSchema =
+        ps.getHashBucketSchemas();
+
+    // If there aren't any ranges with custom hash schemas or there isn't an
+    // intersection between the set of ranges with custom hash schemas and the
+    // scan range, the result is trivial: the whole scan range is attributed
+    // to the table-wide hash schema.
+    if (ranges.isEmpty()) {
+      return ImmutableList.of(new 
PartitionSchema.EncodedRangeBoundsWithHashSchema(
+          scanLowerBound, scanUpperBound, tableWideHashSchema));
+    }
+
+    {
+      final byte[] rangesLowerBound = ranges.get(0).lower;
+      final byte[] rangesUpperBound = ranges.get(ranges.size() - 1).upper;
+
+      if ((scanUpperBound.length != 0 &&
+              Bytes.memcmp(scanUpperBound, rangesLowerBound) <= 0) ||
+          (scanLowerBound.length != 0 && rangesUpperBound.length != 0 &&
+              Bytes.memcmp(rangesUpperBound, scanLowerBound) <= 0)) {
+        return ImmutableList.of(new 
PartitionSchema.EncodedRangeBoundsWithHashSchema(
+            scanLowerBound, scanUpperBound, tableWideHashSchema));
+      }
+    }
+
+    // Index of the known range with custom hash schema that the iterator is
+    // currently pointing at, or is about to point at if the iterator is
+    // currently at the scan boundary.
+    int curIdx = -1;
+
+    // Find the first range that is at or after the specified bounds.
+    // TODO(aserbin): maybe, do this in PartitionSchema with O(ln(N)) 
complexity?
+    for (int idx = 0; idx < ranges.size(); ++idx) {
+      final PartitionSchema.EncodedRangeBoundsWithHashSchema range = 
ranges.get(idx);
+
+      // Searching for the first range that is at or after the lower scan 
bound.
+      if (curIdx >= 0 ||
+          (range.upper.length != 0 && Bytes.memcmp(range.upper, 
scanLowerBound) <= 0)) {
+        continue;
+      }
+      curIdx = idx;
+    }
+
+    Preconditions.checkState(curIdx >= 0);
+    Preconditions.checkState(curIdx < ranges.size());
+
+    // Current position of the iterator.
+    byte[] curPoint = scanLowerBound;
+
+    // Iterate over the scan range from one known boundary to the next one,
+    // enumerating the resulting consecutive sub-ranges and attributing each
+    // sub-range to a proper hash schema. If that's a known range with custom 
hash
+    // schema, it's attributed to its range-specific hash schema; otherwise,
+    // a sub-range is attributed to the table-wide hash schema.
+    List<PartitionSchema.EncodedRangeBoundsWithHashSchema> result = new 
ArrayList<>();
+    while (curIdx < ranges.size() &&
+        (Bytes.memcmp(curPoint, scanUpperBound) < 0 || scanUpperBound.length 
== 0)) {
+      // Check the position of 'cur_point' relative to the lower boundary
+      // of the range pointed to by 'cur_idx'.
+      final PartitionSchema.EncodedRangeBoundsWithHashSchema curRange = 
ranges.get(curIdx);
+      if (Bytes.memcmp(curPoint, curRange.lower) < 0) {
+        // The iterator is before the current range:
+        //     |---|
+        //   ^
+        // The next known bound is either the upper bound of the current range
+        // or the upper bound of the scan.
+        byte[] upperBound;
+        if (scanUpperBound.length == 0) {
+          upperBound = curRange.lower;
+        } else {
+          if (Bytes.memcmp(curRange.lower, scanUpperBound) < 0) {
+            upperBound = curRange.lower;
+          } else {
+            upperBound = scanUpperBound;
+          }
+        }
+        result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+            curPoint, upperBound, tableWideHashSchema));
+        // Not advancing the 'cur_idx' since cur_point is either at the 
beginning
+        // of the range or before it at the upper bound of the scan.
+      } else if (Bytes.memcmp(curPoint, curRange.lower) == 0) {
+        // The iterator is at the lower boundary of the current range:
+        //   |---|
+        //   ^
+        if ((curRange.upper.length != 0 && Bytes.memcmp(curRange.upper, 
scanUpperBound) <= 0) ||
+            scanUpperBound.length == 0) {
+          // The current range is within the scan boundaries.
+          result.add(curRange);
+        } else {
+          // The current range spans over the upper bound of the scan.
+          result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+              curPoint, scanUpperBound, curRange.hashSchemas));
+        }
+        // Done with the current range, advance to the next one, if any.
+        ++curIdx;
+      } else {
+        if ((scanUpperBound.length != 0 && Bytes.memcmp(scanUpperBound, 
curRange.upper) <= 0) ||
+            curRange.upper.length == 0) {
+          result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+              curPoint, scanUpperBound, curRange.hashSchemas));
+        } else {
+          result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+              curPoint, curRange.upper, curRange.hashSchemas));
+        }
+        // Done with the current range, advance to the next one, if any.
+        ++curIdx;
+      }
+      Preconditions.checkState(!result.isEmpty());
+      // Advance the iterator.
+      curPoint = result.get(result.size() - 1).upper;
+    }
+
+    // If exiting from the cycle above by the 'cur_idx < ranges.size()' 
condition,
+    // check if the upper bound of the scan is beyond the upper bound of the 
last
+    // range with custom hash schema. If so, add an extra range that spans from
+    // the upper bound of the last range to the upper bound of the scan.
+    Preconditions.checkState(!result.isEmpty());
+    final byte[] rangesUpperBound = result.get(result.size() - 1).upper;
+    if (Bytes.memcmp(rangesUpperBound, scanUpperBound) != 0) {
+      Preconditions.checkState(Bytes.memcmp(curPoint, rangesUpperBound) == 0);
+      result.add(new PartitionSchema.EncodedRangeBoundsWithHashSchema(
+          curPoint, scanUpperBound, tableWideHashSchema));
+    }
+
+    return result;
+  }
+
   /**
    * Search all combination of in-list and equality predicates for pruneable 
hash partitions.
    * @return a bitset containing {@code false} bits for hash buckets which may 
be pruned
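
To make the new splitIntoHashSpecificRanges() logic above more concrete, here is
a tiny illustration using plain numeric bounds (the method itself operates on
encoded partition keys; the concrete values below are assumptions, not taken
from the patch): for a table whose only range with a custom hash schema covers
[100, 200), a scan over [0, 300) is split into three consecutive sub-ranges,
each attributed to the hash schema that applies to it.

    // Illustration only -- not the internal API:
    //   scan range:                    [0, 300)
    //   range with custom hash schema: [100, 200)
    long[][] expectedSubRanges = {
        {0, 100},    // table-wide hash schema
        {100, 200},  // custom hash schema of the range
        {200, 300},  // table-wide hash schema
    };
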
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index e34094316..aa234110c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -17,8 +17,15 @@
 
 package org.apache.kudu.client;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.TreeSet;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedBytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -32,8 +39,10 @@ import org.apache.kudu.Schema;
  * primary key column values of a row into a partition key that can be used to
  * find the tablet containing the key.
  *
- * The partition schema is made up of zero or more hash bucket components,
- * followed by a single range component.
+ * In case of table-wide hash partitioning, the partition schema is made up of
+ * zero or more hash bucket components, followed by a single range component.
+ * In case of custom hash bucketing per range, the partition schema contains
+ * information on hash bucket components per range.
  *
  * Each hash bucket component includes one or more columns from the primary key
  * column set, with the restriction that an individual primary key column may
@@ -45,24 +54,87 @@ import org.apache.kudu.Schema;
 @InterfaceStability.Unstable
 public class PartitionSchema {
 
+  private static final class BoundsComparator
+      implements Comparator<EncodedRangeBoundsWithHashSchema>, Serializable {
+    private static final long serialVersionUID = 36028797018963969L;
+    private static final Comparator<byte[]> comparator =
+        UnsignedBytes.lexicographicalComparator();
+
+    @Override
+    public int compare(EncodedRangeBoundsWithHashSchema lhs,
+                       EncodedRangeBoundsWithHashSchema rhs) {
+      return comparator.compare(lhs.lower, rhs.lower);
+    }
+  }
+
+  private static final Comparator<EncodedRangeBoundsWithHashSchema> COMPARATOR 
=
+      new BoundsComparator();
+
   private final RangeSchema rangeSchema;
   private final List<HashBucketSchema> hashBucketSchemas;
+  private final List<RangeWithHashSchema> rangesWithHashSchemas;
+  private final List<EncodedRangeBoundsWithHashSchema> 
encodedRangesWithHashSchemas;
+  private TreeSet<EncodedRangeBoundsWithHashSchema> hashSchemasPerRange;
   private final boolean isSimple;
 
+  static class EncodedRangeBoundsWithHashSchema {
+    final byte[] lower;
+    final byte[] upper;
+    final List<HashBucketSchema> hashSchemas;
+
+    public EncodedRangeBoundsWithHashSchema(
+        byte[] lower,
+        byte[] upper,
+        List<HashBucketSchema> hashSchemas) {
+      Preconditions.checkNotNull(lower);
+      Preconditions.checkNotNull(upper);
+      Preconditions.checkState(upper.length == 0 || Bytes.memcmp(lower, upper) 
< 0);
+      this.lower = lower;
+      this.upper = upper;
+      this.hashSchemas = hashSchemas;
+    }
+  }
+
   /**
    * Creates a new partition schema from the range and hash bucket schemas.
    *
    * @param rangeSchema the range schema
-   * @param hashBucketSchemas the hash bucket schemas
+   * @param hashBucketSchemas the table-wide hash schema
    * @param schema the table schema
    */
   public PartitionSchema(RangeSchema rangeSchema,
-                  List<HashBucketSchema> hashBucketSchemas,
-                  Schema schema) {
+                         List<HashBucketSchema> hashBucketSchemas,
+                         List<RangeWithHashSchema> rangesWithHashSchemas,
+                         Schema schema) {
     this.rangeSchema = rangeSchema;
     this.hashBucketSchemas = hashBucketSchemas;
+    this.rangesWithHashSchemas = rangesWithHashSchemas;
+    this.hashSchemasPerRange = new TreeSet<>(COMPARATOR);
+    this.encodedRangesWithHashSchemas = new 
ArrayList<>(rangesWithHashSchemas.size());
+
+    for (RangeWithHashSchema rhs : this.rangesWithHashSchemas) {
+      final boolean isLowerBoundEmpty =
+          rhs.lowerBound == null || 
rhs.lowerBound.getColumnsBitSet().isEmpty();
+      byte[] lower = isLowerBoundEmpty ? new byte[0]
+          : KeyEncoder.encodeRangePartitionKey(rhs.lowerBound, 
this.rangeSchema);
+      final boolean isUpperBoundEmpty =
+          rhs.upperBound == null || 
rhs.upperBound.getColumnsBitSet().isEmpty();
+      byte[] upper = isUpperBoundEmpty ? new byte[0]
+          : KeyEncoder.encodeRangePartitionKey(rhs.upperBound, 
this.rangeSchema);
+      if (!hashSchemasPerRange.add(
+          new EncodedRangeBoundsWithHashSchema(lower, upper, 
rhs.hashSchemas))) {
+        throw new IllegalArgumentException(
+            rhs.lowerBound.toString() + ": duplicate lower range boundary");
+      }
+    }
 
-    boolean isSimple = hashBucketSchemas.isEmpty() &&
+    // Populate the convenience collection storing the information on ranges
+    // with encoded bounds sorted in ascending order by lower bounds.
+    encodedRangesWithHashSchemas.addAll(this.hashSchemasPerRange);
+
+    boolean isSimple =
+        rangesWithHashSchemas.isEmpty() &&
+        hashBucketSchemas.isEmpty() &&
         rangeSchema.columns.size() == schema.getPrimaryKeyColumnCount();
     if (isSimple) {
       int i = 0;
@@ -76,6 +148,19 @@ public class PartitionSchema {
     this.isSimple = isSimple;
   }
 
+  /**
+   * Creates a new partition schema from the range and hash bucket schemas.
+   *
+   * @param rangeSchema the range schema
+   * @param hashBucketSchemas the table-wide hash schema
+   * @param schema the table schema
+   */
+  public PartitionSchema(RangeSchema rangeSchema,
+                         List<HashBucketSchema> hashBucketSchemas,
+                         Schema schema) {
+    this(rangeSchema, hashBucketSchemas, ImmutableList.of(), schema);
+  }
+
   /**
    * Returns the encoded partition key of the row.
    * @return a byte array containing the encoded partition key of the row
@@ -92,6 +177,14 @@ public class PartitionSchema {
     return hashBucketSchemas;
   }
 
+  List<RangeWithHashSchema> getRangesWithHashSchemas() {
+    return rangesWithHashSchemas;
+  }
+
+  List<EncodedRangeBoundsWithHashSchema> getEncodedRangesWithHashSchemas() {
+    return encodedRangesWithHashSchemas;
+  }
+
   /**
    * Returns true if the partition schema does not include any hash
    * components, and the range columns match the table's primary key columns.
@@ -102,6 +195,41 @@ public class PartitionSchema {
     return isSimple;
   }
 
+  /**
+   * @return whether the partition schema has ranges with custom hash schemas.
+   */
+  boolean hasCustomHashSchemas() {
+    return !rangesWithHashSchemas.isEmpty();
+  }
+
+  /**
+   * Find the hash schema for the given encoded range key. Depending on the
+   * partition schema and the key, it might be either the table-wide hash
+   * schema or a custom hash schema defined for a particular range.
+   *
+   * @return the hash bucket schema for the given encoded range key
+   */
+  List<HashBucketSchema> getHashSchemaForRange(byte[] rangeKey) {
+    if (!hasCustomHashSchemas()) {
+      // By definition, the table-wide hash schema provides the hash bucketing
+      // structure in the absence of per-range custom hash schemas.
+      return hashBucketSchemas;
+    }
+
+    final EncodedRangeBoundsWithHashSchema entry = hashSchemasPerRange.floor(
+        new EncodedRangeBoundsWithHashSchema(rangeKey, new byte[0], 
ImmutableList.of()));
+    if (entry == null) {
+      return hashBucketSchemas;
+    }
+    // Check if 'rangeKey' is in the range (null upper boundary means unbounded
+    // range partition).
+    final byte[] upper = entry.upper;
+    if (upper == null || Bytes.memcmp(rangeKey, upper) < 0) {
+      return entry.hashSchemas;
+    }
+    return hashBucketSchemas;
+  }
+
   public static class RangeSchema {
     private final List<Integer> columns;
 
@@ -155,4 +283,27 @@ public class PartitionSchema {
       return seed;
     }
   }
+
+  /**
+   * This utility class is used to represent information on a custom hash 
schema
+   * for a particular range.
+   */
+  public static class RangeWithHashSchema {
+    public PartialRow lowerBound;
+    public PartialRow upperBound;
+    public List<HashBucketSchema> hashSchemas;
+
+    public RangeWithHashSchema(
+        PartialRow lowerBound,
+        PartialRow upperBound,
+        List<HashBucketSchema> hashSchemas) {
+      Preconditions.checkNotNull(lowerBound);
+      Preconditions.checkNotNull(upperBound);
+      Preconditions.checkArgument(
+          lowerBound.getSchema().equals(upperBound.getSchema()));
+      this.lowerBound = lowerBound;
+      this.upperBound = upperBound;
+      this.hashSchemas = hashSchemas;
+    }
+  }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 4381fd8ce..c9d94e7af 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Common;
+import org.apache.kudu.RowOperations;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.util.DateUtil;
@@ -222,7 +223,35 @@ public class ProtobufHelper {
       hashSchemas.add(hashSchema);
     }
 
-    return new PartitionSchema(rangeSchema, hashSchemas.build(), schema);
+    // Populate the list of ranges with custom hash schemas.
+    ImmutableList.Builder<PartitionSchema.RangeWithHashSchema> 
rangesWithHashSchemas =
+        ImmutableList.builder();
+
+    for (Common.PartitionSchemaPB.RangeWithHashSchemaPB rhsPB :
+        pb.getCustomHashSchemaRangesList()) {
+      List<PartitionSchema.HashBucketSchema> rangeHashSchemas = new 
ArrayList<>();
+      for (Common.PartitionSchemaPB.HashBucketSchemaPB hbs : 
rhsPB.getHashSchemaList()) {
+        rangeHashSchemas.add(new PartitionSchema.HashBucketSchema(
+            pbToIds(hbs.getColumnsList()), hbs.getNumBuckets(), 
hbs.getSeed()));
+      }
+
+      // Decode RowOperationsPB into the range bounds.
+      final RowOperations.RowOperationsPB rangeBounds = rhsPB.getRangeBounds();
+      Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
+      final List<RangePartition> partitions = 
dec.decodeRangePartitions(rangeBounds, schema);
+      if (partitions.size() != 1) {
+        throw new IllegalArgumentException("unexpected range bounds");
+      }
+      final RangePartition p = partitions.get(0);
+
+      PartitionSchema.RangeWithHashSchema rhs =
+          new PartitionSchema.RangeWithHashSchema(
+              p.lowerBound, p.upperBound, rangeHashSchemas);
+      rangesWithHashSchemas.add(rhs);
+    }
+
+    return new PartitionSchema(
+        rangeSchema, hashSchemas.build(), rangesWithHashSchemas.build(), 
schema);
   }
 
   public static Common.PartitionSchemaPB partitionSchemaToPb(PartitionSchema 
partitionSchema) {
@@ -244,6 +273,28 @@ public class ProtobufHelper {
               .build();
     builder.setRangeSchema(rangeSchemaPB);
 
+    // Based on the list of ranges with custom hash schemas, populate the
+    // PartitionSchemaPB.custom_hash_schema_ranges field.
+    for (PartitionSchema.RangeWithHashSchema rhs : 
partitionSchema.getRangesWithHashSchemas()) {
+      Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder rhsBuilder =
+          Common.PartitionSchemaPB.RangeWithHashSchemaPB.newBuilder();
+      for (PartitionSchema.HashBucketSchema hbs : rhs.hashSchemas) {
+        Common.PartitionSchemaPB.HashBucketSchemaPB.Builder hbsBuilder =
+            Common.PartitionSchemaPB.HashBucketSchemaPB.newBuilder()
+                .addAllColumns(idsToPb(hbs.getColumnIds()))
+                .setNumBuckets(hbs.getNumBuckets())
+                .setSeed(hbs.getSeed());
+        rhsBuilder.addHashSchema(hbsBuilder.build());
+      }
+
+      rhsBuilder.setRangeBounds(new 
Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+          rhs.lowerBound,
+          rhs.upperBound,
+          RangePartitionBound.INCLUSIVE_BOUND,
+          RangePartitionBound.EXCLUSIVE_BOUND));
+      builder.addCustomHashSchemaRanges(rhsBuilder.build());
+    }
+
     return builder.build();
   }
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java
new file mode 100644
index 000000000..903f46ac8
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartition.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.common.base.Preconditions;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * This class represents a range partition that uses the table-wide hash schema.
+ *
+ * See also RangePartitionWithCustomHashSchema.
+ */
[email protected]
[email protected]
+class RangePartition {
+  final PartialRow lowerBound;
+  final PartialRow upperBound;
+  final RangePartitionBound lowerBoundType;
+  final RangePartitionBound upperBoundType;
+
+  public RangePartition(PartialRow lowerBound,
+                        PartialRow upperBound,
+                        RangePartitionBound lowerBoundType,
+                        RangePartitionBound upperBoundType) {
+    Preconditions.checkNotNull(lowerBound);
+    Preconditions.checkNotNull(upperBound);
+    Preconditions.checkArgument(
+        lowerBound.getSchema().equals(upperBound.getSchema()));
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+    this.lowerBoundType = lowerBoundType;
+    this.upperBoundType = upperBoundType;
+  }
+
+  public PartialRow getLowerBound() {
+    return lowerBound;
+  }
+
+  public RangePartitionBound getLowerBoundType() {
+    return lowerBoundType;
+  }
+
+  public PartialRow getUpperBound() {
+    return upperBound;
+  }
+
+  public RangePartitionBound getUpperBoundType() {
+    return upperBoundType;
+  }
+}
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
new file mode 100644
index 000000000..df3c0c676
--- /dev/null
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RangePartitionWithCustomHashSchema.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.List;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.Common;
+
+/**
+ * This class represents a range partition with custom hash bucketing schema.
+ *
+ * See also RangePartition.
+ */
[email protected]
[email protected]
+public class RangePartitionWithCustomHashSchema extends RangePartition {
+  // Using the corresponding PB type to represent this range with its custom
+  // hash schema.
+  private Common.PartitionSchemaPB.RangeWithHashSchemaPB.Builder pb =
+      Common.PartitionSchemaPB.RangeWithHashSchemaPB.newBuilder();
+
+  /**
+   * @param lowerBound lower bound of the range partition
+   * @param upperBound upper bound of the range partition
+   * @param lowerBoundType lower bound type: inclusive/exclusive
+   * @param upperBoundType upper bound type: inclusive/exclusive
+   * @return new RangePartitionWithCustomHashSchema object
+   */
+  public RangePartitionWithCustomHashSchema(
+      PartialRow lowerBound,
+      PartialRow upperBound,
+      RangePartitionBound lowerBoundType,
+      RangePartitionBound upperBoundType) {
+    super(lowerBound, upperBound, lowerBoundType, upperBoundType);
+    pb.setRangeBounds(
+        new Operation.OperationsEncoder().encodeLowerAndUpperBounds(
+            lowerBound, upperBound, lowerBoundType, upperBoundType));
+  }
+
+  /**
+   * Add a level of hash sub-partitioning for this range partition.
+   *
+   * The hash schema for the range partition is defined by the whole set of
+   * its hash sub-partitioning levels. A range partition can have zero or
+   * multiple levels of hash sub-partitioning: this method can be called
+   * many times on the same object to define a multi-dimensional hash
+   * bucketing structure for the range.
+   *
+   * @param columns names of the table's columns to use for hash bucketing
+   * @param numBuckets number of buckets used by the hash function
+   * @param seed the seed for the hash function
+   * @return this RangePartition object modified accordingly
+   */
+  public RangePartition addHashPartitions(
+      List<String> columns, int numBuckets, int seed) {
+    Common.PartitionSchemaPB.HashBucketSchemaPB.Builder b =
+        pb.addHashSchemaBuilder();
+    for (String column : columns) {
+      b.addColumnsBuilder().setName(column);
+    }
+    b.setNumBuckets(numBuckets);
+    b.setSeed(seed);
+    return this;
+  }
+
+  public Common.PartitionSchemaPB.RangeWithHashSchemaPB toPB() {
+    return pb.build();
+  }
+}
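
The addHashPartitions() javadoc above notes that a range can have multiple
levels of hash sub-partitioning. A hedged sketch of a two-level hash schema for
a single range follows (reusing the imports from the example near the top of
this message; column names and bucket counts are illustrative, and, as the
PartitionSchema documentation notes, an individual primary key column may
appear in at most one hash component):

    List<ColumnSchema> cols = new ArrayList<>();
    cols.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT64)
        .key(true).build());
    cols.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
        .key(true).build());
    Schema schema = new Schema(cols);

    PartialRow lower = schema.newPartialRow();
    lower.addLong("c0", 0);
    PartialRow upper = schema.newPartialRow();
    upper.addLong("c0", 1000);

    RangePartitionWithCustomHashSchema range =
        new RangePartitionWithCustomHashSchema(
            lower, upper,
            RangePartitionBound.INCLUSIVE_BOUND,
            RangePartitionBound.EXCLUSIVE_BOUND);
    // First level: 3 buckets on "c0"; second level: 5 buckets on "c1".
    range.addHashPartitions(Arrays.asList("c0"), 3, 0);
    range.addHashPartitions(Arrays.asList("c1"), 5, 0);

    CreateTableOptions options = new CreateTableOptions()
        .setRangePartitionColumns(Arrays.asList("c0"))
        .addRangePartition(range);
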
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 91d5f76b1..b200df152 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -94,8 +94,7 @@ public class TestKeyEncoding {
       columnIds.add(i);
     }
     return new PartitionSchema(
-        new PartitionSchema.RangeSchema(columnIds),
-        ImmutableList.of(), schema);
+        new PartitionSchema.RangeSchema(columnIds), ImmutableList.of(), 
schema);
   }
 
   /**
@@ -380,6 +379,62 @@ public class TestKeyEncoding {
                       });
   }
 
+  @Test
+  public void testPartitionKeyEncodingCustomHashSchema() {
+    Schema schema = buildSchema(
+        new ColumnSchemaBuilder("a", Type.INT32).key(true),
+        new ColumnSchemaBuilder("b", Type.STRING).key(true),
+        new ColumnSchemaBuilder("c", Type.STRING).key(true));
+
+    PartialRow lower = schema.newPartialRow();
+    lower.addInt("a", 0);
+    lower.addString("b", "B");
+    lower.addString("c", "C");
+
+    PartialRow upper = schema.newPartialRow();
+    upper.addInt("a", 10);
+    upper.addString("b", "b");
+    upper.addString("c", "c");
+
+    final PartitionSchema partitionSchema =
+        new PartitionSchema(
+            new RangeSchema(ImmutableList.of(0, 1, 2)),
+            ImmutableList.of(
+                new HashBucketSchema(ImmutableList.of(2), 3, 0)),
+            ImmutableList.of(
+                new PartitionSchema.RangeWithHashSchema(
+                    lower,
+                    upper,
+                    ImmutableList.of(new HashBucketSchema(ImmutableList.of(0, 
1), 32, 0)))),
+            schema);
+
+    // That's the row in the range having its own custom hash schema.
+    PartialRow rowA = schema.newPartialRow();
+    rowA.addInt("a", 1);
+    rowA.addString("b", "C");
+    rowA.addString("c", "D");
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowA, partitionSchema),
+        new byte[]{
+            0, 0, 0, 0x10,        // hash(1, "")
+            (byte) 0x80, 0, 0, 1, // a = 1
+            'C', 0, 0,            // b = "C"
+            'D'                   // c = "D"
+        });
+
+    // That's the row encoded with the table-wide hash schema.
+    PartialRow rowB = schema.newPartialRow();
+    rowB.addInt("a", 11);
+    rowB.addString("b", "");
+    rowB.addString("c", "d");
+    assertBytesEquals(KeyEncoder.encodePartitionKey(rowB, partitionSchema),
+        new byte[]{
+            0, 0, 0, 0x2,         // hash("d")
+            (byte) 0x80, 0, 0, 11,// a = 11
+            0, 0,                 // b = ""
+            'd'                   // c = "d"
+        });
+  }
+
   @Test(timeout = 100000)
   public void testAllPrimaryKeyTypes() throws Exception {
     Schema schema = buildSchema(
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 290ed3b7c..00e5d8f40 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -741,6 +741,109 @@ public class TestKuduClient {
     }
   }
 
+  /**
+   * Test inserting and retrieving rows from a table that has a range partition
+   * with custom hash schema.
+   */
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testRangeWithCustomHashSchema() throws Exception {
+    List<ColumnSchema> cols = new ArrayList<>();
+    cols.add(new ColumnSchema.ColumnSchemaBuilder("c0", 
Type.INT64).key(true).build());
+    cols.add(new ColumnSchema.ColumnSchemaBuilder("c1", 
Type.INT32).nullable(true).build());
+    Schema schema = new Schema(cols);
+
+    CreateTableOptions options = new CreateTableOptions();
+    options.setRangePartitionColumns(ImmutableList.of("c0"));
+    options.addHashPartitions(ImmutableList.of("c0"), 2);
+
+    // Add range partition with table-wide hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addLong("c0", -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addLong("c0", 100);
+      options.addRangePartition(lower, upper);
+    }
+
+    // Add a partition with custom hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addLong("c0", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addLong("c0", 200);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("c0"), 5, 0);
+      options.addRangePartition(rangePartition);
+    }
+
+    client.createTable(TABLE_NAME, schema, options);
+
+    KuduSession session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    // Check the range with the table-wide hash schema.
+    {
+      for (int i = 0; i < 10; ++i) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addLong("c0", i);
+        row.addInt("c1", 1000 * i);
+        session.apply(insert);
+      }
+
+      // Scan all the rows in the table.
+      List<String> rowStringsAll = scanTableToStrings(table);
+      assertEquals(10, rowStringsAll.size());
+
+      // Now scan the rows that are in the range with the table-wide hash 
schema.
+      List<String> rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("c0"), 
GREATER_EQUAL, 0),
+          KuduPredicate.newComparisonPredicate(schema.getColumn("c0"), LESS, 
100));
+      assertEquals(10, rowStrings.size());
+      for (int i = 0; i < rowStrings.size(); ++i) {
+        StringBuilder expectedRow = new StringBuilder();
+        expectedRow.append(String.format("INT64 c0=%d, INT32 c1=%d", i, 1000 * 
i));
+        assertEquals(expectedRow.toString(), rowStrings.get(i));
+      }
+    }
+
+    // Check the range with the custom hash schema.
+    {
+      for (int i = 100; i < 110; ++i) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addLong("c0", i);
+        row.addInt("c1", 2 * i);
+        session.apply(insert);
+      }
+
+      // Scan all the rows in the table.
+      List<String> rowStringsAll = scanTableToStrings(table);
+      assertEquals(20, rowStringsAll.size());
+
+      // Now scan the rows that are in the range with the custom hash schema.
+      List<String> rowStrings = scanTableToStrings(table,
+          KuduPredicate.newComparisonPredicate(schema.getColumn("c0"), 
GREATER_EQUAL, 100));
+      assertEquals(10, rowStrings.size());
+      for (int i = 0; i < rowStrings.size(); ++i) {
+        StringBuilder expectedRow = new StringBuilder();
+        expectedRow.append(String.format("INT64 c0=%d, INT32 c1=%d",
+            i + 100, 2 * (i + 100)));
+        assertEquals(expectedRow.toString(), rowStrings.get(i));
+      }
+    }
+  }
+
   /**
   * Test scanning with limits.
   */
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 3303d1151..5175ee95d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -17,6 +17,11 @@
 
 package org.apache.kudu.client;
 
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS;
+import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
@@ -30,10 +35,14 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -483,6 +492,899 @@ public class TestKuduTable {
         client.openTable(tableName).getFormattedRangePartitions(10000));
   }
 
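+  /**
+   * Create a table with a range partition that specifies an empty custom hash
+   * schema (i.e. no hash bucketing for the range) and verify the resulting
+   * partitioning and range boundaries.
+   */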
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testCreateTablePartitionWithEmptyCustomHashSchema() throws 
Exception {
+    PartialRow lower = basicSchema.newPartialRow();
+    lower.addInt(0, -100);
+    PartialRow upper = basicSchema.newPartialRow();
+    upper.addInt(0, 100);
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+
+    // Using an empty custom hash schema for the range.
+    RangePartitionWithCustomHashSchema rangePartition =
+        new RangePartitionWithCustomHashSchema(
+            lower,
+            upper,
+            RangePartitionBound.INCLUSIVE_BOUND,
+            RangePartitionBound.EXCLUSIVE_BOUND);
+    builder.addRangePartition(rangePartition);
+
+    KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+    // Check the result: retrieve the information on tablets from master
+    // and check if each partition has the expected parameters.
+    {
+      for (KuduScanToken token : new 
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+          .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+        Partition p = token.getTablet().getPartition();
+        // No hash partitions are expected.
+        assertEquals(0, p.getHashBuckets().size());
+      }
+
+      final List<Partition> rangePartitions =
+          table.getRangePartitions(client.getDefaultOperationTimeoutMs());
+      assertEquals(1, rangePartitions.size());
+      final Partition p = rangePartitions.get(0);
+
+      assertTrue(p.getRangeKeyStart().length > 0);
+      PartialRow rangeKeyStartDecoded = p.getDecodedRangeKeyStart(table);
+      assertEquals(-100, rangeKeyStartDecoded.getInt(0));
+      assertTrue(p.getRangeKeyEnd().length > 0);
+      PartialRow rangeKeyEndDecoded = p.getDecodedRangeKeyEnd(table);
+      assertEquals(100, rangeKeyEndDecoded.getInt(0));
+    }
+
+    List<String> expected = Lists.newArrayList();
+    expected.add("-100 <= VALUES < 100");
+    assertEquals(
+        expected,
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+  }
+
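+  /**
+   * Create a table with a single range partition that has a custom hash
+   * schema of two buckets and verify the resulting tablets, hash bucket
+   * indices, and range boundaries.
+   */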
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testCreateTablePartitionWithCustomHashSchema() throws Exception {
+    PartialRow lower = basicSchema.newPartialRow();
+    lower.addInt(0, -100);
+    PartialRow upper = basicSchema.newPartialRow();
+    upper.addInt(0, 200);
+
+    // Simple custom hash schema for the range: two buckets on the column 
"key".
+    RangePartitionWithCustomHashSchema rangePartition =
+        new RangePartitionWithCustomHashSchema(
+            lower,
+            upper,
+            RangePartitionBound.INCLUSIVE_BOUND,
+            RangePartitionBound.EXCLUSIVE_BOUND);
+    rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    builder.addRangePartition(rangePartition);
+
+    // Add table-wide schema: it should have the same number of dimensions
+    // as the range-specific hash schema. However, this schema isn't used
+    // in this test scenario.
+    builder.addHashPartitions(ImmutableList.of("key"), 7, 0);
+
+    KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+    List<String> expected = Lists.newArrayList();
+    expected.add("-100 <= VALUES < 200");
+    assertEquals(
+        expected,
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    // Check the result: retrieve the information on tablets from master
+    // and check if each partition has expected parameters.
+    {
+      Set<Integer> buckets = new HashSet<>();
+      for (KuduScanToken token : new 
KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+          .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+        Partition p = token.getTablet().getPartition();
+        // A single hash dimension is expected in each partition; the range
+        // itself has two hash buckets overall.
+        assertEquals(1, p.getHashBuckets().size());
+        for (Integer idx : p.getHashBuckets()) {
+          buckets.add(idx);
+        }
+      }
+      assertEquals(2, buckets.size());
+      assertTrue(buckets.contains(0));
+      assertTrue(buckets.contains(1));
+
+      final List<Partition> rangePartitions =
+          table.getRangePartitions(client.getDefaultOperationTimeoutMs());
+      assertEquals(1, rangePartitions.size());
+      final Partition p = rangePartitions.get(0);
+
+      assertTrue(p.getRangeKeyStart().length > 0);
+      PartialRow rangeKeyStartDecoded = p.getDecodedRangeKeyStart(table);
+      assertEquals(-100, rangeKeyStartDecoded.getInt(0));
+      assertTrue(p.getRangeKeyEnd().length > 0);
+      PartialRow rangeKeyEndDecoded = p.getDecodedRangeKeyEnd(table);
+      assertEquals(200, rangeKeyEndDecoded.getInt(0));
+    }
+  }
+
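+  /**
+   * Basic verification of PartitionSchema, KuduTable.getRangePartitions(),
+   * and scan token generation for a range partition with a custom hash schema.
+   */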
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testRangePartitionWithCustomHashSchemaBasic() throws Exception {
+    final int valLower = 10;
+    final int valUpper = 20;
+
+    PartialRow lower = basicSchema.newPartialRow();
+    lower.addInt(0, valLower);
+    PartialRow upper = basicSchema.newPartialRow();
+    upper.addInt(0, valUpper);
+
+    // Simple custom hash schema for the range: five buckets on the column 
"key".
+    RangePartitionWithCustomHashSchema rangePartition =
+        new RangePartitionWithCustomHashSchema(
+            lower,
+            upper,
+            RangePartitionBound.INCLUSIVE_BOUND,
+            RangePartitionBound.EXCLUSIVE_BOUND);
+    rangePartition.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    builder.addRangePartition(rangePartition);
+    // Add table-wide schema: it should have the same number of dimensions
+    // as the range-specific hash schema. However, this schema isn't used
+    // in this test scenario.
+    builder.addHashPartitions(ImmutableList.of("key"), 32, 0);
+
+    final KuduTable table = client.createTable(tableName, basicSchema, 
builder);
+    final PartitionSchema ps = table.getPartitionSchema();
+    assertTrue(ps.hasCustomHashSchemas());
+    assertFalse(ps.isSimpleRangePartitioning());
+
+    // NOTE: use schema from server since ColumnIDs are needed for row encoding
+    final PartialRow rowLower = table.getSchema().newPartialRow();
+    rowLower.addInt(0, valLower);
+
+    final PartialRow rowUpper = table.getSchema().newPartialRow();
+    rowUpper.addInt(0, valUpper);
+
+    {
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(rowLower, ps.getRangeSchema()));
+      // There should be just one dimension with five buckets.
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+    {
+      // The newly created table has a single range with 5 hash buckets (i.e.
+      // 5 tablets), but KuduTable.getRangePartitions() drops the 'duplicates'
+      // whose hash bucket index is other than 0. So, the result should be
+      // just one partition, with hash bucket index 0.
+      List<Partition> partitions = table.getRangePartitions(50000);
+      assertEquals(1, partitions.size());
+      List<Integer> buckets = partitions.get(0).getHashBuckets();
+      assertEquals(1, buckets.size());  // there is just one hash dimension
+      assertEquals(0, buckets.get(0).intValue());
+    }
+    {
+      final byte[] rowLowerEnc = ps.encodePartitionKey(rowLower);
+      final byte[] rowUpperEnc = ps.encodePartitionKey(rowUpper);
+
+      // The range part comes after the hash part in an encoded partition key.
+      // The hash part contains 4 * number_of_hash_dimensions bytes.
+      byte[] rangeLower = Arrays.copyOfRange(rowLowerEnc, 4, rowLowerEnc.length);
+      byte[] rangeUpper = Arrays.copyOfRange(rowUpperEnc, 4, rowUpperEnc.length);
+
+      Set<Integer> buckets = new HashSet<>();
+      for (KuduScanToken token : new KuduScanToken.KuduScanTokenBuilder(asyncClient, table)
+          .setTimeout(client.getDefaultOperationTimeoutMs()).build()) {
+        final Partition p = token.getTablet().getPartition();
+        assertEquals(0, Bytes.memcmp(p.getRangeKeyStart(), rangeLower));
+        assertEquals(0, Bytes.memcmp(p.getRangeKeyEnd(), rangeUpper));
+        assertEquals(1, p.getHashBuckets().size());
+        buckets.add(p.getHashBuckets().get(0));
+      }
+
+      // Check the generated scan tokens cover all the tablets for the range:
+      // all hash bucket indices should be present.
+      assertEquals(5, buckets.size());
+      assertTrue(buckets.contains(0));
+      assertTrue(buckets.contains(1));
+      assertTrue(buckets.contains(2));
+      assertTrue(buckets.contains(3));
+      assertTrue(buckets.contains(4));
+    }
+  }
+
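+  /**
+   * Create a table with two range partitions, each with its own custom hash
+   * schema, then insert rows and verify scan results with various predicates.
+   */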
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testCreateTableCustomHashSchemasTwoRanges() throws Exception {
+    CreateTableOptions builder = getBasicCreateTableOptions();
+
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 0);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 100);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+      builder.addRangePartition(rangePartition);
+    }
+
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 100);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 200);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+      builder.addRangePartition(rangePartition);
+    }
+
+    // Add table-wide schema as well -- that's to satisfy the constraint on
+    // the number of hash dimensions in table's hash schemas. However, this
+    // scenario isn't going to create a range with table-wide hash schema.
+    builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+    KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+    // Check the result: retrieve the information on tablets from master
+    // and check if each partition has expected parameters.
+    List<LocatedTablet> tablets = table.getTabletsLocations(10000);
+    // There should be 5 tablets: 2 for [0, 100) range and 3 for [100, 200).
+    assertEquals(5, tablets.size());
+
+    List<String> expected = Lists.newArrayList();
+    expected.add("0 <= VALUES < 100");
+    expected.add("100 <= VALUES < 200");
+    assertEquals(
+        expected,
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    // Insert data into the newly created table and read it back.
+    KuduSession session = client.newSession();
+    for (int key = 0; key < 200; ++key) {
+      Insert insert = createBasicSchemaInsert(table, key);
+      session.apply(insert);
+    }
+    session.flush();
+
+    // Do full table scan.
+    List<String> rowStrings = scanTableToStrings(table);
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, -1));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 0));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 1));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 99));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 100));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 101));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 199));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 200));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 201));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 0));
+    assertEquals(199, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 100));
+    assertEquals(99, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 199));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 200));
+    assertEquals(0, rowStrings.size());
+
+    // Predicate to have all the rows of the first range: [0, 100).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 100));
+    assertEquals(100, rowStrings.size());
+
+    // Predicate to have all the rows of the second range: [100, 200).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 100),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(100, rowStrings.size());
+
+    // Predicate to have one part of the rows from the first range and the
+    // other part from the second range.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 50),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 150));
+    assertEquals(100, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 150));
+    assertEquals(150, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 1),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 199));
+    assertEquals(198, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 1),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(199, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 199));
+    assertEquals(199, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 199));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS_EQUAL, 0));
+    assertEquals(1, rowStrings.size());
+
+    // Predicate to cover exactly both ranges.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 200));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 0));
+    assertEquals(0, rowStrings.size());
+  }
+
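+  /**
+   * Create a table with one range using a custom hash schema and another
+   * using the table-wide hash schema, then insert rows and verify scan
+   * results with various predicates.
+   */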
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testCreateTableCustomHashSchemasTwoMixedRanges() throws 
Exception {
+    CreateTableOptions builder = getBasicCreateTableOptions();
+
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 0);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 100);
+
+      // Simple custom hash schema for the range: two buckets on the column 
"key".
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("key"), 2, 0);
+      builder.addRangePartition(rangePartition);
+    }
+
+    // Add table-wide schema as well.
+    builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+    // Add a range to have the table-wide hash schema.
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 100);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 200);
+      builder.addRangePartition(lower, upper);
+    }
+
+    KuduTable table = client.createTable(tableName, basicSchema, builder);
+
+    // Check the result: retrieve the information on tablets from master
+    // and check if each partition has expected parameters.
+    List<LocatedTablet> tablets = table.getTabletsLocations(10000);
+    // There should be 7 tablets: 2 for the [0, 100) range and 5 for [100, 
200).
+    assertEquals(7, tablets.size());
+
+    List<String> expected = Lists.newArrayList();
+    expected.add("0 <= VALUES < 100");
+    expected.add("100 <= VALUES < 200");
+    assertEquals(
+        expected,
+        client.openTable(tableName).getFormattedRangePartitions(10000));
+
+    // Insert data into the newly created table and read it back.
+    KuduSession session = client.newSession();
+    for (int key = 0; key < 200; ++key) {
+      Insert insert = createBasicSchemaInsert(table, key);
+      session.apply(insert);
+    }
+    session.flush();
+
+    // Do full table scan.
+    List<String> rowStrings = scanTableToStrings(table);
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, -1));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 0));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 1));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 99));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 100));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 101));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 199));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 200));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
EQUAL, 201));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 0));
+    assertEquals(199, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 100));
+    assertEquals(99, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 199));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER, 200));
+    assertEquals(0, rowStrings.size());
+
+    // Predicate to have all rows in the range with the custom hash schema.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 100));
+    assertEquals(100, rowStrings.size());
+
+    // Predicate to have all rows in the range with the table-wide hash schema.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 100),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(100, rowStrings.size());
+
+    // Predicate to have one part of the rows in the range with table-wide hash
+    // schema, and the other part from the range with custom hash schema.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 50),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 150));
+    assertEquals(100, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 150));
+    assertEquals(150, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 1),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 199));
+    assertEquals(198, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 1),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(199, rowStrings.size());
+
+    // Predicates to almost cover both ranges (sort of an off-by-one situation).
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 199));
+    assertEquals(199, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 199));
+    assertEquals(1, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS_EQUAL, 0));
+    assertEquals(1, rowStrings.size());
+
+    // Predicate to cover exactly both ranges.
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0),
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 200));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 0));
+    assertEquals(200, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
GREATER_EQUAL, 200));
+    assertEquals(0, rowStrings.size());
+
+    rowStrings = scanTableToStrings(table,
+        KuduPredicate.newComparisonPredicate(basicSchema.getColumn("key"), 
LESS, 0));
+    assertEquals(0, rowStrings.size());
+  }
+
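+  /**
+   * Verify that creating a table fails if the number of hash dimensions
+   * varies between the table-wide hash schema and per-range custom hash
+   * schemas.
+   */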
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testCreateTableCustomHashSchemaDifferentDimensions() throws 
Exception {
+    // Make the number of hash dimensions in the table-wide hash schema differ
+    // from that of the per-range custom hash schemas: creating such a table
+    // should not be possible.
+    ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0i", 
Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1i", 
Type.INT64).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2s", 
Type.STRING).key(true).build());
+    final Schema schema = new Schema(columns);
+
+    CreateTableOptions builder = new 
CreateTableOptions().setRangePartitionColumns(
+        ImmutableList.of("c0i"));
+
+    // Add table-wide schema with two hash dimensions.
+    builder.addHashPartitions(ImmutableList.of("c1i"), 7, 0);
+    builder.addHashPartitions(ImmutableList.of("c2s"), 3, 0);
+
+    // Simple custom hash schema for the range: two buckets on the column "c0i".
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt(0, -100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt(0, 200);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("c0i"), 2, 0);
+      builder.addRangePartition(rangePartition);
+    }
+
+    try {
+      client.createTable(tableName, schema, builder);
+      fail("shouldn't be able to create a table with hash schemas varying in " 
+
+          "number of hash dimensions across table partitions");
+    } catch (KuduException ex) {
+      assertTrue(ex.getStatus().isNotSupported());
+      final String errmsg = ex.getMessage();
+      assertTrue(errmsg, errmsg.matches(
+          "varying number of hash dimensions per range is not yet supported"));
+    }
+
+    // OK, now try a mixed case: one range with hash schema matching the number
+    // of hash dimensions of the table-wide hash schema, and a few more ranges
+    // with a different number of hash dimensions in their hash schemas.
+    // Custom hash schema for this range: two dimensions, with two buckets on
+    // "c0i" and three buckets on "c1i".
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addInt(0, 200);
+      PartialRow upper = schema.newPartialRow();
+      upper.addInt(0, 300);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("c0i"), 2, 0);
+      rangePartition.addHashPartitions(ImmutableList.of("c1i"), 3, 0);
+      builder.addRangePartition(rangePartition);
+    }
+
+    try {
+      client.createTable(tableName, schema, builder);
+      fail("shouldn't be able to create a table with hash schemas varying in " 
+
+          "number of hash dimensions across table partitions");
+    } catch (KuduException ex) {
+      assertTrue(ex.getStatus().isNotSupported());
+      final String errmsg = ex.getMessage();
+      assertTrue(errmsg, errmsg.matches(
+          "varying number of hash dimensions per range is not yet supported"));
+    }
+  }
+
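+  /**
+   * Verify that PartitionSchema.getHashSchemaForRange() returns the custom
+   * hash schema for a covered range and the table-wide hash schema otherwise.
+   */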
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testGetHashSchemaForRange() throws Exception {
+    final int valLower = 100;
+    final int valUpper = 200;
+
+    PartialRow lower = basicSchema.newPartialRow();
+    lower.addInt(0, valLower);
+    PartialRow upper = basicSchema.newPartialRow();
+    upper.addInt(0, valUpper);
+
+    // Custom hash schema for the range: three buckets on the column "key".
+    RangePartitionWithCustomHashSchema rangePartition =
+        new RangePartitionWithCustomHashSchema(
+            lower,
+            upper,
+            RangePartitionBound.INCLUSIVE_BOUND,
+            RangePartitionBound.EXCLUSIVE_BOUND);
+    rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    builder.addRangePartition(rangePartition);
+
+    // Add a table-wide hash schema with one dimension and five buckets.
+    builder.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+    final KuduTable table = client.createTable(tableName, basicSchema, 
builder);
+    final PartitionSchema ps = table.getPartitionSchema();
+
+    // Should get the table-wide schema as the result when asking for a point
+    // in a non-covered range.
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 99);
+
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+
+    // The exact range boundary: should get the custom hash schema.
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 100);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(3, s.get(0).getNumBuckets());
+    }
+
+    // A value within the range: should get the custom hash schema.
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 101);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(3, s.get(0).getNumBuckets());
+    }
+
+    // Should get the table-wide schema as the result when asking for the
+    // upper exclusive boundary.
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 200);
+
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+
+    // Should get the table-wide schema as the result when asking for a point
+    // in a non-covered range.
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 300);
+
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      // There should be just one dimension with five buckets.
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+  }
+
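+  /**
+   * Verify PartitionSchema.getHashSchemaForRange() for a table that mixes
+   * unbounded and bounded ranges with custom and table-wide hash schemas.
+   */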
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testGetHashSchemaForRangeUnbounded() throws Exception {
+    // The test table is created with the following ranges:
+    //   (-inf, -100) [-100, 0) [0, 100), [100, +inf)
+
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    // Add a table-wide hash schema with one dimension and two buckets.
+    builder.addHashPartitions(ImmutableList.of("key"), 2, 0);
+
+    // Add range partition with custom hash schema: (-inf, -100)
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, -100);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("key"), 3, 0);
+
+      builder.addRangePartition(rangePartition);
+    }
+
+    // Add range partition with table-wide hash schema: [-100, 0)
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, -100);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 0);
+
+      builder.addRangePartition(lower, upper);
+    }
+
+    // Add range partition with custom hash schema: [0, 100)
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 0);
+      PartialRow upper = basicSchema.newPartialRow();
+      upper.addInt(0, 100);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("key"), 5, 0);
+
+      builder.addRangePartition(rangePartition);
+    }
+
+    // Add range partition with table-wide hash schema: [100, +inf)
+    {
+      PartialRow lower = basicSchema.newPartialRow();
+      lower.addInt(0, 100);
+      PartialRow upper = basicSchema.newPartialRow();
+
+      builder.addRangePartition(lower, upper);
+    }
+
+    final KuduTable table = client.createTable(tableName, basicSchema, 
builder);
+    final PartitionSchema ps = table.getPartitionSchema();
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, -2002);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(3, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, -101);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(3, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, -100);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(2, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 0);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 99);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(5, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 100);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(2, s.get(0).getNumBuckets());
+    }
+
+    {
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, 1001);
+      final List<PartitionSchema.HashBucketSchema> s = 
ps.getHashSchemaForRange(
+          KeyEncoder.encodeRangePartitionKey(row, ps.getRangeSchema()));
+      assertEquals(1, s.size());
+      assertEquals(2, s.get(0).getNumBuckets());
+    }
+  }
+
   @Test(timeout = 100000)
   public void testFormatRangePartitionsCompoundColumns() throws Exception {
     ArrayList<ColumnSchema> columns = new ArrayList<>();
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
index 8b6e0d792..5cb2c15a6 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestOperation.java
@@ -198,7 +198,7 @@ public class TestOperation {
         lower, upper, RangePartitionBound.INCLUSIVE_BOUND, 
RangePartitionBound.EXCLUSIVE_BOUND);
 
     Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
-    List<CreateTableOptions.RangePartition> decoded =
+    List<RangePartition> decoded =
         dec.decodeRangePartitions(encoded, schema);
     assertEquals(1, decoded.size());
     assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
@@ -241,8 +241,7 @@ public class TestOperation {
         lower, upper, RangePartitionBound.INCLUSIVE_BOUND, 
RangePartitionBound.EXCLUSIVE_BOUND);
 
     Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
-    List<CreateTableOptions.RangePartition> decoded =
-        dec.decodeRangePartitions(encoded, schema);
+    List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
     assertEquals(1, decoded.size());
     assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
         decoded.get(0).getLowerBoundType());
@@ -294,8 +293,7 @@ public class TestOperation {
         lower, upper, RangePartitionBound.INCLUSIVE_BOUND, 
RangePartitionBound.EXCLUSIVE_BOUND);
 
     Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
-    List<CreateTableOptions.RangePartition> decoded =
-        dec.decodeRangePartitions(encoded, schema);
+    List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
     assertEquals(1, decoded.size());
     assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
         decoded.get(0).getLowerBoundType());
@@ -338,14 +336,14 @@ public class TestOperation {
     columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", 
Type.INT64).build());
     final Schema schema = new Schema(columns);
 
-    List<CreateTableOptions.RangePartition> rangePartitions = new 
ArrayList<>();
+    List<RangePartition> rangePartitions = new ArrayList<>();
     {
       final PartialRow lower = schema.newPartialRow();
       lower.addInt("c0", 0);
 
       final PartialRow upper = schema.newPartialRow();
       upper.addInt("c0", 100);
-      rangePartitions.add(new CreateTableOptions.RangePartition(
+      rangePartitions.add(new RangePartition(
           lower,
           upper,
           RangePartitionBound.INCLUSIVE_BOUND,
@@ -357,7 +355,7 @@ public class TestOperation {
 
       final PartialRow upper = schema.newPartialRow();
       upper.addInt("c0", 300);
-      rangePartitions.add(new CreateTableOptions.RangePartition(
+      rangePartitions.add(new RangePartition(
           lower,
           upper,
           RangePartitionBound.EXCLUSIVE_BOUND,
@@ -369,8 +367,7 @@ public class TestOperation {
         rangePartitions, ImmutableList.of());
 
     Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
-    List<CreateTableOptions.RangePartition> decoded =
-        dec.decodeRangePartitions(encoded, schema);
+    List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
     assertEquals(2, decoded.size());
 
     assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
@@ -417,7 +414,7 @@ public class TestOperation {
     columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", 
Type.STRING).nullable(true).build());
     final Schema schema = new Schema(columns);
 
-    List<CreateTableOptions.RangePartition> rangePartitions = new 
ArrayList<>();
+    List<RangePartition> rangePartitions = new ArrayList<>();
     {
       final PartialRow lower = schema.newPartialRow();
       lower.addInt("c0", 0);
@@ -426,7 +423,7 @@ public class TestOperation {
       final PartialRow upper = schema.newPartialRow();
       upper.addInt("c0", 100);
       upper.addString("c1", "c");
-      rangePartitions.add(new CreateTableOptions.RangePartition(
+      rangePartitions.add(new RangePartition(
           lower,
           upper,
           RangePartitionBound.INCLUSIVE_BOUND,
@@ -440,7 +437,7 @@ public class TestOperation {
       final PartialRow upper = schema.newPartialRow();
       upper.addInt("c0", 300);
       upper.addString("c1", "f");
-      rangePartitions.add(new CreateTableOptions.RangePartition(
+      rangePartitions.add(new RangePartition(
           lower,
           upper,
           RangePartitionBound.EXCLUSIVE_BOUND,
@@ -452,8 +449,7 @@ public class TestOperation {
         rangePartitions, ImmutableList.of());
 
     Operation.OperationsDecoder dec = new Operation.OperationsDecoder();
-    List<CreateTableOptions.RangePartition> decoded =
-        dec.decodeRangePartitions(encoded, schema);
+    List<RangePartition> decoded = dec.decodeRangePartitions(encoded, schema);
     assertEquals(2, decoded.size());
 
     assertEquals(RangePartitionBound.INCLUSIVE_BOUND,
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 4bc01b294..60e576a4a 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
@@ -876,4 +877,222 @@ public class TestPartitionPruner {
     checkPartitions(0, 0, table, partitions,
                     KuduPredicate.newIsNullPredicate(timestamp));
   }
+
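+  /**
+   * Verify partition pruning for a table that mixes a range with the
+   * table-wide hash schema and a range with a custom hash schema.
+   */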
+  @Test(timeout = 100000)
+  @KuduTestHarness.MasterServerConfig(flags = {
+      "--enable_per_range_hash_schemas=true",
+  })
+  public void testPruningWithCustomHashSchemas() throws Exception {
+    // CREATE TABLE timeseries
+    //   (host STRING, metric STRING, timestamp UNIXTIME_MICROS, value DOUBLE)
+    // PRIMARY KEY (host, metric, timestamp)
+    //
+    //    RANGE(timestamp)
+    //        (PARTITION VALUES >= 0,
+    //         PARTITION VALUES <  100)
+    //    HASH (host, metric) 2 PARTITIONS,
+    //
+    //    RANGE(timestamp)
+    //        (PARTITION VALUES >= 100,
+    //         PARTITION VALUES <  200)
+    //    HASH (host) 5 PARTITIONS
+
+    ColumnSchema host =
+        new ColumnSchema.ColumnSchemaBuilder("host", 
Type.STRING).key(true).build();
+    ColumnSchema metric =
+        new ColumnSchema.ColumnSchemaBuilder("metric", 
Type.STRING).key(true).build();
+    ColumnSchema timestamp =
+        new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS)
+            .key(true).build();
+    ColumnSchema value = new ColumnSchema.ColumnSchemaBuilder("value", 
Type.DOUBLE).build();
+    final Schema schema = new Schema(ImmutableList.of(host, metric, timestamp, 
value));
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("timestamp"));
+
+    // Add range partition with table-wide hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addLong("timestamp", 0);
+      PartialRow upper = schema.newPartialRow();
+      upper.addLong("timestamp", 100);
+      tableBuilder.addRangePartition(lower, upper);
+    }
+
+    // Add range partition with custom hash schema.
+    {
+      PartialRow lower = schema.newPartialRow();
+      lower.addLong("timestamp", 100);
+      PartialRow upper = schema.newPartialRow();
+      upper.addLong("timestamp", 200);
+
+      RangePartitionWithCustomHashSchema rangePartition =
+          new RangePartitionWithCustomHashSchema(
+              lower,
+              upper,
+              RangePartitionBound.INCLUSIVE_BOUND,
+              RangePartitionBound.EXCLUSIVE_BOUND);
+      rangePartition.addHashPartitions(ImmutableList.of("host"), 5, 0);
+
+      tableBuilder.addRangePartition(rangePartition);
+    }
+
+    // Add table-wide hash schema.
+    tableBuilder.addHashPartitions(ImmutableList.of("host", "metric"), 2);
+
+    String tableName = "testPruningCHS";
+    client.createTable(tableName, schema, tableBuilder);
+    KuduTable table = client.openTable(tableName);
+
+    final List<Partition> partitions = getTablePartitions(table);
+    assertEquals(7, partitions.size());
+
+    // No Predicates
+    checkPartitions(7, 9, table, partitions);
+
+    checkPartitions(7, 7, table, partitions,
+        KuduPredicate.newComparisonPredicate(timestamp, 
ComparisonOp.GREATER_EQUAL, 50),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 
150));
+
+    // host = "a"
+    // 2 tablets from the HASH(host, metric) range and 1 from the HASH(host) 
range.
+    checkPartitions(3, 5, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"));
+
+    // host = "a"
+    // metric = "a"
+    checkPartitions(2, 3, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 9;
+    checkPartitions(2, 3, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, 
ComparisonOp.GREATER_EQUAL, 9));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 10;
+    // timestamp < 20;
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, 
ComparisonOp.GREATER_EQUAL, 10),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 
20));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 100;
+    // timestamp < 200;
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.GREATER_EQUAL, 100),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 200));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp < 10;
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 
10));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp < 100;
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.LESS, 
100));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 10;
+    checkPartitions(2, 3, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, 
ComparisonOp.GREATER_EQUAL, 10));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp >= 100;
+    checkPartitions(1, 2, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, 
ComparisonOp.GREATER_EQUAL, 100));
+
+    // host = "a"
+    // metric = "a"
+    // timestamp = 100;
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(metric, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 
100));
+
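+    // Encoded partition key prefix corresponding to hash bucket index 1:
+    // hash bucket indices are encoded as 4-byte big-endian integers.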
+    final byte[] hash1 = new byte[] { 0, 0, 0, 1 };
+
+    // partition key < (hash=1)
+    // scan partitions: 1 + 1 + 1
+    checkPartitions(2, 3, table, partitions, null, hash1);
+
+    // partition key >= (hash=1)
+    // scan partitions: 1 + 4 + 1
+    checkPartitions(5, 6, table, partitions, hash1, null);
+
+    // timestamp = 10
+    // partition key < (hash=1)
+    // scan partitions: 0 + 1 + 0
+    checkPartitions(1, 1, table, partitions, null, hash1,
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 
10));
+
+    // timestamp = 10
+    // partition key >= (hash=1)
+    checkPartitions(1, 1, table, partitions, hash1, null,
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 
10));
+
+    // timestamp = 100
+    // partition key >= (hash=1)
+    checkPartitions(4, 4, table, partitions, hash1, null,
+        KuduPredicate.newComparisonPredicate(timestamp, ComparisonOp.EQUAL, 
100));
+
+    // timestamp IN (99, 100)
+    // host = "a"
+    // metric IN ("foo", "baz")
+    checkPartitions(2, 2, table, partitions,
+        KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(99L, 
100L)),
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo", 
"baz")));
+
+    // timestamp IN (100, 199)
+    // host = "a"
+    // metric IN ("foo", "baz")
+    checkPartitions(1, 1, table, partitions,
+        KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(100L, 
199L)),
+        KuduPredicate.newComparisonPredicate(host, ComparisonOp.EQUAL, "a"),
+        KuduPredicate.newInListPredicate(metric, ImmutableList.of("foo", 
"baz")));
+
+    // timestamp IN (0, 10)
+    checkPartitions(2, 2, table, partitions,
+        KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(0L, 
10L)));
+
+    // timestamp IN (100, 110)
+    checkPartitions(5, 5, table, partitions,
+        KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(100L, 
110L)));
+
+    // timestamp IN (99, 100)
+    checkPartitions(7, 7, table, partitions,
+        KuduPredicate.newInListPredicate(timestamp, ImmutableList.of(99L, 
100L)));
+
+    // timestamp IS NOT NULL
+    checkPartitions(7, 9, table, partitions,
+        KuduPredicate.newIsNotNullPredicate(timestamp));
+
+    // timestamp IS NULL
+    checkPartitions(0, 0, table, partitions,
+        KuduPredicate.newIsNullPredicate(timestamp));
+  }
 }
